Merge Envelope with Message

This commit is contained in:
2021-01-23 01:30:29 +03:00
parent 7873a9f010
commit 18ecca3c40
6 changed files with 85 additions and 96 deletions

View File

@ -64,21 +64,21 @@ $writer->dpub('topic', 5000, 'Deferred message');
### Subscription ### Subscription
```php ```php
use Nsq\Envelope; use Nsq\Message;
use Nsq\Subscriber; use Nsq\Subscriber;
$subscriber = new Subscriber(address: 'tcp://nsqd:4150'); $subscriber = new Subscriber(address: 'tcp://nsqd:4150');
$generator = $subscriber->subscribe('topic', 'channel', timeout: 5); $generator = $subscriber->subscribe('topic', 'channel', timeout: 5);
foreach ($generator as $envelope) { foreach ($generator as $message) {
if ($envelope instanceof Envelope) { if ($message instanceof Message) {
$payload = $envelope->message->body; $payload = $message->body;
// handle message // handle message
$envelope->touch(); // Reset the timeout for an in-flight message $message->touch(); // Reset the timeout for an in-flight message
$envelope->requeue(timeout: 5000); // Re-queue a message (indicate failure to process) $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
$envelope->finish(); // Finish a message (indicate successful processing) $message->finish(); // Finish a message (indicate successful processing)
} }
// In case of nothing received during timeout generator will return NULL // In case of nothing received during timeout generator will return NULL

View File

@ -1,59 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq;
use LogicException;
final class Envelope
{
/**
* @psalm-readonly
*/
public Message $message;
private bool $finished = false;
private Reader $connection;
public function __construct(Message $message, Reader $connection)
{
$this->message = $message;
$this->connection = $connection;
}
public function isFinished(): bool
{
return $this->finished;
}
public function finish(): void
{
if ($this->finished) {
throw new LogicException('Can\'t finish message as it already finished.');
}
$this->connection->fin($this->message->id);
$this->finished = true;
}
public function requeue(int $timeout): void
{
if ($this->finished) {
throw new LogicException('Can\'t requeue message as it already finished.');
}
$this->connection->req($this->message->id, $timeout);
$this->finished = true;
}
public function touch(): void
{
if ($this->finished) {
throw new LogicException('Can\'t touch message as it already finished.');
}
$this->connection->touch($this->message->id);
}
}

View File

@ -4,24 +4,75 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
/** use LogicException;
* @psalm-immutable
*/
final class Message final class Message
{ {
/**
* @psalm-readonly
*/
public int $timestamp; public int $timestamp;
/**
* @psalm-readonly
*/
public int $attempts; public int $attempts;
/**
* @psalm-readonly
*/
public string $id; public string $id;
/**
* @psalm-readonly
*/
public string $body; public string $body;
public function __construct(int $timestamp, int $attempts, string $id, string $body) public function __construct(int $timestamp, int $attempts, string $id, string $body, Reader $reader)
{ {
$this->timestamp = $timestamp; $this->timestamp = $timestamp;
$this->attempts = $attempts; $this->attempts = $attempts;
$this->id = $id; $this->id = $id;
$this->body = $body; $this->body = $body;
$this->connection = $reader;
}
private bool $finished = false;
private Reader $connection;
public function isFinished(): bool
{
return $this->finished;
}
public function finish(): void
{
if ($this->finished) {
throw new LogicException('Can\'t finish message as it already finished.');
}
$this->connection->fin($this->id);
$this->finished = true;
}
public function requeue(int $timeout): void
{
if ($this->finished) {
throw new LogicException('Can\'t requeue message as it already finished.');
}
$this->connection->req($this->id, $timeout);
$this->finished = true;
}
public function touch(): void
{
if ($this->finished) {
throw new LogicException('Can\'t touch message as it already finished.');
}
$this->connection->touch($this->id);
} }
} }

View File

@ -44,7 +44,7 @@ final class Response
return self::TYPE_RESPONSE === $this->type && self::HEARTBEAT === $this->buffer->bytes(); return self::TYPE_RESPONSE === $this->type && self::HEARTBEAT === $this->buffer->bytes();
} }
public function toMessage(): Message public function toMessage(Reader $reader): Message
{ {
if (self::TYPE_MESSAGE !== $this->type) { if (self::TYPE_MESSAGE !== $this->type) {
throw new Exception(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type)); throw new Exception(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type));
@ -57,6 +57,6 @@ final class Response
$id = $buffer->consume(Bytes::BYTES_ID); $id = $buffer->consume(Bytes::BYTES_ID);
$body = $buffer->flush(); $body = $buffer->flush();
return new Message($timestamp, $attempts, $id, $body); return new Message($timestamp, $attempts, $id, $body, $reader);
} }
} }

View File

