From f74b82a4004ef70dfc9543f3af718d413ae633eb Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Sat, 30 Jan 2021 17:14:19 +0300 Subject: [PATCH] Explode Response to Frames --- .php_cs.dist | 11 +-- README.md | 2 +- docker-compose.yml | 2 +- src/Connection.php | 76 +++++++++++++---- src/Consumer.php | 26 +++++- src/Exception/AuthenticationRequired.php | 8 +- src/Exception/BadResponse.php | 15 ++++ src/Exception/ConnectionFail.php | 3 +- src/Exception/MessageAlreadyFinished.php | 5 +- src/Exception/NsqError.php | 8 +- src/Exception/NsqException.php | 4 +- src/Exception/NullReceived.php | 9 ++ src/Exception/UnexpectedResponse.php | 18 ---- src/Producer.php | 6 +- src/Protocol/Error.php | 23 ++++++ src/Protocol/ErrorType.php | 100 +++++++++++++++++++++++ src/Protocol/Frame.php | 16 ++++ src/{ => Protocol}/Message.php | 14 +++- src/Protocol/Response.php | 41 ++++++++++ src/Response.php | 87 -------------------- src/Subscriber.php | 3 +- tests/MessageTest.php | 2 +- tests/NsqTest.php | 4 +- tests/Protocol/ErrorTypeTest.php | 32 ++++++++ 24 files changed, 367 insertions(+), 148 deletions(-) create mode 100644 src/Exception/BadResponse.php create mode 100644 src/Exception/NullReceived.php delete mode 100644 src/Exception/UnexpectedResponse.php create mode 100644 src/Protocol/Error.php create mode 100644 src/Protocol/ErrorType.php create mode 100644 src/Protocol/Frame.php rename src/{ => Protocol}/Message.php (83%) create mode 100644 src/Protocol/Response.php delete mode 100644 src/Response.php create mode 100644 tests/Protocol/ErrorTypeTest.php diff --git a/.php_cs.dist b/.php_cs.dist index 9a7393c..a4cfc42 100644 --- a/.php_cs.dist +++ b/.php_cs.dist @@ -8,14 +8,15 @@ return (new PhpCsFixer\Config()) '@PhpCsFixer:risky' => true, '@PSR12' => true, '@PSR12:risky' => true, - 'declare_strict_types' => true, - 'php_unit_internal_class' => false, - 'php_unit_test_class_requires_covers' => false, - 'yoda_style' => true, - 'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'], 'blank_line_before_statement' => [ 'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try'] ], + 'declare_strict_types' => true, + 'php_unit_internal_class' => false, + 'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'], + 'php_unit_test_class_requires_covers' => false, + 'phpdoc_to_comment' => false, + 'yoda_style' => true, ]) ->setFinder( PhpCsFixer\Finder::create() diff --git a/README.md b/README.md index f9563c5..fd7d390 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,7 @@ $producer->dpub('topic', 'Deferred message', delay: 5000); ```php use Nsq\Consumer; -use Nsq\Message; +use Nsq\Protocol\Message; use Nsq\Subscriber; $consumer = new Consumer('tcp://nsqd:4150'); diff --git a/docker-compose.yml b/docker-compose.yml index c25701c..0fa9af6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,7 @@ version: '3.7' services: nsqd: image: nsqio/nsq:v1.2.0 - command: /nsqd + command: /nsqd -log-level debug ports: - 4150:4150 - 4151:4151 diff --git a/src/Connection.php b/src/Connection.php index 455c7ac..55bf29d 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -8,7 +8,14 @@ use Nsq\Config\ClientConfig; use Nsq\Config\ConnectionConfig; use Nsq\Exception\AuthenticationRequired; use Nsq\Exception\ConnectionFail; -use Nsq\Exception\UnexpectedResponse; +use Nsq\Exception\NsqError; +use Nsq\Exception\BadResponse; +use Nsq\Exception\NsqException; +use Nsq\Exception\NullReceived; +use Nsq\Protocol\Error; +use Nsq\Protocol\Frame; +use Nsq\Protocol\Message; +use Nsq\Protocol\Response; use Nsq\Reconnect\ExponentialStrategy; use Nsq\Reconnect\ReconnectStrategy; use PHPinnacle\Buffer\ByteBuffer; @@ -77,20 +84,27 @@ abstract class Connection $body = json_encode($this->clientConfig, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT); - $response = $this->command('IDENTIFY', data: $body)->response(); - - $this->connectionConfig = ConnectionConfig::fromArray($response->toArray()); + $this->connectionConfig = ConnectionConfig::fromArray( + $this + ->command('IDENTIFY', data: $body) + ->readResponse() + ->toArray() + ); if ($this->connectionConfig->snappy || $this->connectionConfig->deflate) { - $this->response()->okOrFail(); + $this->checkIsOK(); } if ($this->connectionConfig->authRequired) { if (null === $this->clientConfig->authSecret) { - throw new AuthenticationRequired('NSQ requires authorization, set ClientConfig::$authSecret before connecting'); + throw new AuthenticationRequired(); } - $authResponse = $this->command('AUTH', data: $this->clientConfig->authSecret)->response()->toArray(); + $authResponse = $this + ->command('AUTH', data: $this->clientConfig->authSecret) + ->readResponse() + ->toArray() + ; $this->logger->info('Authorization response: '.http_build_query($authResponse)); } @@ -171,7 +185,7 @@ abstract class Connection // @codeCoverageIgnoreEnd } - public function receive(float $timeout = null): ?Response + protected function readFrame(float $timeout = null): ?Frame { $socket = $this->socket(); @@ -206,12 +220,23 @@ abstract class Connection $this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL)); - $response = new Response($buffer); + $frame = match ($type = $buffer->consumeUint32()) { + 0 => new Response($buffer->flush()), + 1 => new Error($buffer->flush()), + 2 => new Message( + timestamp: $buffer->consumeInt64(), + attempts: $buffer->consumeUint16(), + id: $buffer->consume(Bytes::BYTES_ID), + body: $buffer->flush(), + consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'), + ), + default => throw new NsqException('Unexpected frame type: '.$type) + }; - if ($response->isHeartBeat()) { + if ($frame instanceof Response && $frame->isHeartBeat()) { $this->command('NOP'); - return $this->receive( + return $this->readFrame( ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime ); } @@ -224,12 +249,35 @@ abstract class Connection } // @codeCoverageIgnoreEnd - return $response; + return $frame; } - protected function response(): Response + protected function checkIsOK(): void { - return $this->receive() ?? throw UnexpectedResponse::null(); + $response = $this->readResponse(); + + if (!$response->isOk()) { + throw new BadResponse($response); + } + } + + private function readResponse(): Response + { + $frame = $this->readFrame() ?? throw new NullReceived(); + + if ($frame instanceof Response) { + return $frame; + } + + if ($frame instanceof Error) { + if ($frame->type->terminateConnection) { + $this->disconnect(); + } + + throw new NsqError($frame); + } + + throw new NsqException('Unreachable statement.'); } private function socket(): Socket diff --git a/src/Consumer.php b/src/Consumer.php index 6a67901..4f9801c 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -4,6 +4,11 @@ declare(strict_types=1); namespace Nsq; +use Nsq\Exception\NsqError; +use Nsq\Exception\NsqException; +use Nsq\Protocol\Error; +use Nsq\Protocol\Message; + final class Consumer extends Connection { private int $rdy = 0; @@ -13,7 +18,26 @@ final class Consumer extends Connection */ public function sub(string $topic, string $channel): void { - $this->command('SUB', [$topic, $channel])->response()->okOrFail(); + $this->command('SUB', [$topic, $channel])->checkIsOK(); + } + + public function readMessage(): ?Message + { + $frame = $this->readFrame(); + + if ($frame instanceof Message || null === $frame) { + return $frame; + } + + if ($frame instanceof Error) { + if ($frame->type->terminateConnection) { + $this->disconnect(); + } + + throw new NsqError($frame); + } + + throw new NsqException('Unreachable statement.'); } /** diff --git a/src/Exception/AuthenticationRequired.php b/src/Exception/AuthenticationRequired.php index 0c493a5..9d21d42 100644 --- a/src/Exception/AuthenticationRequired.php +++ b/src/Exception/AuthenticationRequired.php @@ -4,8 +4,10 @@ declare(strict_types=1); namespace Nsq\Exception; -use RuntimeException; - -final class AuthenticationRequired extends RuntimeException implements NsqException +final class AuthenticationRequired extends NsqException { + public function __construct() + { + parent::__construct('NSQ requires authorization, set ClientConfig::$authSecret before connecting'); + } } diff --git a/src/Exception/BadResponse.php b/src/Exception/BadResponse.php new file mode 100644 index 0000000..6be3633 --- /dev/null +++ b/src/Exception/BadResponse.php @@ -0,0 +1,15 @@ +msg); + } +} diff --git a/src/Exception/ConnectionFail.php b/src/Exception/ConnectionFail.php index d6add25..a91db05 100644 --- a/src/Exception/ConnectionFail.php +++ b/src/Exception/ConnectionFail.php @@ -4,10 +4,9 @@ declare(strict_types=1); namespace Nsq\Exception; -use RuntimeException; use Throwable; -final class ConnectionFail extends RuntimeException implements NsqException +final class ConnectionFail extends NsqException { /** * @codeCoverageIgnore diff --git a/src/Exception/MessageAlreadyFinished.php b/src/Exception/MessageAlreadyFinished.php index e91b938..b50bbcc 100644 --- a/src/Exception/MessageAlreadyFinished.php +++ b/src/Exception/MessageAlreadyFinished.php @@ -4,10 +4,9 @@ declare(strict_types=1); namespace Nsq\Exception; -use Nsq\Message; -use RuntimeException; +use Nsq\Protocol\Message; -final class MessageAlreadyFinished extends RuntimeException implements NsqException +final class MessageAlreadyFinished extends NsqException { public static function finish(Message $message): self { diff --git a/src/Exception/NsqError.php b/src/Exception/NsqError.php index 9f355dd..44cc06d 100644 --- a/src/Exception/NsqError.php +++ b/src/Exception/NsqError.php @@ -4,8 +4,12 @@ declare(strict_types=1); namespace Nsq\Exception; -use RuntimeException; +use Nsq\Protocol\Error; -final class NsqError extends RuntimeException implements NsqException +final class NsqError extends NsqException { + public function __construct(Error $error) + { + parent::__construct($error->rawData); + } } diff --git a/src/Exception/NsqException.php b/src/Exception/NsqException.php index 291fb14..a81620b 100644 --- a/src/Exception/NsqException.php +++ b/src/Exception/NsqException.php @@ -4,8 +4,8 @@ declare(strict_types=1); namespace Nsq\Exception; -use Throwable; +use RuntimeException; -interface NsqException extends Throwable +class NsqException extends RuntimeException { } diff --git a/src/Exception/NullReceived.php b/src/Exception/NullReceived.php new file mode 100644 index 0000000..8ac0055 --- /dev/null +++ b/src/Exception/NullReceived.php @@ -0,0 +1,9 @@ +command('PUB', $topic, $body)->response()->okOrFail(); + $this->command('PUB', $topic, $body)->checkIsOK(); } /** @@ -31,7 +31,7 @@ final class Producer extends Connection return pack('N', \strlen($body)).$body; }, $bodies)); - $this->command('MPUB', $topic, $num.$mb)->response()->okOrFail(); + $this->command('MPUB', $topic, $num.$mb)->checkIsOK(); } /** @@ -39,6 +39,6 @@ final class Producer extends Connection */ public function dpub(string $topic, string $body, int $delay): void { - $this->command('DPUB', [$topic, $delay], $body)->response()->okOrFail(); + $this->command('DPUB', [$topic, $delay], $body)->checkIsOK(); } } diff --git a/src/Protocol/Error.php b/src/Protocol/Error.php new file mode 100644 index 0000000..a480f3f --- /dev/null +++ b/src/Protocol/Error.php @@ -0,0 +1,23 @@ +rawData) + Bytes::BYTES_TYPE); + + $this->type = new ErrorType(explode(' ', $this->rawData)[0]); + } +} diff --git a/src/Protocol/ErrorType.php b/src/Protocol/ErrorType.php new file mode 100644 index 0000000..828afc5 --- /dev/null +++ b/src/Protocol/ErrorType.php @@ -0,0 +1,100 @@ +terminateConnection = \constant('self::'.$this->type) ?? self::E_INVALID; + } +} diff --git a/src/Protocol/Frame.php b/src/Protocol/Frame.php new file mode 100644 index 0000000..b366a7c --- /dev/null +++ b/src/Protocol/Frame.php @@ -0,0 +1,16 @@ +timestamp = $timestamp; $this->attempts = $attempts; $this->id = $id; diff --git a/src/Protocol/Response.php b/src/Protocol/Response.php new file mode 100644 index 0000000..4340ab8 --- /dev/null +++ b/src/Protocol/Response.php @@ -0,0 +1,41 @@ +msg) + Bytes::BYTES_TYPE); + } + + public function isOk(): bool + { + return self::OK === $this->msg; + } + + public function isHeartBeat(): bool + { + return self::HEARTBEAT === $this->msg; + } + + /** + * @return array + */ + public function toArray(): array + { + return json_decode($this->msg, true, flags: JSON_THROW_ON_ERROR); + } +} diff --git a/src/Response.php b/src/Response.php deleted file mode 100644 index a9e2d4d..0000000 --- a/src/Response.php +++ /dev/null @@ -1,87 +0,0 @@ -type = $buffer->consumeUint32(); - $this->buffer = $buffer; - } - - public function okOrFail(): void - { - if (self::TYPE_ERROR === $this->type) { - throw new NsqError($this->buffer->bytes()); - } - - if (self::TYPE_RESPONSE !== $this->type) { - // @codeCoverageIgnoreStart - throw new UnexpectedResponse(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type)); - // @codeCoverageIgnoreEnd - } - - if (self::OK !== $this->buffer->bytes()) { - // @codeCoverageIgnoreStart - throw new UnexpectedResponse(sprintf('OK response expected, but "%s" received.', $this->buffer->bytes())); - // @codeCoverageIgnoreEnd - } - } - - public function isHeartBeat(): bool - { - return self::TYPE_RESPONSE === $this->type && self::HEARTBEAT === $this->buffer->bytes(); - } - - /** - * @phpstan-ignore-next-line - */ - public function toArray(): array - { - if (self::TYPE_RESPONSE !== $this->type) { - // @codeCoverageIgnoreStart - throw new UnexpectedResponse(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type)); - // @codeCoverageIgnoreEnd - } - - return json_decode($this->buffer->bytes(), true, flags: JSON_THROW_ON_ERROR); - } - - public function toMessage(Consumer $reader): Message - { - if (self::TYPE_MESSAGE !== $this->type) { - // @codeCoverageIgnoreStart - throw new UnexpectedResponse(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type)); - // @codeCoverageIgnoreEnd - } - - $buffer = new ByteBuffer($this->buffer->bytes()); - - $timestamp = $buffer->consumeInt64(); - $attempts = $buffer->consumeUint16(); - $id = $buffer->consume(Bytes::BYTES_ID); - $body = $buffer->flush(); - - return new Message($timestamp, $attempts, $id, $body, $reader); - } -} diff --git a/src/Subscriber.php b/src/Subscriber.php index 5707484..bd5f195 100644 --- a/src/Subscriber.php +++ b/src/Subscriber.php @@ -5,6 +5,7 @@ declare(strict_types=1); namespace Nsq; use Generator; +use Nsq\Protocol\Message; final class Subscriber { @@ -27,7 +28,7 @@ final class Subscriber while (true) { $this->reader->rdy(1); - $command = yield $this->reader->receive()?->toMessage($this->reader); + $command = yield $this->reader->readMessage(); if (self::STOP === $command) { break; diff --git a/tests/MessageTest.php b/tests/MessageTest.php index b67864f..7489018 100644 --- a/tests/MessageTest.php +++ b/tests/MessageTest.php @@ -4,7 +4,7 @@ declare(strict_types=1); use Nsq\Consumer; use Nsq\Exception\MessageAlreadyFinished; -use Nsq\Message; +use Nsq\Protocol\Message; use PHPUnit\Framework\TestCase; final class MessageTest extends TestCase diff --git a/tests/NsqTest.php b/tests/NsqTest.php index fbecdd9..b7b6e06 100644 --- a/tests/NsqTest.php +++ b/tests/NsqTest.php @@ -4,8 +4,8 @@ declare(strict_types=1); use Nsq\Config\ClientConfig; use Nsq\Consumer; -use Nsq\Message; use Nsq\Producer; +use Nsq\Protocol\Message; use Nsq\Subscriber; use Nyholm\NSA; use PHPUnit\Framework\TestCase; @@ -20,7 +20,7 @@ final class NsqTest extends TestCase $consumer = new Consumer( address: 'tcp://localhost:4150', clientConfig: new ClientConfig( - heartbeatInterval: 1000, + heartbeatInterval: 3000, readTimeout: 1, ), ); diff --git a/tests/Protocol/ErrorTypeTest.php b/tests/Protocol/ErrorTypeTest.php new file mode 100644 index 0000000..2a3ba5a --- /dev/null +++ b/tests/Protocol/ErrorTypeTest.php @@ -0,0 +1,32 @@ +terminateConnection); + } + + /** + * @return Generator> + */ + public function data(): Generator + { + foreach ((new ReflectionClass(ErrorType::class))->getConstants() as $constant => $isTerminated) { + yield $constant => [$constant, $isTerminated]; + } + } +}