From 9d2ef8e132420975e94499290ca2081267254cfc Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Thu, 8 Jul 2021 18:34:17 +0300 Subject: [PATCH] Fix: Prevent message duplication --- src/Messenger/NsqSender.php | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/src/Messenger/NsqSender.php b/src/Messenger/NsqSender.php index 4e29b9c..275a4ec 100644 --- a/src/Messenger/NsqSender.php +++ b/src/Messenger/NsqSender.php @@ -39,25 +39,19 @@ final class NsqSender implements SenderInterface $delayStamp = $envelope->last(DelayStamp::class); $delay = null !== $delayStamp ? $delayStamp->getDelay() : 0; - $promise = null; + if (null === $envelope->last(NsqReceivedStamp::class)) { + $encodedMessage = $this->serializer->encode($envelope); + $encodedMessage = json_encode($encodedMessage, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE); - if (null !== $envelope->last(NsqReceivedStamp::class)) { + wait($producer->publish($this->topic, $encodedMessage, $delay)); + } else { $message = NsqReceivedStamp::getMessageFromEnvelope($envelope); if (!$message->isProcessed()) { - $promise = $message->requeue($delay); + wait($message->requeue($delay)); } } - if (null === $promise) { - $encodedMessage = $this->serializer->encode($envelope->withoutAll(NsqReceivedStamp::class)); - $encodedMessage = json_encode($encodedMessage, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE); - - $promise = $producer->publish($this->topic, $encodedMessage, $delay); - } - - wait($promise); - return $envelope; }