Update for nsq/nsq:0.2

This commit is contained in:
2021-01-20 17:23:18 +03:00
parent f33a9dc226
commit b3cb1c0c32
3 changed files with 19 additions and 27 deletions

View File

@@ -13,7 +13,7 @@
"require": { "require": {
"php": ">=7.4", "php": ">=7.4",
"ext-json": "*", "ext-json": "*",
"nsq/nsq": "0.1", "nsq/nsq": "0.2",
"symfony/framework-bundle": "^5.0", "symfony/framework-bundle": "^5.0",
"symfony/messenger": "^5.0" "symfony/messenger": "^5.0"
}, },

View File

@@ -7,9 +7,7 @@ namespace NsqPHP\NsqBundle\Messenger;
use Generator; use Generator;
use JsonException; use JsonException;
use LogicException; use LogicException;
use Nsq\Connection;
use Nsq\Envelope as NsqEnvelope; use Nsq\Envelope as NsqEnvelope;
use Nsq\Reader;
use Nsq\Subscriber; use Nsq\Subscriber;
use Nsq\Writer; use Nsq\Writer;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
@@ -24,7 +22,9 @@ use const JSON_THROW_ON_ERROR;
final class NsqTransport implements TransportInterface final class NsqTransport implements TransportInterface
{ {
private Connection $connection; private Writer $writer;
private Subscriber $subscriber;
private SerializerInterface $serializer; private SerializerInterface $serializer;
@@ -35,12 +35,14 @@ final class NsqTransport implements TransportInterface
private string $channel; private string $channel;
public function __construct( public function __construct(
Connection $connection, Writer $writer,
Subscriber $subscriber,
string $topic, string $topic,
string $channel, string $channel,
SerializerInterface $serializer = null SerializerInterface $serializer = null
) { ) {
$this->connection = $connection; $this->writer = $writer;
$this->subscriber = $subscriber;
$this->topic = $topic; $this->topic = $topic;
$this->channel = $channel; $this->channel = $channel;
$this->serializer = $serializer ?? new PhpSerializer(); $this->serializer = $serializer ?? new PhpSerializer();
@@ -55,10 +57,10 @@ final class NsqTransport implements TransportInterface
$encodedMessage = $this->serializer->encode($envelope->withoutAll(NsqReceivedStamp::class)); $encodedMessage = $this->serializer->encode($envelope->withoutAll(NsqReceivedStamp::class));
$this->getPublisher()->pub($this->topic, json_encode($encodedMessage, JSON_THROW_ON_ERROR)); $this->writer->pub($this->topic, json_encode($encodedMessage, JSON_THROW_ON_ERROR));
if (null !== $nsqEnvelope) { if (null !== $nsqEnvelope) {
$nsqEnvelope->ack(); $nsqEnvelope->finish();
} }
return $envelope; return $envelope;
@@ -71,7 +73,7 @@ final class NsqTransport implements TransportInterface
{ {
$generator = $this->generator; $generator = $this->generator;
if (null === $generator) { if (null === $generator) {
$this->generator = $generator = $this->getSubscriber()->subscribe($this->topic, $this->channel); $this->generator = $generator = $this->subscriber->subscribe($this->topic, $this->channel);
} else { } else {
$generator->next(); $generator->next();
} }
@@ -86,7 +88,7 @@ final class NsqTransport implements TransportInterface
try { try {
$encodedEnvelope = json_decode($nsqEnvelope->message->body, true, 512, JSON_THROW_ON_ERROR); $encodedEnvelope = json_decode($nsqEnvelope->message->body, true, 512, JSON_THROW_ON_ERROR);
} catch (JsonException $e) { } catch (JsonException $e) {
$nsqEnvelope->ack(); $nsqEnvelope->finish();
throw new MessageDecodingFailedException('', 0, $e); throw new MessageDecodingFailedException('', 0, $e);
} }
@@ -94,7 +96,7 @@ final class NsqTransport implements TransportInterface
try { try {
$envelope = $this->serializer->decode($encodedEnvelope); $envelope = $this->serializer->decode($encodedEnvelope);
} catch (MessageDecodingFailedException $e) { } catch (MessageDecodingFailedException $e) {
$nsqEnvelope->ack(); $nsqEnvelope->finish();
throw $e; throw $e;
} }
@@ -117,7 +119,7 @@ final class NsqTransport implements TransportInterface
throw new LogicException('Returned envelop doesn\'t related to NsqMessage.'); throw new LogicException('Returned envelop doesn\'t related to NsqMessage.');
} }
$message->ack(); $message->finish();
} }
/** /**
@@ -130,7 +132,7 @@ final class NsqTransport implements TransportInterface
throw new LogicException('Returned envelop doesn\'t related to NsqMessage.'); throw new LogicException('Returned envelop doesn\'t related to NsqMessage.');
} }
$message->ack(); $message->finish();
} }
private function getNsqEnvelope(Envelope $envelope): ?NsqEnvelope private function getNsqEnvelope(Envelope $envelope): ?NsqEnvelope
@@ -142,14 +144,4 @@ final class NsqTransport implements TransportInterface
return $stamp->envelope; return $stamp->envelope;
} }
private function getPublisher(): Writer
{
return $this->publisher ??= new Writer($this->connection);
}
private function getSubscriber(): Subscriber
{
return $this->publisher ??= new Subscriber(new Reader($this->connection));
}
} }

View File

@@ -4,13 +4,12 @@ declare(strict_types=1);
namespace NsqPHP\NsqBundle\Messenger; namespace NsqPHP\NsqBundle\Messenger;
use Nsq\Config; use Nsq\Subscriber;
use Nsq\Connection; use Nsq\Writer;
use Symfony\Component\Messenger\Exception\InvalidArgumentException; use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface; use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface; use Symfony\Component\Messenger\Transport\TransportInterface;
use function dump;
use function parse_str; use function parse_str;
use function parse_url; use function parse_url;
use function sprintf; use function sprintf;
@@ -35,7 +34,8 @@ final class NsqTransportFactory implements TransportFactoryInterface
$address = sprintf('tcp://%s:%s', $parsedUrl['host'] ?? 'nsqd', $parsedUrl['port'] ?? 4150); $address = sprintf('tcp://%s:%s', $parsedUrl['host'] ?? 'nsqd', $parsedUrl['port'] ?? 4150);
return new NsqTransport( return new NsqTransport(
Connection::connect(new Config($address)), new Writer($address),
new Subscriber($address),
$nsqOptions['topic'] ?? 'symfony-messenger', $nsqOptions['topic'] ?? 'symfony-messenger',
$nsqOptions['channel'] ?? 'default', $nsqOptions['channel'] ?? 'default',
$serializer $serializer