From 0a926ce6076bf5e2ab68b0a80726c8625e7f2c13 Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Wed, 20 Jan 2021 16:02:06 +0300 Subject: [PATCH] Refactoring: Extend Reader and Writer from Connection --- src/Connection.php | 93 +++++++++++++++++++++++++++++++++++----------- src/Envelope.php | 53 +++++++++++++------------- src/Reader.php | 59 ++++------------------------- src/Subscriber.php | 49 ++++-------------------- src/Writer.php | 21 ++++------- tests/NsqTest.php | 11 ++---- 6 files changed, 124 insertions(+), 162 deletions(-) diff --git a/src/Connection.php b/src/Connection.php index a2f48e8..548fe13 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -10,13 +10,17 @@ use Socket\Raw\Factory; use Socket\Raw\Socket; use Throwable; use function json_encode; +use function microtime; use function pack; use function sprintf; use const JSON_FORCE_OBJECT; use const JSON_THROW_ON_ERROR; use const PHP_EOL; -final class Connection +/** + * @internal + */ +abstract class Connection { private const OK = 'OK'; private const HEARTBEAT = '_heartbeat_'; @@ -31,27 +35,47 @@ final class Connection private const BYTES_ID = 16; private const MAGIC_V2 = ' V2'; - public Socket $socket; + public ?Socket $socket = null; public bool $closed = false; - private function __construct(Socket $socket) + private Config $config; + + public function __construct(string $address) { - $this->socket = $socket; + $this->config = new Config($address); } /** * @psalm-suppress UnsafeInstantiation - * - * @return static */ - public static function connect(Config $config): self + public function connect(): void { - $socket = (new Factory())->createClient($config->address); - $socket->write(self::MAGIC_V2); + $this->socket = (new Factory())->createClient($this->config->address); + $this->socket->write(self::MAGIC_V2); + } - // @phpstan-ignore-next-line - return new self($socket); + /** + * Cleanly close your connection (no more messages are sent). + */ + public function disconnect(): void + { + if ($this->closed) { + return; + } + + try { + $this->write('CLS'.PHP_EOL); + $this->consume(); // receive CLOSE_WAIT + + if (null !== $this->socket) { + $this->socket->close(); + } + } catch (Throwable $e) { + // Not interested + } + + $this->closed = true; } /** @@ -59,7 +83,7 @@ final class Connection * * @psalm-suppress PossiblyFalseOperand */ - public function identify(array $arr): string + protected function identify(array $arr): string { $body = json_encode($arr, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT); $size = pack('N', \strlen($body)); @@ -70,21 +94,22 @@ final class Connection /** * @psalm-suppress PossiblyFalseOperand */ - public function auth(string $secret): string + protected function auth(string $secret): string { $size = pack('N', \strlen($secret)); return 'AUTH'.PHP_EOL.$size.$secret; } + /** + * @internal + */ public function write(string $buffer): void { - if ($this->closed) { - throw new LogicException('This connection is closed, create new one.'); - } + $socket = $this->socket(); try { - $this->socket->write($buffer); + $socket->write($buffer); } catch (Throwable $e) { $this->closed = true; @@ -92,9 +117,15 @@ final class Connection } } - public function read(): ?Message + protected function consume(?float $timeout = 0): ?Message { - $socket = $this->socket; + $deadline = microtime(true) + ($timeout ?? 0); + + $socket = $this->socket(); + + if (false === $socket->selectRead($timeout)) { + return null; + } $buffer = new ByteBuffer($socket->read(self::BYTES_SIZE + self::BYTES_TYPE)); $size = $buffer->consumeUint32(); @@ -105,14 +136,21 @@ final class Connection if (self::TYPE_RESPONSE === $type) { $response = $buffer->consume($size - self::BYTES_TYPE); + $isInternalMessage = false; if (self::OK === $response || self::CLOSE_WAIT === $response) { - return null; + $isInternalMessage = true; } if (self::HEARTBEAT === $response) { $socket->write('NOP'.PHP_EOL); - return null; + $isInternalMessage = true; + } + + if ($isInternalMessage) { + return $this->consume( + ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime + ); } throw new LogicException(sprintf('Unexpected response from nsq: "%s"', $response)); @@ -133,4 +171,17 @@ final class Connection return new Message($timestamp, $attempts, $id, $body); } + + private function socket(): Socket + { + if ($this->closed) { + throw new LogicException('This connection is closed, create new one.'); + } + + if (null === $this->socket) { + $this->connect(); + } + + return $this->socket; + } } diff --git a/src/Envelope.php b/src/Envelope.php index 85cbc90..b13cd87 100644 --- a/src/Envelope.php +++ b/src/Envelope.php @@ -4,48 +4,51 @@ declare(strict_types=1); namespace Nsq; -/** - * @psalm-immutable - */ +use LogicException; + final class Envelope { + /** + * @psalm-readonly + */ public Message $message; - /** - * @var callable - */ - private $acknowledge; + private bool $finished = false; - /** - * @var callable - */ - private $requeue; + private Reader $connection; - /** - * @var callable - */ - private $touching; - - public function __construct(Message $message, callable $ack, callable $req, callable $touch) + public function __construct(Message $message, Reader $connection) { $this->message = $message; - $this->acknowledge = $ack; - $this->requeue = $req; - $this->touching = $touch; + $this->connection = $connection; } - public function ack(): void + public function finish(): void { - \call_user_func($this->acknowledge); + if ($this->finished) { + throw new LogicException('Can\'t finish message as it already finished.'); + } + + $this->connection->fin($this->message->id); + $this->finished = true; } - public function retry(int $timeout): void + public function requeue(int $timeout): void { - \call_user_func($this->requeue, $timeout); + if ($this->finished) { + throw new LogicException('Can\'t requeue message as it already finished.'); + } + + $this->connection->req($this->message->id, $timeout); + $this->finished = true; } public function touch(): void { - \call_user_func($this->touching); + if ($this->finished) { + throw new LogicException('Can\'t touch message as it already finished.'); + } + + $this->connection->touch($this->message->id); } } diff --git a/src/Reader.php b/src/Reader.php index fcbe16f..adf15a2 100644 --- a/src/Reader.php +++ b/src/Reader.php @@ -4,24 +4,8 @@ declare(strict_types=1); namespace Nsq; -use Throwable; -use function sprintf; -use const PHP_EOL; - -class Reader +class Reader extends Connection { - private Connection $connection; - - public function __construct(Connection $connection) - { - $this->connection = $connection; - } - - public function __destruct() - { - $this->close(); - } - /** * Subscribe to a topic/channel. */ @@ -29,8 +13,8 @@ class Reader { $buffer = sprintf('SUB %s %s', $topic, $channel).PHP_EOL; - $this->connection->write($buffer); - $this->connection->read(); + $this->write($buffer); + $this->consume(); } /** @@ -38,7 +22,7 @@ class Reader */ public function rdy(int $count): void { - $this->connection->write('RDY '.$count.PHP_EOL); + $this->write('RDY '.$count.PHP_EOL); } /** @@ -46,7 +30,7 @@ class Reader */ public function fin(string $id): void { - $this->connection->write('FIN '.$id.PHP_EOL); + $this->write('FIN '.$id.PHP_EOL); } /** @@ -57,7 +41,7 @@ class Reader */ public function req(string $id, int $timeout): void { - $this->connection->write(sprintf('REQ %s %s', $id, $timeout).PHP_EOL); + $this->write(sprintf('REQ %s %s', $id, $timeout).PHP_EOL); } /** @@ -65,35 +49,6 @@ class Reader */ public function touch(string $id): void { - $this->connection->write('TOUCH '.$id.PHP_EOL); - } - - public function consume(?float $timeout = null): ?Message - { - if (false === $this->connection->socket->selectRead($timeout)) { - return null; - } - - return $this->connection->read() ?? $this->consume(0); - } - - /** - * Cleanly close your connection (no more messages are sent). - */ - public function close(): void - { - if ($this->connection->closed) { - return; - } - - $this->connection->closed = true; - - $this->connection->socket->write('CLS'.PHP_EOL); - $this->connection->read(); - - try { - $this->connection->socket->close(); - } catch (Throwable $e) { - } + $this->write('TOUCH '.$id.PHP_EOL); } } diff --git a/src/Subscriber.php b/src/Subscriber.php index 1aeef57..32807cc 100644 --- a/src/Subscriber.php +++ b/src/Subscriber.php @@ -5,28 +5,19 @@ declare(strict_types=1); namespace Nsq; use Generator; -use LogicException; -final class Subscriber +final class Subscriber extends Reader { - private Reader $reader; - - public function __construct(Reader $reader) - { - $this->reader = $reader; - } - /** * @psalm-return Generator */ public function subscribe(string $topic, string $channel, ?float $timeout = 0): Generator { - $reader = $this->reader; - $reader->sub($topic, $channel); - $reader->rdy(1); + $this->sub($topic, $channel); + $this->rdy(1); while (true) { - $message = $reader->consume($timeout); + $message = $this->consume($timeout); if (null === $message) { if (true === yield null) { @@ -36,39 +27,13 @@ final class Subscriber continue; } - $finished = false; - $envelop = new Envelope( - $message, - static function () use ($reader, $message, &$finished): void { - if ($finished) { - throw new LogicException('Can\'t ack, message already finished.'); - } - - $finished = true; - - $reader->fin($message->id); - }, - static function (int $timeout) use ($reader, $message, &$finished): void { - if ($finished) { - throw new LogicException('Can\'t retry, message already finished.'); - } - - $finished = true; - - $reader->req($message->id, $timeout); - }, - static function () use ($reader, $message): void { - $reader->touch($message->id); - }, - ); - - if (true === yield $envelop) { + if (true === yield new Envelope($message, $this)) { break; } - $reader->rdy(1); + $this->rdy(1); } - $reader->close(); + $this->disconnect(); } } diff --git a/src/Writer.php b/src/Writer.php index 2c9a182..33be17e 100644 --- a/src/Writer.php +++ b/src/Writer.php @@ -10,15 +10,8 @@ use function pack; use function sprintf; use const PHP_EOL; -final class Writer +final class Writer extends Connection { - private Connection $connection; - - public function __construct(Connection $connection) - { - $this->connection = $connection; - } - /** * @psalm-suppress PossiblyFalseOperand */ @@ -28,8 +21,8 @@ final class Writer $buffer = 'PUB '.$topic.PHP_EOL.$size.$body; - $this->connection->write($buffer); - $this->connection->read(); + $this->write($buffer); + $this->consume(); } /** @@ -49,8 +42,8 @@ final class Writer $buffer = 'MPUB '.$topic.PHP_EOL.$size.$num.$mb; - $this->connection->write($buffer); - $this->connection->read(); + $this->write($buffer); + $this->consume(); } /** @@ -62,7 +55,7 @@ final class Writer $buffer = sprintf('DPUB %s %s', $topic, $deferTime).PHP_EOL.$size.$body; - $this->connection->write($buffer); - $this->connection->read(); + $this->write($buffer); + $this->consume(); } } diff --git a/tests/NsqTest.php b/tests/NsqTest.php index 7bde73b..6d3d96c 100644 --- a/tests/NsqTest.php +++ b/tests/NsqTest.php @@ -2,10 +2,7 @@ declare(strict_types=1); -use Nsq\Config; -use Nsq\Connection; use Nsq\Envelope; -use Nsq\Reader; use Nsq\Subscriber; use Nsq\Writer; use PHPUnit\Framework\TestCase; @@ -14,12 +11,10 @@ final class NsqTest extends TestCase { public function test(): void { - $config = new Config('tcp://localhost:4150'); - - $writer = new Writer(Connection::connect($config)); + $writer = new Writer('tcp://localhost:4150'); $writer->pub(__FUNCTION__, __FUNCTION__); - $subscriber = new Subscriber(new Reader(Connection::connect($config))); + $subscriber = new Subscriber('tcp://localhost:4150'); $generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__, 1); $envelope = $generator->current(); @@ -27,7 +22,7 @@ final class NsqTest extends TestCase static::assertInstanceOf(Envelope::class, $envelope); /** @var Envelope $envelope */ static::assertSame(__FUNCTION__, $envelope->message->body); - $envelope->ack(); + $envelope->finish(); $generator->next(); static::assertNull($generator->current());