From ae00cdb763e05ea18575eb389a59eea04405f09d Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Wed, 10 Mar 2021 20:59:03 +0300 Subject: [PATCH] Decouple from Reader (#3) --- src/Messenger/NsqTransport.php | 44 ++++++++++++++++------------------ 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/src/Messenger/NsqTransport.php b/src/Messenger/NsqTransport.php index 0f73412..3ee8eaa 100644 --- a/src/Messenger/NsqTransport.php +++ b/src/Messenger/NsqTransport.php @@ -4,14 +4,11 @@ declare(strict_types=1); namespace Nsq\NsqBundle\Messenger; -use Amp\Loop; -use Amp\Promise; -use Generator; use JsonException; use Nsq\Config\ClientConfig; +use Nsq\Consumer; use Nsq\Message; use Nsq\Producer; -use Nsq\Reader; use Psr\Log\LoggerInterface; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; @@ -19,7 +16,9 @@ use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\TransportInterface; +use function Amp\delay; use function Amp\Promise\wait; +use function array_pop; use function json_decode; use function json_encode; use const JSON_THROW_ON_ERROR; @@ -28,12 +27,12 @@ final class NsqTransport implements TransportInterface { private ?Producer $producer = null; - private ?Reader $reader = null; + private ?Consumer $consumer = null; /** - * @var Promise|null + * @var Message[] */ - private ?Promise $deferred = null; + private array $messages = []; public function __construct( private string $address, @@ -75,26 +74,20 @@ final class NsqTransport implements TransportInterface */ public function get(): iterable { - $reader = $this->getReader(); + $message = array_pop($this->messages); - $promise = $this->deferred ??= $reader->consume(); + if (null === $message) { + $this->getConsumer(); - /** @var Message|null $message */ - $message = null; - Loop::run(function () use (&$message, $promise): Generator { - Loop::delay(500, static function () { - Loop::stop(); - }); + wait(delay(500)); - $message = yield $promise; - }); + $message = array_pop($this->messages); + } if (null === $message) { return []; } - $this->deferred = null; - try { $encodedEnvelope = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR); } catch (JsonException $e) { @@ -154,20 +147,23 @@ final class NsqTransport implements TransportInterface return $this->producer; } - private function getReader(): Reader + private function getConsumer(): Consumer { - if (null === $this->reader) { - $this->reader = new Reader( + if (null === $this->consumer) { + $this->consumer = new Consumer( $this->address, $this->topic, $this->channel, + function (Message $message) { + $this->messages[] = $message; + }, $this->clientConfig, $this->logger, ); } - wait($this->reader->connect()); + wait($this->consumer->connect()); - return $this->reader; + return $this->consumer; } }