Compare commits

1 Commits

Author SHA1 Message Date
7dc14e916d Decouple from Reader 2021-03-10 20:57:50 +03:00

View File

@ -4,14 +4,11 @@ declare(strict_types=1);
namespace Nsq\NsqBundle\Messenger; namespace Nsq\NsqBundle\Messenger;
use Amp\Loop;
use Amp\Promise;
use Generator;
use JsonException; use JsonException;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Consumer;
use Nsq\Message; use Nsq\Message;
use Nsq\Producer; use Nsq\Producer;
use Nsq\Reader;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
@ -19,7 +16,9 @@ use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface; use Symfony\Component\Messenger\Transport\TransportInterface;
use function Amp\delay;
use function Amp\Promise\wait; use function Amp\Promise\wait;
use function array_pop;
use function json_decode; use function json_decode;
use function json_encode; use function json_encode;
use const JSON_THROW_ON_ERROR; use const JSON_THROW_ON_ERROR;
@ -28,12 +27,12 @@ final class NsqTransport implements TransportInterface
{ {
private ?Producer $producer = null; private ?Producer $producer = null;
private ?Reader $reader = null; private ?Consumer $consumer = null;
/** /**
* @var Promise<Message>|null * @var Message[]
*/ */
private ?Promise $deferred = null; private array $messages = [];
public function __construct( public function __construct(
private string $address, private string $address,
@ -75,26 +74,20 @@ final class NsqTransport implements TransportInterface
*/ */
public function get(): iterable public function get(): iterable
{ {
$reader = $this->getReader(); $message = array_pop($this->messages);
$promise = $this->deferred ??= $reader->consume(); if (null === $message) {
$this->getConsumer();
/** @var Message|null $message */ wait(delay(500));
$message = null;
Loop::run(function () use (&$message, $promise): Generator {
Loop::delay(500, static function () {
Loop::stop();
});
$message = yield $promise; $message = array_pop($this->messages);
}); }
if (null === $message) { if (null === $message) {
return []; return [];
} }
$this->deferred = null;
try { try {
$encodedEnvelope = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR); $encodedEnvelope = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR);
} catch (JsonException $e) { } catch (JsonException $e) {
@ -154,20 +147,23 @@ final class NsqTransport implements TransportInterface
return $this->producer; return $this->producer;
} }
private function getReader(): Reader private function getConsumer(): Consumer
{ {
if (null === $this->reader) { if (null === $this->consumer) {
$this->reader = new Reader( $this->consumer = new Consumer(
$this->address, $this->address,
$this->topic, $this->topic,
$this->channel, $this->channel,
function (Message $message) {
$this->messages[] = $message;
},
$this->clientConfig, $this->clientConfig,
$this->logger, $this->logger,
); );
} }
wait($this->reader->connect()); wait($this->consumer->connect());
return $this->reader; return $this->consumer;
} }
} }