@ -22,7 +22,7 @@ final class Subscriber
} }
/** /**
* @psalm-return Generator<int, Envelope|null, int|float|null, void> * @psalm-return Generator<int, Message|null, int|float|null, void>
*/ */
public function subscribe(string $topic, string $channel, float $timeout = 0): Generator public function subscribe(string $topic, string $channel, float $timeout = 0): Generator
{ {
@ -31,9 +31,7 @@ final class Subscriber
while (true) { while (true) {
$this->reader->rdy(1); $this->reader->rdy(1);
$message = $this->consume($timeout); $command = yield $this->consume($timeout);
$command = yield null === $message ? null : new Envelope($message, $this->reader);
if (self::STOP === $command) { if (self::STOP === $command) {
break; break;
@ -70,6 +68,6 @@ final class Subscriber
); );
} }
return $response->toMessage(); return $response->toMessage($this->reader);
} }
} }

View File

@ -2,8 +2,7 @@
declare(strict_types=1); declare(strict_types=1);
use Nsq\Envelope; use Nsq\Message;
use Nsq\Reader;
use Nsq\Subscriber; use Nsq\Subscriber;
use Nsq\Writer; use Nsq\Writer;
use Nsq\Exception; use Nsq\Exception;
@ -16,15 +15,15 @@ final class NsqTest extends TestCase
$writer = new Writer('tcp://localhost:4150'); $writer = new Writer('tcp://localhost:4150');
$writer->pub(__FUNCTION__, __FUNCTION__); $writer->pub(__FUNCTION__, __FUNCTION__);
$reader = new Reader('tcp://localhost:4150'); $reader = new \Nsq\Reader('tcp://localhost:4150');
$subscriber = new Subscriber($reader); $subscriber = new Subscriber($reader);
$generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__, 1); $generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__, 1);
/** @var null|Envelope $envelope */ /** @var null|Message $envelope */
$envelope = $generator->current(); $envelope = $generator->current();
static::assertInstanceOf(Envelope::class, $envelope); static::assertInstanceOf(Message::class, $envelope);
static::assertSame(__FUNCTION__, $envelope->message->body); static::assertSame(__FUNCTION__, $envelope->body);
$envelope->finish(); $envelope->finish();
$generator->next(); $generator->next();
@ -36,40 +35,40 @@ final class NsqTest extends TestCase
]); ]);
$generator->next(); $generator->next();
/** @var null|Envelope $envelope */ /** @var null|Message $envelope */
$envelope = $generator->current(); $envelope = $generator->current();
static::assertInstanceOf(Envelope::class, $envelope); static::assertInstanceOf(Message::class, $envelope);
static::assertSame('First mpub message.', $envelope->message->body); static::assertSame('First mpub message.', $envelope->body);
$envelope->finish(); $envelope->finish();
$generator->next(); $generator->next();
/** @var null|Envelope $envelope */ /** @var null|Message $envelope */
$envelope = $generator->current(); $envelope = $generator->current();
static::assertInstanceOf(Envelope::class, $envelope); static::assertInstanceOf(Message::class, $envelope);
static::assertSame('Second mpub message.', $envelope->message->body); static::assertSame('Second mpub message.', $envelope->body);
$envelope->requeue(0); $envelope->requeue(0);
$generator->next(); $generator->next();
/** @var null|Envelope $envelope */ /** @var null|Message $envelope */
$envelope = $generator->current(); $envelope = $generator->current();
static::assertInstanceOf(Envelope::class, $envelope); static::assertInstanceOf(Message::class, $envelope);
static::assertSame('Second mpub message.', $envelope->message->body); static::assertSame('Second mpub message.', $envelope->body);
$envelope->finish(); $envelope->finish();
$writer->dpub(__FUNCTION__, 2000, 'Deferred message.'); $writer->dpub(__FUNCTION__, 2000, 'Deferred message.');
$generator->next(); $generator->next();
/** @var null|Envelope $envelope */ /** @var null|Message $envelope */
$envelope = $generator->current(); $envelope = $generator->current();
static::assertNull($envelope); static::assertNull($envelope);
$generator->send(Subscriber::CHANGE_TIMEOUT); $generator->send(Subscriber::CHANGE_TIMEOUT);
$generator->send(10.0); $generator->send(10.0);
/** @var null|Envelope $envelope */ /** @var null|Message $envelope */
$envelope = $generator->current(); $envelope = $generator->current();
static::assertInstanceOf(Envelope::class, $envelope); static::assertInstanceOf(Message::class, $envelope);
static::assertSame('Deferred message.', $envelope->message->body); static::assertSame('Deferred message.', $envelope->body);
$envelope->finish(); $envelope->finish();
static::assertFalse($reader->isClosed()); static::assertFalse($reader->isClosed());