Upgrade to nsq/nsq:4.0

This commit is contained in:
2021-01-26 15:34:09 +03:00
parent 92b778ffe2
commit cb9d26d6fa
4 changed files with 48 additions and 31 deletions

View File

@ -11,9 +11,8 @@
} }
], ],
"require": { "require": {
"php": ">=7.4",
"ext-json": "*", "ext-json": "*",
"nsq/nsq": "0.3", "nsq/nsq": "0.4",
"symfony/framework-bundle": "^5.0", "symfony/framework-bundle": "^5.0",
"symfony/messenger": "^5.0" "symfony/messenger": "^5.0"
}, },

View File

@ -4,7 +4,7 @@ declare(strict_types=1);
namespace Nsq\NsqBundle\Messenger; namespace Nsq\NsqBundle\Messenger;
use Nsq\Envelope; use Nsq\Message;
use Symfony\Component\Messenger\Stamp\StampInterface; use Symfony\Component\Messenger\Stamp\StampInterface;
/** /**
@ -12,10 +12,10 @@ use Symfony\Component\Messenger\Stamp\StampInterface;
*/ */
final class NsqReceivedStamp implements StampInterface final class NsqReceivedStamp implements StampInterface
{ {
public Envelope $envelope; public Message $message;
public function __construct(Envelope $envelope) public function __construct(Message $message)
{ {
$this->envelope = $envelope; $this->message = $message;
} }
} }

View File

@ -7,9 +7,9 @@ namespace Nsq\NsqBundle\Messenger;
use Generator; use Generator;
use JsonException; use JsonException;
use LogicException; use LogicException;
use Nsq\Envelope as NsqEnvelope; use Nsq\Producer;
use Nsq\Subscriber; use Nsq\Subscriber;
use Nsq\Writer; use Nsq\Message;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
@ -22,7 +22,7 @@ use const JSON_THROW_ON_ERROR;
final class NsqTransport implements TransportInterface final class NsqTransport implements TransportInterface
{ {
private Writer $writer; private Producer $producer;
private Subscriber $subscriber; private Subscriber $subscriber;
@ -35,13 +35,13 @@ final class NsqTransport implements TransportInterface
private string $channel; private string $channel;
public function __construct( public function __construct(
Writer $writer, Producer $producer,
Subscriber $subscriber, Subscriber $subscriber,
string $topic, string $topic,
string $channel, string $channel,
SerializerInterface $serializer = null SerializerInterface $serializer = null
) { ) {
$this->writer = $writer; $this->producer = $producer;
$this->subscriber = $subscriber; $this->subscriber = $subscriber;
$this->topic = $topic; $this->topic = $topic;
$this->channel = $channel; $this->channel = $channel;
@ -55,7 +55,7 @@ final class NsqTransport implements TransportInterface
{ {
$encodedMessage = $this->serializer->encode($envelope->withoutAll(NsqReceivedStamp::class)); $encodedMessage = $this->serializer->encode($envelope->withoutAll(NsqReceivedStamp::class));
$this->writer->pub($this->topic, json_encode($encodedMessage, JSON_THROW_ON_ERROR)); $this->producer->pub($this->topic, json_encode($encodedMessage, JSON_THROW_ON_ERROR));
return $envelope; return $envelope;
} }
@ -72,17 +72,17 @@ final class NsqTransport implements TransportInterface
$generator->next(); $generator->next();
} }
/** @var NsqEnvelope|null $nsqEnvelope */ /** @var Message|null $nsqMessage */
$nsqEnvelope = $generator->current(); $nsqMessage = $generator->current();
if (null === $nsqEnvelope) { if (null === $nsqMessage) {
return []; return [];
} }
try { try {
$encodedEnvelope = json_decode($nsqEnvelope->message->body, true, 512, JSON_THROW_ON_ERROR); $encodedEnvelope = json_decode($nsqMessage->body, true, 512, JSON_THROW_ON_ERROR);
} catch (JsonException $e) { } catch (JsonException $e) {
$nsqEnvelope->finish(); $nsqMessage->finish();
throw new MessageDecodingFailedException('', 0, $e); throw new MessageDecodingFailedException('', 0, $e);
} }
@ -90,15 +90,15 @@ final class NsqTransport implements TransportInterface
try { try {
$envelope = $this->serializer->decode($encodedEnvelope); $envelope = $this->serializer->decode($encodedEnvelope);
} catch (MessageDecodingFailedException $e) { } catch (MessageDecodingFailedException $e) {
$nsqEnvelope->finish(); $nsqMessage->finish();
throw $e; throw $e;
} }
return [ return [
$envelope->with( $envelope->with(
new NsqReceivedStamp($nsqEnvelope), new NsqReceivedStamp($nsqMessage),
new TransportMessageIdStamp($nsqEnvelope->message->id), new TransportMessageIdStamp($nsqMessage->id),
), ),
]; ];
} }
@ -108,8 +108,8 @@ final class NsqTransport implements TransportInterface
*/ */
public function ack(Envelope $envelope): void public function ack(Envelope $envelope): void
{ {
$message = $this->getNsqEnvelope($envelope); $message = $this->getMessage($envelope);
if (!$message instanceof NsqEnvelope) { if (!$message instanceof Message) {
throw new LogicException('Returned envelop doesn\'t related to NsqMessage.'); throw new LogicException('Returned envelop doesn\'t related to NsqMessage.');
} }
@ -121,21 +121,21 @@ final class NsqTransport implements TransportInterface
*/ */
public function reject(Envelope $envelope): void public function reject(Envelope $envelope): void
{ {
$message = $this->getNsqEnvelope($envelope); $message = $this->getMessage($envelope);
if (!$message instanceof NsqEnvelope) { if (!$message instanceof Message) {
throw new LogicException('Returned envelop doesn\'t related to NsqMessage.'); throw new LogicException('Returned envelop doesn\'t related to NsqMessage.');
} }
$message->finish(); $message->finish();
} }
private function getNsqEnvelope(Envelope $envelope): ?NsqEnvelope private function getMessage(Envelope $envelope): ?Message
{ {
$stamp = $envelope->last(NsqReceivedStamp::class); $stamp = $envelope->last(NsqReceivedStamp::class);
if (!$stamp instanceof NsqReceivedStamp) { if (!$stamp instanceof NsqReceivedStamp) {
return null; return null;
} }
return $stamp->envelope; return $stamp->message;
} }
} }

