diff --git a/src/Connection.php b/src/Connection.php index 548fe13..01f4e95 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -4,13 +4,11 @@ declare(strict_types=1); namespace Nsq; -use LogicException; use PHPinnacle\Buffer\ByteBuffer; use Socket\Raw\Factory; use Socket\Raw\Socket; use Throwable; use function json_encode; -use function microtime; use function pack; use function sprintf; use const JSON_FORCE_OBJECT; @@ -22,22 +20,22 @@ use const PHP_EOL; */ abstract class Connection { - private const OK = 'OK'; - private const HEARTBEAT = '_heartbeat_'; - private const CLOSE_WAIT = 'CLOSE_WAIT'; - private const TYPE_RESPONSE = 0; - private const TYPE_ERROR = 1; - private const TYPE_MESSAGE = 2; - private const BYTES_SIZE = 4; - private const BYTES_TYPE = 4; - private const BYTES_ATTEMPTS = 2; - private const BYTES_TIMESTAMP = 8; - private const BYTES_ID = 16; + protected const OK = 'OK'; + protected const HEARTBEAT = '_heartbeat_'; + protected const CLOSE_WAIT = 'CLOSE_WAIT'; + protected const TYPE_RESPONSE = 0; + protected const TYPE_ERROR = 1; + protected const TYPE_MESSAGE = 2; + protected const BYTES_SIZE = 4; + protected const BYTES_TYPE = 4; + protected const BYTES_ATTEMPTS = 2; + protected const BYTES_TIMESTAMP = 8; + protected const BYTES_ID = 16; private const MAGIC_V2 = ' V2'; public ?Socket $socket = null; - public bool $closed = false; + private bool $closed = false; private Config $config; @@ -65,8 +63,7 @@ abstract class Connection } try { - $this->write('CLS'.PHP_EOL); - $this->consume(); // receive CLOSE_WAIT + $this->send('CLS'.PHP_EOL)->expectResponse(self::CLOSE_WAIT); if (null !== $this->socket) { $this->socket->close(); @@ -78,6 +75,11 @@ abstract class Connection $this->closed = true; } + public function isClosed(): bool + { + return $this->closed; + } + /** * @psalm-param array $arr * @@ -101,10 +103,7 @@ abstract class Connection return 'AUTH'.PHP_EOL.$size.$secret; } - /** - * @internal - */ - public function write(string $buffer): void + protected function send(string $buffer): self { $socket = $this->socket(); @@ -115,67 +114,50 @@ abstract class Connection throw $e; } + + return $this; } - protected function consume(?float $timeout = 0): ?Message + protected function receive(float $timeout = 0): ?ByteBuffer { - $deadline = microtime(true) + ($timeout ?? 0); - $socket = $this->socket(); if (false === $socket->selectRead($timeout)) { return null; } - $buffer = new ByteBuffer($socket->read(self::BYTES_SIZE + self::BYTES_TYPE)); - $size = $buffer->consumeUint32(); - $type = $buffer->consumeUint32(); + $size = (new ByteBuffer($socket->read(self::BYTES_SIZE)))->consumeUint32(); - $buffer->append($socket->read($size - self::BYTES_TYPE)); + return new ByteBuffer($socket->read($size)); + } - if (self::TYPE_RESPONSE === $type) { - $response = $buffer->consume($size - self::BYTES_TYPE); - - $isInternalMessage = false; - if (self::OK === $response || self::CLOSE_WAIT === $response) { - $isInternalMessage = true; - } - - if (self::HEARTBEAT === $response) { - $socket->write('NOP'.PHP_EOL); - - $isInternalMessage = true; - } - - if ($isInternalMessage) { - return $this->consume( - ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime - ); - } - - throw new LogicException(sprintf('Unexpected response from nsq: "%s"', $response)); + protected function expectResponse(string $expected): void + { + $buffer = $this->receive(0.1); + if (null === $buffer) { + throw new Exception('Success response was expected, but null received.'); } + $type = $buffer->consumeUint32(); + $response = $buffer->flush(); + if (self::TYPE_ERROR === $type) { - throw new LogicException(sprintf('NSQ return error: "%s"', $socket->read($size))); + throw new Exception($response); } - if (self::TYPE_MESSAGE !== $type) { - throw new LogicException(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $type)); + if (self::TYPE_RESPONSE !== $type) { + throw new Exception(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $type)); } - $timestamp = $buffer->consumeInt64(); - $attempts = $buffer->consumeUint16(); - $id = $buffer->consume(self::BYTES_ID); - $body = $buffer->consume($size - self::BYTES_TYPE - self::BYTES_TIMESTAMP - self::BYTES_ATTEMPTS - self::BYTES_ID); - - return new Message($timestamp, $attempts, $id, $body); + if ($expected !== $response) { + throw new Exception(sprintf('"%s" response expected, but "%s" received.', $expected, $response)); + } } private function socket(): Socket { if ($this->closed) { - throw new LogicException('This connection is closed, create new one.'); + throw new Exception('This connection is closed, create new one.'); } if (null === $this->socket) { diff --git a/src/Exception.php b/src/Exception.php new file mode 100644 index 0000000..a9d429f --- /dev/null +++ b/src/Exception.php @@ -0,0 +1,9 @@ +write($buffer); - $this->consume(); + $this->send($buffer)->expectResponse(self::OK); } /** @@ -22,7 +21,7 @@ class Reader extends Connection */ public function rdy(int $count): void { - $this->write('RDY '.$count.PHP_EOL); + $this->send('RDY '.$count.PHP_EOL); } /** @@ -30,7 +29,7 @@ class Reader extends Connection */ public function fin(string $id): void { - $this->write('FIN '.$id.PHP_EOL); + $this->send('FIN '.$id.PHP_EOL); } /** @@ -41,7 +40,7 @@ class Reader extends Connection */ public function req(string $id, int $timeout): void { - $this->write(sprintf('REQ %s %s', $id, $timeout).PHP_EOL); + $this->send(sprintf('REQ %s %s', $id, $timeout).PHP_EOL); } /** @@ -49,6 +48,6 @@ class Reader extends Connection */ public function touch(string $id): void { - $this->write('TOUCH '.$id.PHP_EOL); + $this->send('TOUCH '.$id.PHP_EOL); } } diff --git a/src/Subscriber.php b/src/Subscriber.php index e280d84..2791e77 100644 --- a/src/Subscriber.php +++ b/src/Subscriber.php @@ -5,37 +5,85 @@ declare(strict_types=1); namespace Nsq; use Generator; +use function get_debug_type; +use function microtime; +use function sprintf; final class Subscriber extends Reader { - public const STOP = true; + public const STOP = 0; + public const CHANGE_TIMEOUT = 1; /** - * @psalm-return Generator + * @psalm-return Generator */ - public function subscribe(string $topic, string $channel, ?float $timeout = 0): Generator + public function subscribe(string $topic, string $channel, float $timeout = 0): Generator { $this->sub($topic, $channel); - $this->rdy(1); while (true) { + $this->rdy(1); + $message = $this->consume($timeout); - if (null === $message) { - if (self::STOP === yield null) { - break; - } + $command = yield null === $message ? null : new Envelope($message, $this); - continue; - } - - if (self::STOP === yield new Envelope($message, $this)) { + if (self::STOP === $command) { break; } - $this->rdy(1); + if (self::CHANGE_TIMEOUT === $command) { + $newTimeout = yield null; + + if (!\is_float($newTimeout)) { + throw new Exception(sprintf('Timeout must be float, "%s" given.', get_debug_type($newTimeout))); + } + + $timeout = $newTimeout; + } } $this->disconnect(); } + + private function consume(float $timeout): ?Message + { + $deadline = microtime(true) + $timeout; + + $buffer = $this->receive($timeout); + if (null === $buffer) { + return null; + } + + $type = $buffer->consumeUint32(); + + if (self::TYPE_RESPONSE === $type) { + $response = $buffer->flush(); + + if (self::HEARTBEAT === $response) { + $this->send('NOP'.PHP_EOL); + + return $this->consume( + ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime + ); + } + + throw new Exception(sprintf('Unexpected response: %s', $response)); + } + + if (self::TYPE_ERROR === $type) { + throw new Exception(sprintf('NSQ return error: "%s"', $buffer->flush())); + } + + if (self::TYPE_MESSAGE !== $type) { + throw new Exception(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $type)); + } + + $timestamp = $buffer->consumeInt64(); + $attempts = $buffer->consumeUint16(); + $id = $buffer->consume(self::BYTES_ID); + $body = $buffer->flush(); + + return new Message($timestamp, $attempts, $id, $body); + } } diff --git a/src/Writer.php b/src/Writer.php index 33be17e..d3535e6 100644 --- a/src/Writer.php +++ b/src/Writer.php @@ -21,8 +21,7 @@ final class Writer extends Connection $buffer = 'PUB '.$topic.PHP_EOL.$size.$body; - $this->write($buffer); - $this->consume(); + $this->send($buffer)->expectResponse(self::OK); } /** @@ -42,8 +41,7 @@ final class Writer extends Connection $buffer = 'MPUB '.$topic.PHP_EOL.$size.$num.$mb; - $this->write($buffer); - $this->consume(); + $this->send($buffer)->expectResponse(self::OK); } /** @@ -55,7 +53,6 @@ final class Writer extends Connection $buffer = sprintf('DPUB %s %s', $topic, $deferTime).PHP_EOL.$size.$body; - $this->write($buffer); - $this->consume(); + $this->send($buffer)->expectResponse(self::OK); } } diff --git a/tests/NsqTest.php b/tests/NsqTest.php index 6d3d96c..e4aee5c 100644 --- a/tests/NsqTest.php +++ b/tests/NsqTest.php @@ -5,6 +5,7 @@ declare(strict_types=1); use Nsq\Envelope; use Nsq\Subscriber; use Nsq\Writer; +use Nsq\Exception; use PHPUnit\Framework\TestCase; final class NsqTest extends TestCase @@ -17,14 +18,81 @@ final class NsqTest extends TestCase $subscriber = new Subscriber('tcp://localhost:4150'); $generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__, 1); + /** @var null|Envelope $envelope */ $envelope = $generator->current(); static::assertInstanceOf(Envelope::class, $envelope); - /** @var Envelope $envelope */ static::assertSame(__FUNCTION__, $envelope->message->body); $envelope->finish(); $generator->next(); static::assertNull($generator->current()); + + $writer->mpub(__FUNCTION__, [ + 'First mpub message.', + 'Second mpub message.', + ]); + + $generator->next(); + /** @var null|Envelope $envelope */ + $envelope = $generator->current(); + static::assertInstanceOf(Envelope::class, $envelope); + static::assertSame('First mpub message.', $envelope->message->body); + $envelope->finish(); + + $generator->next(); + /** @var null|Envelope $envelope */ + $envelope = $generator->current(); + static::assertInstanceOf(Envelope::class, $envelope); + static::assertSame('Second mpub message.', $envelope->message->body); + $envelope->requeue(0); + + $generator->next(); + /** @var null|Envelope $envelope */ + $envelope = $generator->current(); + static::assertInstanceOf(Envelope::class, $envelope); + static::assertSame('Second mpub message.', $envelope->message->body); + $envelope->finish(); + + $writer->dpub(__FUNCTION__, 2000, 'Deferred message.'); + + $generator->next(); + /** @var null|Envelope $envelope */ + $envelope = $generator->current(); + static::assertNull($envelope); + + $generator->send(Subscriber::CHANGE_TIMEOUT); + $generator->send(10.0); + + /** @var null|Envelope $envelope */ + $envelope = $generator->current(); + static::assertInstanceOf(Envelope::class, $envelope); + static::assertSame('Deferred message.', $envelope->message->body); + $envelope->finish(); + + static::assertFalse($subscriber->isClosed()); + $generator->send(Subscriber::STOP); + static::assertTrue($subscriber->isClosed()); + } + + /** + * @dataProvider pubFails + */ + public function testPubFail(string $topic, string $body, string $exceptionMessage): void + { + $this->expectException(Exception::class); + $this->expectExceptionMessage($exceptionMessage); + + $writer = new Writer('tcp://localhost:4150'); + $writer->pub($topic, $body); + } + + /** + * @return Generator + */ + public function pubFails(): Generator + { + yield 'Empty body' => ['test', '', 'E_BAD_MESSAGE PUB invalid message body size 0']; + yield 'Invalid topic' => ['test$%^&', '', 'E_BAD_TOPIC PUB topic name "test$%^&" is not valid']; } }