diff --git a/src/Bytes.php b/src/Bytes.php new file mode 100644 index 0000000..c8e959e --- /dev/null +++ b/src/Bytes.php @@ -0,0 +1,14 @@ +logger->info('Feature Negotiation: '.http_build_query($this->features)); - $this->send('IDENTIFY '.PHP_EOL.$size.$body)->expectResponse(self::OK); + $this->send('IDENTIFY '.PHP_EOL.$size.$body)->getResponse()->okOrFail(); } /** @@ -89,7 +76,7 @@ abstract class Connection } try { - $this->send('CLS'.PHP_EOL)->expectResponse(self::CLOSE_WAIT); + $this->send('CLS'.PHP_EOL); if (null !== $this->socket) { $this->socket->close(); @@ -135,7 +122,7 @@ abstract class Connection return $this; } - protected function receive(float $timeout = 0): ?ByteBuffer + protected function receive(float $timeout = 0): ?Response { $socket = $this->socket(); @@ -143,32 +130,20 @@ abstract class Connection return null; } - $size = (new ByteBuffer($socket->read(self::BYTES_SIZE)))->consumeUint32(); + $size = (new ByteBuffer($socket->read(Bytes::BYTES_SIZE)))->consumeUint32(); - return new ByteBuffer($socket->read($size)); + return new Response(new ByteBuffer($socket->read($size))); } - protected function expectResponse(string $expected): void + protected function getResponse(): Response { - $buffer = $this->receive(0.1); - if (null === $buffer) { - throw new Exception('Success response was expected, but null received.'); + $response = $this->receive(0.1); + + if (null === $response) { + throw new Exception('Response was expected, but null received.'); } - $type = $buffer->consumeUint32(); - $response = $buffer->flush(); - - if (self::TYPE_ERROR === $type) { - throw new Exception($response); - } - - if (self::TYPE_RESPONSE !== $type) { - throw new Exception(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $type)); - } - - if ($expected !== $response) { - throw new Exception(sprintf('"%s" response expected, but "%s" received.', $expected, $response)); - } + return $response; } private function socket(): Socket diff --git a/src/Reader.php b/src/Reader.php index e123d64..acdc610 100644 --- a/src/Reader.php +++ b/src/Reader.php @@ -13,7 +13,7 @@ class Reader extends Connection { $buffer = sprintf('SUB %s %s', $topic, $channel).PHP_EOL; - $this->send($buffer)->expectResponse(self::OK); + $this->send($buffer)->getResponse()->okOrFail(); } /** diff --git a/src/Response.php b/src/Response.php new file mode 100644 index 0000000..2e1a3c6 --- /dev/null +++ b/src/Response.php @@ -0,0 +1,62 @@ +type = $buffer->consumeUint32(); + $this->buffer = $buffer; + } + + public function okOrFail(): void + { + if (self::TYPE_ERROR === $this->type) { + throw new Exception($this->buffer->bytes()); + } + + if (self::TYPE_RESPONSE !== $this->type) { + throw new Exception(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type)); + } + + if (self::OK !== $this->buffer->bytes()) { + throw new Exception(sprintf('OK response expected, but "%s" received.', $this->buffer->bytes())); + } + } + + public function isHeartBeat(): bool + { + return self::TYPE_RESPONSE === $this->type && self::HEARTBEAT === $this->buffer->bytes(); + } + + public function toMessage(): Message + { + if (self::TYPE_MESSAGE !== $this->type) { + throw new Exception(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type)); + } + + $buffer = new ByteBuffer($this->buffer->bytes()); + + $timestamp = $buffer->consumeInt64(); + $attempts = $buffer->consumeUint16(); + $id = $buffer->consume(Bytes::BYTES_ID); + $body = $buffer->flush(); + + return new Message($timestamp, $attempts, $id, $body); + } +} diff --git a/src/Subscriber.php b/src/Subscriber.php index 2791e77..20d1534 100644 --- a/src/Subscriber.php +++ b/src/Subscriber.php @@ -50,40 +50,19 @@ final class Subscriber extends Reader { $deadline = microtime(true) + $timeout; - $buffer = $this->receive($timeout); - if (null === $buffer) { + $response = $this->receive($timeout); + if (null === $response) { return null; } - $type = $buffer->consumeUint32(); + if ($response->isHeartBeat()) { + $this->send('NOP'.PHP_EOL); - if (self::TYPE_RESPONSE === $type) { - $response = $buffer->flush(); - - if (self::HEARTBEAT === $response) { - $this->send('NOP'.PHP_EOL); - - return $this->consume( - ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime - ); - } - - throw new Exception(sprintf('Unexpected response: %s', $response)); + return $this->consume( + ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime + ); } - if (self::TYPE_ERROR === $type) { - throw new Exception(sprintf('NSQ return error: "%s"', $buffer->flush())); - } - - if (self::TYPE_MESSAGE !== $type) { - throw new Exception(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $type)); - } - - $timestamp = $buffer->consumeInt64(); - $attempts = $buffer->consumeUint16(); - $id = $buffer->consume(self::BYTES_ID); - $body = $buffer->flush(); - - return new Message($timestamp, $attempts, $id, $body); + return $response->toMessage(); } } diff --git a/src/Writer.php b/src/Writer.php index d3535e6..d0da99b 100644 --- a/src/Writer.php +++ b/src/Writer.php @@ -21,7 +21,7 @@ final class Writer extends Connection $buffer = 'PUB '.$topic.PHP_EOL.$size.$body; - $this->send($buffer)->expectResponse(self::OK); + $this->send($buffer)->getResponse()->okOrFail(); } /** @@ -41,7 +41,7 @@ final class Writer extends Connection $buffer = 'MPUB '.$topic.PHP_EOL.$size.$num.$mb; - $this->send($buffer)->expectResponse(self::OK); + $this->send($buffer)->getResponse()->okOrFail(); } /** @@ -53,6 +53,6 @@ final class Writer extends Connection $buffer = sprintf('DPUB %s %s', $topic, $deferTime).PHP_EOL.$size.$body; - $this->send($buffer)->expectResponse(self::OK); + $this->send($buffer)->getResponse()->okOrFail(); } }