View File

@ -4,8 +4,12 @@ declare(strict_types=1);
namespace Nsq\NsqBundle\Messenger; namespace Nsq\NsqBundle\Messenger;
use Nsq\Consumer;
use Nsq\Producer;
use Nsq\Subscriber; use Nsq\Subscriber;
use Nsq\Writer; use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Exception\InvalidArgumentException; use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface; use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
@ -13,10 +17,16 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
use function parse_str; use function parse_str;
use function parse_url; use function parse_url;
use function sprintf; use function sprintf;
use function strpos;
final class NsqTransportFactory implements TransportFactoryInterface final class NsqTransportFactory implements TransportFactoryInterface
{ {
use LoggerAwareTrait;
public function __construct(LoggerInterface $logger = null)
{
$this->logger = $logger ?? new NullLogger();
}
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
@ -34,8 +44,16 @@ final class NsqTransportFactory implements TransportFactoryInterface
$address = sprintf('tcp://%s:%s', $parsedUrl['host'] ?? 'nsqd', $parsedUrl['port'] ?? 4150); $address = sprintf('tcp://%s:%s', $parsedUrl['host'] ?? 'nsqd', $parsedUrl['port'] ?? 4150);
return new NsqTransport( return new NsqTransport(
new Writer($address), new Producer(
new Subscriber($address), address: $address,
logger: $this->logger,
),
new Subscriber(
new Consumer(
address: $address,
logger: $this->logger,
)
),
$nsqOptions['topic'] ?? 'symfony-messenger', $nsqOptions['topic'] ?? 'symfony-messenger',
$nsqOptions['channel'] ?? 'default', $nsqOptions['channel'] ?? 'default',
$serializer $serializer
@ -47,6 +65,6 @@ final class NsqTransportFactory implements TransportFactoryInterface
*/ */
public function supports(string $dsn, array $options): bool public function supports(string $dsn, array $options): bool
{ {
return 0 === strpos($dsn, 'nsq://'); return str_starts_with($dsn, 'nsq://');
} }
} }