1 Commits

Author SHA1 Message Date
7dc14e916d Decouple from Reader 2021-03-10 20:57:50 +03:00

View File

@ -4,14 +4,11 @@ declare(strict_types=1);
namespace Nsq\NsqBundle\Messenger;
use Amp\Loop;
use Amp\Promise;
use Generator;
use JsonException;
use Nsq\Config\ClientConfig;
use Nsq\Consumer;
use Nsq\Message;
use Nsq\Producer;
use Nsq\Reader;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
@ -19,7 +16,9 @@ use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use function Amp\delay;
use function Amp\Promise\wait;
use function array_pop;
use function json_decode;
use function json_encode;
use const JSON_THROW_ON_ERROR;
@ -28,12 +27,12 @@ final class NsqTransport implements TransportInterface
{
private ?Producer $producer = null;
private ?Reader $reader = null;
private ?Consumer $consumer = null;
/**
* @var Promise<Message>|null
* @var Message[]
*/
private ?Promise $deferred = null;
private array $messages = [];
public function __construct(
private string $address,
@ -75,26 +74,20 @@ final class NsqTransport implements TransportInterface
*/
public function get(): iterable
{
$reader = $this->getReader();
$message = array_pop($this->messages);
$promise = $this->deferred ??= $reader->consume();
if (null === $message) {
$this->getConsumer();
/** @var Message|null $message */
$message = null;
Loop::run(function () use (&$message, $promise): Generator {
Loop::delay(500, static function () {
Loop::stop();
});
wait(delay(500));
$message = yield $promise;
});
$message = array_pop($this->messages);
}
if (null === $message) {
return [];
}
$this->deferred = null;
try {
$encodedEnvelope = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR);
} catch (JsonException $e) {
@ -154,20 +147,23 @@ final class NsqTransport implements TransportInterface
return $this->producer;
}
private function getReader(): Reader
private function getConsumer(): Consumer
{
if (null === $this->reader) {
$this->reader = new Reader(
if (null === $this->consumer) {
$this->consumer = new Consumer(
$this->address,
$this->topic,
$this->channel,
function (Message $message) {
$this->messages[] = $message;
},
$this->clientConfig,
$this->logger,
);
}
wait($this->reader->connect());
wait($this->consumer->connect());
return $this->reader;
return $this->consumer;
}
}