Handle Producer heartbeat messages to prevent disconnection from nsq side

This commit is contained in:
2021-01-26 19:58:41 +03:00
parent 4f0c69df3e
commit c6295ab8d3

View File

@@ -39,7 +39,7 @@ final class NsqTransport implements TransportInterface
Subscriber $subscriber, Subscriber $subscriber,
string $topic, string $topic,
string $channel, string $channel,
SerializerInterface $serializer = null SerializerInterface $serializer = null,
) { ) {
$this->producer = $producer; $this->producer = $producer;
$this->subscriber = $subscriber; $this->subscriber = $subscriber;
@@ -65,6 +65,8 @@ final class NsqTransport implements TransportInterface
*/ */
public function get(): iterable public function get(): iterable
{ {
$this->producer->receive(); // keepalive, handle heartbeat messages
$generator = $this->generator; $generator = $this->generator;
if (null === $generator) { if (null === $generator) {
$this->generator = $generator = $this->subscriber->subscribe($this->topic, $this->channel); $this->generator = $generator = $this->subscriber->subscribe($this->topic, $this->channel);