From e1cca2d3eb7041a8bdb21d6241cac26fbca9324d Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Sat, 30 Jan 2021 18:22:34 +0300 Subject: [PATCH] Remove Subscriber --- README.md | 13 +++++-------- src/Consumer.php | 19 +++++++++++++++++++ src/Subscriber.php | 38 -------------------------------------- tests/NsqTest.php | 6 ++---- 4 files changed, 26 insertions(+), 50 deletions(-) delete mode 100644 src/Subscriber.php diff --git a/README.md b/README.md index dba3b90..4e1154a 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ Features Usage ----- -### Publish +### Producer ```php use Nsq\Producer; @@ -61,12 +61,11 @@ $producer->mpub('topic', [ $producer->dpub('topic', 'Deferred message', delay: 5000); ``` -### Subscription +### Consumer ```php use Nsq\Consumer; use Nsq\Protocol\Message; -use Nsq\Subscriber; $consumer = new Consumer( topic: 'topic', @@ -74,7 +73,8 @@ $consumer = new Consumer( address: 'tcp://nsqd:4150', ); -$generator = (new Subscriber($consumer))->run(); +// Simple blocking loop based on generator +$generator = $consumer->generator(); foreach ($generator as $message) { if ($message instanceof Message) { @@ -89,12 +89,9 @@ foreach ($generator as $message) { // In case of nothing received during timeout generator will return NULL // Here we can do something between messages, like pcntl_signal_dispatch() - - // We can also communicate with generator through send - // for example: // Gracefully close connection (loop will be ended) - $generator->send(Subscriber::STOP); + $generator->send(0); } ``` diff --git a/src/Consumer.php b/src/Consumer.php index d94f3a6..9a10813 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace Nsq; +use Generator; use Nsq\Config\ClientConfig; use Nsq\Exception\NsqError; use Nsq\Exception\NsqException; @@ -34,6 +35,24 @@ final class Consumer extends Connection $this->command('SUB', [$this->topic, $this->channel])->checkIsOK(); } + /** + * @psalm-return Generator + */ + public function generator(): Generator + { + while (true) { + $this->rdy(1); + + $command = yield $this->readMessage(); + + if (0 === $command) { + break; + } + } + + $this->disconnect(); + } + public function readMessage(): ?Message { $frame = $this->readFrame(); diff --git a/src/Subscriber.php b/src/Subscriber.php deleted file mode 100644 index 9cb190e..0000000 --- a/src/Subscriber.php +++ /dev/null @@ -1,38 +0,0 @@ -reader = $reader; - } - - /** - * @psalm-return Generator - */ - public function run(): Generator - { - while (true) { - $this->reader->rdy(1); - - $command = yield $this->reader->readMessage(); - - if (self::STOP === $command) { - break; - } - } - - $this->reader->disconnect(); - } -} diff --git a/tests/NsqTest.php b/tests/NsqTest.php index cf89b6b..b7c9bd4 100644 --- a/tests/NsqTest.php +++ b/tests/NsqTest.php @@ -6,7 +6,6 @@ use Nsq\Config\ClientConfig; use Nsq\Consumer; use Nsq\Producer; use Nsq\Protocol\Message; -use Nsq\Subscriber; use Nyholm\NSA; use PHPUnit\Framework\TestCase; @@ -26,8 +25,7 @@ final class NsqTest extends TestCase readTimeout: 1, ), ); - $subscriber = new Subscriber($consumer); - $generator = $subscriber->run(); + $generator = $consumer->generator(); /** @var null|Message $message */ $message = $generator->current(); @@ -88,7 +86,7 @@ final class NsqTest extends TestCase $message->finish(); self::assertTrue($consumer->isReady()); - $generator->send(Subscriber::STOP); + $generator->send(0); self::assertFalse($consumer->isReady()); } }