diff --git a/README.md b/README.md index fd7d390..dba3b90 100644 --- a/README.md +++ b/README.md @@ -68,10 +68,14 @@ use Nsq\Consumer; use Nsq\Protocol\Message; use Nsq\Subscriber; -$consumer = new Consumer('tcp://nsqd:4150'); -$subscriber = new Subscriber($consumer); +$consumer = new Consumer( + topic: 'topic', + channel: 'channel', + address: 'tcp://nsqd:4150', +); + +$generator = (new Subscriber($consumer))->run(); -$generator = $subscriber->subscribe('topic', 'channel'); foreach ($generator as $message) { if ($message instanceof Message) { $payload = $message->body; diff --git a/src/Consumer.php b/src/Consumer.php index 4f9801c..d94f3a6 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -4,21 +4,34 @@ declare(strict_types=1); namespace Nsq; +use Nsq\Config\ClientConfig; use Nsq\Exception\NsqError; use Nsq\Exception\NsqException; use Nsq\Protocol\Error; use Nsq\Protocol\Message; +use Nsq\Reconnect\ReconnectStrategy; +use Psr\Log\LoggerInterface; final class Consumer extends Connection { private int $rdy = 0; - /** - * Subscribe to a topic/channel. - */ - public function sub(string $topic, string $channel): void + public function __construct( + private string $topic, + private string $channel, + string $address, + ClientConfig $clientConfig = null, + ReconnectStrategy $reconnectStrategy = null, + LoggerInterface $logger = null + ) { + parent::__construct($address, $clientConfig, $reconnectStrategy, $logger); + } + + public function connect(): void { - $this->command('SUB', [$topic, $channel])->checkIsOK(); + parent::connect(); + + $this->command('SUB', [$this->topic, $this->channel])->checkIsOK(); } public function readMessage(): ?Message diff --git a/src/Subscriber.php b/src/Subscriber.php index bd5f195..9cb190e 100644 --- a/src/Subscriber.php +++ b/src/Subscriber.php @@ -19,12 +19,10 @@ final class Subscriber } /** - * @psalm-return Generator + * @psalm-return Generator */ - public function subscribe(string $topic, string $channel): Generator + public function run(): Generator { - $this->reader->sub($topic, $channel); - while (true) { $this->reader->rdy(1); diff --git a/tests/NsqTest.php b/tests/NsqTest.php index b7b6e06..cf89b6b 100644 --- a/tests/NsqTest.php +++ b/tests/NsqTest.php @@ -18,6 +18,8 @@ final class NsqTest extends TestCase $producer->pub(__FUNCTION__, __FUNCTION__); $consumer = new Consumer( + topic: 'test', + channel: 'test', address: 'tcp://localhost:4150', clientConfig: new ClientConfig( heartbeatInterval: 3000, @@ -25,7 +27,7 @@ final class NsqTest extends TestCase ), ); $subscriber = new Subscriber($consumer); - $generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__); + $generator = $subscriber->run(); /** @var null|Message $message */ $message = $generator->current();