From 7873a9f010c7163c738ae7afd07d8f0d747f415f Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Sat, 23 Jan 2021 01:24:35 +0300 Subject: [PATCH] Receive Reader as dependency in Subscriber instead of extending it --- src/Connection.php | 2 +- src/Reader.php | 5 +++++ src/Subscriber.php | 21 ++++++++++++++------- tests/NsqTest.php | 8 +++++--- 4 files changed, 25 insertions(+), 11 deletions(-) diff --git a/src/Connection.php b/src/Connection.php index ffd2559..11cbc70 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -122,7 +122,7 @@ abstract class Connection return $this; } - protected function receive(float $timeout = 0): ?Response + public function receive(float $timeout = 0): ?Response { $socket = $this->socket(); diff --git a/src/Reader.php b/src/Reader.php index acdc610..d884e68 100644 --- a/src/Reader.php +++ b/src/Reader.php @@ -50,4 +50,9 @@ class Reader extends Connection { $this->send('TOUCH '.$id.PHP_EOL); } + + public function nop(): void + { + $this->send('NOP'.PHP_EOL); + } } diff --git a/src/Subscriber.php b/src/Subscriber.php index 20d1534..f248fee 100644 --- a/src/Subscriber.php +++ b/src/Subscriber.php @@ -9,24 +9,31 @@ use function get_debug_type; use function microtime; use function sprintf; -final class Subscriber extends Reader +final class Subscriber { public const STOP = 0; public const CHANGE_TIMEOUT = 1; + private Reader $reader; + + public function __construct(Reader $reader) + { + $this->reader = $reader; + } + /** * @psalm-return Generator */ public function subscribe(string $topic, string $channel, float $timeout = 0): Generator { - $this->sub($topic, $channel); + $this->reader->sub($topic, $channel); while (true) { - $this->rdy(1); + $this->reader->rdy(1); $message = $this->consume($timeout); - $command = yield null === $message ? null : new Envelope($message, $this); + $command = yield null === $message ? null : new Envelope($message, $this->reader); if (self::STOP === $command) { break; @@ -43,20 +50,20 @@ final class Subscriber extends Reader } } - $this->disconnect(); + $this->reader->disconnect(); } private function consume(float $timeout): ?Message { $deadline = microtime(true) + $timeout; - $response = $this->receive($timeout); + $response = $this->reader->receive($timeout); if (null === $response) { return null; } if ($response->isHeartBeat()) { - $this->send('NOP'.PHP_EOL); + $this->reader->nop(); return $this->consume( ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime diff --git a/tests/NsqTest.php b/tests/NsqTest.php index e4aee5c..221f44c 100644 --- a/tests/NsqTest.php +++ b/tests/NsqTest.php @@ -3,6 +3,7 @@ declare(strict_types=1); use Nsq\Envelope; +use Nsq\Reader; use Nsq\Subscriber; use Nsq\Writer; use Nsq\Exception; @@ -15,7 +16,8 @@ final class NsqTest extends TestCase $writer = new Writer('tcp://localhost:4150'); $writer->pub(__FUNCTION__, __FUNCTION__); - $subscriber = new Subscriber('tcp://localhost:4150'); + $reader = new Reader('tcp://localhost:4150'); + $subscriber = new Subscriber($reader); $generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__, 1); /** @var null|Envelope $envelope */ @@ -70,9 +72,9 @@ final class NsqTest extends TestCase static::assertSame('Deferred message.', $envelope->message->body); $envelope->finish(); - static::assertFalse($subscriber->isClosed()); + static::assertFalse($reader->isClosed()); $generator->send(Subscriber::STOP); - static::assertTrue($subscriber->isClosed()); + static::assertTrue($reader->isClosed()); } /**