NSQ Connection can subscribe only for one topic

This commit is contained in:
2021-01-30 18:13:50 +03:00
parent a7b847146a
commit 1a24efacfb
4 changed files with 30 additions and 13 deletions

View File

@ -68,10 +68,14 @@ use Nsq\Consumer;
use Nsq\Protocol\Message; use Nsq\Protocol\Message;
use Nsq\Subscriber; use Nsq\Subscriber;
$consumer = new Consumer('tcp://nsqd:4150'); $consumer = new Consumer(
$subscriber = new Subscriber($consumer); topic: 'topic',
channel: 'channel',
address: 'tcp://nsqd:4150',
);
$generator = (new Subscriber($consumer))->run();
$generator = $subscriber->subscribe('topic', 'channel');
foreach ($generator as $message) { foreach ($generator as $message) {
if ($message instanceof Message) { if ($message instanceof Message) {
$payload = $message->body; $payload = $message->body;

View File

@ -4,21 +4,34 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Nsq\Config\ClientConfig;
use Nsq\Exception\NsqError; use Nsq\Exception\NsqError;
use Nsq\Exception\NsqException; use Nsq\Exception\NsqException;
use Nsq\Protocol\Error; use Nsq\Protocol\Error;
use Nsq\Protocol\Message; use Nsq\Protocol\Message;
use Nsq\Reconnect\ReconnectStrategy;
use Psr\Log\LoggerInterface;
final class Consumer extends Connection final class Consumer extends Connection
{ {
private int $rdy = 0; private int $rdy = 0;
/** public function __construct(
* Subscribe to a topic/channel. private string $topic,
*/ private string $channel,
public function sub(string $topic, string $channel): void string $address,
ClientConfig $clientConfig = null,
ReconnectStrategy $reconnectStrategy = null,
LoggerInterface $logger = null
) {
parent::__construct($address, $clientConfig, $reconnectStrategy, $logger);
}
public function connect(): void
{ {
$this->command('SUB', [$topic, $channel])->checkIsOK(); parent::connect();
$this->command('SUB', [$this->topic, $this->channel])->checkIsOK();
} }
public function readMessage(): ?Message public function readMessage(): ?Message

View File

@ -19,12 +19,10 @@ final class Subscriber
} }
/** /**
* @psalm-return Generator<int, Message|float|null, int|float|null, void> * @psalm-return Generator<int, Message|float|null, int|null, void>
*/ */
public function subscribe(string $topic, string $channel): Generator public function run(): Generator
{ {
$this->reader->sub($topic, $channel);
while (true) { while (true) {
$this->reader->rdy(1); $this->reader->rdy(1);

View File

@ -18,6 +18,8 @@ final class NsqTest extends TestCase
$producer->pub(__FUNCTION__, __FUNCTION__); $producer->pub(__FUNCTION__, __FUNCTION__);
$consumer = new Consumer( $consumer = new Consumer(
topic: 'test',
channel: 'test',
address: 'tcp://localhost:4150', address: 'tcp://localhost:4150',
clientConfig: new ClientConfig( clientConfig: new ClientConfig(
heartbeatInterval: 3000, heartbeatInterval: 3000,
@ -25,7 +27,7 @@ final class NsqTest extends TestCase
), ),
); );
$subscriber = new Subscriber($consumer); $subscriber = new Subscriber($consumer);
$generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__); $generator = $subscriber->run();
/** @var null|Message $message */ /** @var null|Message $message */
$message = $generator->current(); $message = $generator->current();