From cb9d26d6fa52ec0314f573d3515ea8be4c851898 Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Tue, 26 Jan 2021 15:34:09 +0300 Subject: [PATCH] Upgrade to nsq/nsq:4.0 --- composer.json | 3 +- src/Messenger/NsqReceivedStamp.php | 8 +++--- src/Messenger/NsqTransport.php | 40 +++++++++++++-------------- src/Messenger/NsqTransportFactory.php | 28 +++++++++++++++---- 4 files changed, 48 insertions(+), 31 deletions(-) diff --git a/composer.json b/composer.json index aae0a32..97a3176 100644 --- a/composer.json +++ b/composer.json @@ -11,9 +11,8 @@ } ], "require": { - "php": ">=7.4", "ext-json": "*", - "nsq/nsq": "0.3", + "nsq/nsq": "0.4", "symfony/framework-bundle": "^5.0", "symfony/messenger": "^5.0" }, diff --git a/src/Messenger/NsqReceivedStamp.php b/src/Messenger/NsqReceivedStamp.php index 5cf94a8..396d96b 100644 --- a/src/Messenger/NsqReceivedStamp.php +++ b/src/Messenger/NsqReceivedStamp.php @@ -4,7 +4,7 @@ declare(strict_types=1); namespace Nsq\NsqBundle\Messenger; -use Nsq\Envelope; +use Nsq\Message; use Symfony\Component\Messenger\Stamp\StampInterface; /** @@ -12,10 +12,10 @@ use Symfony\Component\Messenger\Stamp\StampInterface; */ final class NsqReceivedStamp implements StampInterface { - public Envelope $envelope; + public Message $message; - public function __construct(Envelope $envelope) + public function __construct(Message $message) { - $this->envelope = $envelope; + $this->message = $message; } } diff --git a/src/Messenger/NsqTransport.php b/src/Messenger/NsqTransport.php index 051ea63..cbb7f66 100644 --- a/src/Messenger/NsqTransport.php +++ b/src/Messenger/NsqTransport.php @@ -7,9 +7,9 @@ namespace Nsq\NsqBundle\Messenger; use Generator; use JsonException; use LogicException; -use Nsq\Envelope as NsqEnvelope; +use Nsq\Producer; use Nsq\Subscriber; -use Nsq\Writer; +use Nsq\Message; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; @@ -22,7 +22,7 @@ use const JSON_THROW_ON_ERROR; final class NsqTransport implements TransportInterface { - private Writer $writer; + private Producer $producer; private Subscriber $subscriber; @@ -35,13 +35,13 @@ final class NsqTransport implements TransportInterface private string $channel; public function __construct( - Writer $writer, + Producer $producer, Subscriber $subscriber, string $topic, string $channel, SerializerInterface $serializer = null ) { - $this->writer = $writer; + $this->producer = $producer; $this->subscriber = $subscriber; $this->topic = $topic; $this->channel = $channel; @@ -55,7 +55,7 @@ final class NsqTransport implements TransportInterface { $encodedMessage = $this->serializer->encode($envelope->withoutAll(NsqReceivedStamp::class)); - $this->writer->pub($this->topic, json_encode($encodedMessage, JSON_THROW_ON_ERROR)); + $this->producer->pub($this->topic, json_encode($encodedMessage, JSON_THROW_ON_ERROR)); return $envelope; } @@ -72,17 +72,17 @@ final class NsqTransport implements TransportInterface $generator->next(); } - /** @var NsqEnvelope|null $nsqEnvelope */ - $nsqEnvelope = $generator->current(); + /** @var Message|null $nsqMessage */ + $nsqMessage = $generator->current(); - if (null === $nsqEnvelope) { + if (null === $nsqMessage) { return []; } try { - $encodedEnvelope = json_decode($nsqEnvelope->message->body, true, 512, JSON_THROW_ON_ERROR); + $encodedEnvelope = json_decode($nsqMessage->body, true, 512, JSON_THROW_ON_ERROR); } catch (JsonException $e) { - $nsqEnvelope->finish(); + $nsqMessage->finish(); throw new MessageDecodingFailedException('', 0, $e); } @@ -90,15 +90,15 @@ final class NsqTransport implements TransportInterface try { $envelope = $this->serializer->decode($encodedEnvelope); } catch (MessageDecodingFailedException $e) { - $nsqEnvelope->finish(); + $nsqMessage->finish(); throw $e; } return [ $envelope->with( - new NsqReceivedStamp($nsqEnvelope), - new TransportMessageIdStamp($nsqEnvelope->message->id), + new NsqReceivedStamp($nsqMessage), + new TransportMessageIdStamp($nsqMessage->id), ), ]; } @@ -108,8 +108,8 @@ final class NsqTransport implements TransportInterface */ public function ack(Envelope $envelope): void { - $message = $this->getNsqEnvelope($envelope); - if (!$message instanceof NsqEnvelope) { + $message = $this->getMessage($envelope); + if (!$message instanceof Message) { throw new LogicException('Returned envelop doesn\'t related to NsqMessage.'); } @@ -121,21 +121,21 @@ final class NsqTransport implements TransportInterface */ public function reject(Envelope $envelope): void { - $message = $this->getNsqEnvelope($envelope); - if (!$message instanceof NsqEnvelope) { + $message = $this->getMessage($envelope); + if (!$message instanceof Message) { throw new LogicException('Returned envelop doesn\'t related to NsqMessage.'); } $message->finish(); } - private function getNsqEnvelope(Envelope $envelope): ?NsqEnvelope + private function getMessage(Envelope $envelope): ?Message { $stamp = $envelope->last(NsqReceivedStamp::class); if (!$stamp instanceof NsqReceivedStamp) { return null; } - return $stamp->envelope; + return $stamp->message; } } diff --git a/src/Messenger/NsqTransportFactory.php b/src/Messenger/NsqTransportFactory.php index 558689b..af971c2 100644 --- a/src/Messenger/NsqTransportFactory.php +++ b/src/Messenger/NsqTransportFactory.php @@ -4,8 +4,12 @@ declare(strict_types=1); namespace Nsq\NsqBundle\Messenger; +use Nsq\Consumer; +use Nsq\Producer; use Nsq\Subscriber; -use Nsq\Writer; +use Psr\Log\LoggerAwareTrait; +use Psr\Log\LoggerInterface; +use Psr\Log\NullLogger; use Symfony\Component\Messenger\Exception\InvalidArgumentException; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\TransportFactoryInterface; @@ -13,10 +17,16 @@ use Symfony\Component\Messenger\Transport\TransportInterface; use function parse_str; use function parse_url; use function sprintf; -use function strpos; final class NsqTransportFactory implements TransportFactoryInterface { + use LoggerAwareTrait; + + public function __construct(LoggerInterface $logger = null) + { + $this->logger = $logger ?? new NullLogger(); + } + /** * {@inheritdoc} */ @@ -34,8 +44,16 @@ final class NsqTransportFactory implements TransportFactoryInterface $address = sprintf('tcp://%s:%s', $parsedUrl['host'] ?? 'nsqd', $parsedUrl['port'] ?? 4150); return new NsqTransport( - new Writer($address), - new Subscriber($address), + new Producer( + address: $address, + logger: $this->logger, + ), + new Subscriber( + new Consumer( + address: $address, + logger: $this->logger, + ) + ), $nsqOptions['topic'] ?? 'symfony-messenger', $nsqOptions['channel'] ?? 'default', $serializer @@ -47,6 +65,6 @@ final class NsqTransportFactory implements TransportFactoryInterface */ public function supports(string $dsn, array $options): bool { - return 0 === strpos($dsn, 'nsq://'); + return str_starts_with($dsn, 'nsq://'); } }