getProducer();

        // The receive stamp carries the transport-level message object; it is stripped before encoding.
        $encodedMessage = $this->serializer->encode($envelope->withoutAll(NsqReceivedStamp::class));
        $encodedMessage = json_encode($encodedMessage, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);

        /** @var DelayStamp|null $delayStamp */
        $delayStamp = $envelope->last(DelayStamp::class);
        $delay = null !== $delayStamp ? $delayStamp->getDelay() : null;

        // Without a delay stamp the message is published directly; otherwise it is deferred by the stamp's delay.
        if (null === $delay) {
            $promise = $producer->publish($this->topic, $encodedMessage);
        } else {
            $promise = $producer->defer($this->topic, $encodedMessage, $delay);
        }

        wait($promise);

        return $envelope;
    }

    /**
     * {@inheritdoc}
     *
     * Returns at most one envelope per call, taken from the buffer that the
     * consumer callback fills. Buffered messages are handed out oldest first.
     *
     * A message whose body cannot be turned into an envelope is finished
     * before the exception is thrown, so it is not left pending on the
     * connection.
     *
     * @throws MessageDecodingFailedException when the body is not valid JSON, does not decode
     *                                        to an array, or is rejected by the serializer
     */
    public function get(): iterable
    {
        // array_shift() (not array_pop()) keeps first-in, first-out order: the callback appends to the end.
        $message = array_shift($this->messages);

        if (null === $message) {
            // Nothing buffered yet: make sure the consumer is connected, then give its
            // callback a short window (delay(500)) to receive something.
            $this->getConsumer();
            wait(delay(500));

            $message = array_shift($this->messages);
        }

        if (null === $message) {
            return [];
        }

        try {
            $encodedEnvelope = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR);
        } catch (JsonException $e) {
            wait($message->finish());

            throw new MessageDecodingFailedException(sprintf('Could not decode NSQ message body as JSON: %s', $e->getMessage()), 0, $e);
        }

        // Valid JSON is not necessarily an array ("foo", 123, null). Passing such a value on to the
        // serializer would fail outside the catch below and leave the message unfinished.
        if (!is_array($encodedEnvelope)) {
            wait($message->finish());

            throw new MessageDecodingFailedException(sprintf('Expected NSQ message body to decode to an array, got "%s".', gettype($encodedEnvelope)));
        }

        try {
            $envelope = $this->serializer->decode($encodedEnvelope);
        } catch (MessageDecodingFailedException $e) {
            wait($message->finish());

            throw $e;
        }

        return [
            $envelope->with(
                new NsqReceivedStamp($message),
                new TransportMessageIdStamp($message->id),
            ),
        ];
    }

    /**
     * {@inheritdoc}
     *
     * Finishes the NSQ message attached to the envelope's NsqReceivedStamp.
     */
    public function ack(Envelope $envelope): void
    {
        $message = NsqReceivedStamp::getMessageFromEnvelope($envelope);

        wait($message->finish());
    }

    /**
     * {@inheritdoc}
     *
     * Finishes the NSQ message exactly like ack() does; the message is not requeued here.
     */
    public function reject(Envelope $envelope): void
    {
        $message = NsqReceivedStamp::getMessageFromEnvelope($envelope);

        wait($message->finish());
    }

    /**
     * Returns the producer, creating it on first use.
     *
     * connect() is awaited on every call, not only when the instance is created.
     * NOTE(review): this presumably relies on connect() being a no-op for an
     * already connected producer - confirm against the client library.
     */
    private function getProducer(): Producer
    {
        if (null === $this->producer) {
            $this->producer = new Producer(
                $this->address,
                $this->clientConfig,
                $this->logger,
            );
        }

        wait($this->producer->connect());

        return $this->producer;
    }

    /**
     * Returns the consumer for the configured topic and channel, creating it on first use.
     *
     * The consumer's callback appends every received message to $this->messages,
     * which get() drains. connect() is awaited on every call.
     */
    private function getConsumer(): Consumer
    {
        if (null === $this->consumer) {
            $this->consumer = new Consumer(
                $this->address,
                $this->topic,
                $this->channel,
                function (Message $message) {
                    $this->messages[] = $message;
                },
                $this->clientConfig,
                $this->logger,
            );
        }

        wait($this->consumer->connect());

        return $this->consumer;
    }
}