From 18ecca3c408c96b793425e31e0eb751ffec34ab4 Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Sat, 23 Jan 2021 01:30:29 +0300 Subject: [PATCH] Merge Envelope with Message --- README.md | 14 +++++------ src/Envelope.php | 59 ---------------------------------------------- src/Message.php | 59 ++++++++++++++++++++++++++++++++++++++++++---- src/Response.php | 4 ++-- src/Subscriber.php | 8 +++---- tests/NsqTest.php | 37 ++++++++++++++--------------- 6 files changed, 85 insertions(+), 96 deletions(-) delete mode 100644 src/Envelope.php diff --git a/README.md b/README.md index d6bc3db..24fbc29 100644 --- a/README.md +++ b/README.md @@ -64,21 +64,21 @@ $writer->dpub('topic', 5000, 'Deferred message'); ### Subscription ```php -use Nsq\Envelope; +use Nsq\Message; use Nsq\Subscriber; $subscriber = new Subscriber(address: 'tcp://nsqd:4150'); $generator = $subscriber->subscribe('topic', 'channel', timeout: 5); -foreach ($generator as $envelope) { - if ($envelope instanceof Envelope) { - $payload = $envelope->message->body; +foreach ($generator as $message) { + if ($message instanceof Message) { + $payload = $message->body; // handle message - $envelope->touch(); // Reset the timeout for an in-flight message - $envelope->requeue(timeout: 5000); // Re-queue a message (indicate failure to process) - $envelope->finish(); // Finish a message (indicate successful processing) + $message->touch(); // Reset the timeout for an in-flight message + $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process) + $message->finish(); // Finish a message (indicate successful processing) } // In case of nothing received during timeout generator will return NULL diff --git a/src/Envelope.php b/src/Envelope.php deleted file mode 100644 index 5ea3066..0000000 --- a/src/Envelope.php +++ /dev/null @@ -1,59 +0,0 @@ -message = $message; - $this->connection = $connection; - } - - public function isFinished(): bool - { - return $this->finished; - } - - public function finish(): void - { - if ($this->finished) { - throw new LogicException('Can\'t finish message as it already finished.'); - } - - $this->connection->fin($this->message->id); - $this->finished = true; - } - - public function requeue(int $timeout): void - { - if ($this->finished) { - throw new LogicException('Can\'t requeue message as it already finished.'); - } - - $this->connection->req($this->message->id, $timeout); - $this->finished = true; - } - - public function touch(): void - { - if ($this->finished) { - throw new LogicException('Can\'t touch message as it already finished.'); - } - - $this->connection->touch($this->message->id); - } -} diff --git a/src/Message.php b/src/Message.php index 7b6fe1a..2043da3 100644 --- a/src/Message.php +++ b/src/Message.php @@ -4,24 +4,75 @@ declare(strict_types=1); namespace Nsq; -/** - * @psalm-immutable - */ +use LogicException; + final class Message { + /** + * @psalm-readonly + */ public int $timestamp; + /** + * @psalm-readonly + */ public int $attempts; + /** + * @psalm-readonly + */ public string $id; + /** + * @psalm-readonly + */ public string $body; - public function __construct(int $timestamp, int $attempts, string $id, string $body) + public function __construct(int $timestamp, int $attempts, string $id, string $body, Reader $reader) { $this->timestamp = $timestamp; $this->attempts = $attempts; $this->id = $id; $this->body = $body; + + $this->connection = $reader; + } + + private bool $finished = false; + + private Reader $connection; + + public function isFinished(): bool + { + return $this->finished; + } + + public function finish(): void + { + if ($this->finished) { + throw new LogicException('Can\'t finish message as it already finished.'); + } + + $this->connection->fin($this->id); + $this->finished = true; + } + + public function requeue(int $timeout): void + { + if ($this->finished) { + throw new LogicException('Can\'t requeue message as it already finished.'); + } + + $this->connection->req($this->id, $timeout); + $this->finished = true; + } + + public function touch(): void + { + if ($this->finished) { + throw new LogicException('Can\'t touch message as it already finished.'); + } + + $this->connection->touch($this->id); } } diff --git a/src/Response.php b/src/Response.php index 2e1a3c6..650340d 100644 --- a/src/Response.php +++ b/src/Response.php @@ -44,7 +44,7 @@ final class Response return self::TYPE_RESPONSE === $this->type && self::HEARTBEAT === $this->buffer->bytes(); } - public function toMessage(): Message + public function toMessage(Reader $reader): Message { if (self::TYPE_MESSAGE !== $this->type) { throw new Exception(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type)); @@ -57,6 +57,6 @@ final class Response $id = $buffer->consume(Bytes::BYTES_ID); $body = $buffer->flush(); - return new Message($timestamp, $attempts, $id, $body); + return new Message($timestamp, $attempts, $id, $body, $reader); } } diff --git a/src/Subscriber.php b/src/Subscriber.php index f248fee..67a5554 100644 --- a/src/Subscriber.php +++ b/src/Subscriber.php @@ -22,7 +22,7 @@ final class Subscriber } /** - * @psalm-return Generator + * @psalm-return Generator */ public function subscribe(string $topic, string $channel, float $timeout = 0): Generator { @@ -31,9 +31,7 @@ final class Subscriber while (true) { $this->reader->rdy(1); - $message = $this->consume($timeout); - - $command = yield null === $message ? null : new Envelope($message, $this->reader); + $command = yield $this->consume($timeout); if (self::STOP === $command) { break; @@ -70,6 +68,6 @@ final class Subscriber ); } - return $response->toMessage(); + return $response->toMessage($this->reader); } } diff --git a/tests/NsqTest.php b/tests/NsqTest.php index 221f44c..7cf8c43 100644 --- a/tests/NsqTest.php +++ b/tests/NsqTest.php @@ -2,8 +2,7 @@ declare(strict_types=1); -use Nsq\Envelope; -use Nsq\Reader; +use Nsq\Message; use Nsq\Subscriber; use Nsq\Writer; use Nsq\Exception; @@ -16,15 +15,15 @@ final class NsqTest extends TestCase $writer = new Writer('tcp://localhost:4150'); $writer->pub(__FUNCTION__, __FUNCTION__); - $reader = new Reader('tcp://localhost:4150'); + $reader = new \Nsq\Reader('tcp://localhost:4150'); $subscriber = new Subscriber($reader); $generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__, 1); - /** @var null|Envelope $envelope */ + /** @var null|Message $envelope */ $envelope = $generator->current(); - static::assertInstanceOf(Envelope::class, $envelope); - static::assertSame(__FUNCTION__, $envelope->message->body); + static::assertInstanceOf(Message::class, $envelope); + static::assertSame(__FUNCTION__, $envelope->body); $envelope->finish(); $generator->next(); @@ -36,40 +35,40 @@ final class NsqTest extends TestCase ]); $generator->next(); - /** @var null|Envelope $envelope */ + /** @var null|Message $envelope */ $envelope = $generator->current(); - static::assertInstanceOf(Envelope::class, $envelope); - static::assertSame('First mpub message.', $envelope->message->body); + static::assertInstanceOf(Message::class, $envelope); + static::assertSame('First mpub message.', $envelope->body); $envelope->finish(); $generator->next(); - /** @var null|Envelope $envelope */ + /** @var null|Message $envelope */ $envelope = $generator->current(); - static::assertInstanceOf(Envelope::class, $envelope); - static::assertSame('Second mpub message.', $envelope->message->body); + static::assertInstanceOf(Message::class, $envelope); + static::assertSame('Second mpub message.', $envelope->body); $envelope->requeue(0); $generator->next(); - /** @var null|Envelope $envelope */ + /** @var null|Message $envelope */ $envelope = $generator->current(); - static::assertInstanceOf(Envelope::class, $envelope); - static::assertSame('Second mpub message.', $envelope->message->body); + static::assertInstanceOf(Message::class, $envelope); + static::assertSame('Second mpub message.', $envelope->body); $envelope->finish(); $writer->dpub(__FUNCTION__, 2000, 'Deferred message.'); $generator->next(); - /** @var null|Envelope $envelope */ + /** @var null|Message $envelope */ $envelope = $generator->current(); static::assertNull($envelope); $generator->send(Subscriber::CHANGE_TIMEOUT); $generator->send(10.0); - /** @var null|Envelope $envelope */ + /** @var null|Message $envelope */ $envelope = $generator->current(); - static::assertInstanceOf(Envelope::class, $envelope); - static::assertSame('Deferred message.', $envelope->message->body); + static::assertInstanceOf(Message::class, $envelope); + static::assertSame('Deferred message.', $envelope->body); $envelope->finish(); static::assertFalse($reader->isClosed());