From 6e37bbd1ae0aa8f6baa8c682bf10bd17d0808131 Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Sat, 23 Jan 2021 02:27:21 +0300 Subject: [PATCH] Prevent send RDY every iteration --- src/Consumer.php | 32 ++++++++++++++++++++++++++++++++ src/Subscriber.php | 23 +---------------------- 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/src/Consumer.php b/src/Consumer.php index 6c207e2..9ed7e7d 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -6,6 +6,8 @@ namespace Nsq; class Consumer extends Connection { + private int $rdy = 0; + /** * Subscribe to a topic/channel. */ @@ -21,7 +23,33 @@ class Consumer extends Connection */ public function rdy(int $count): void { + if ($this->rdy === $count) { + return; + } + $this->send('RDY '.$count.PHP_EOL); + + $this->rdy = $count; + } + + public function consume(float $timeout): ?Message + { + $deadline = microtime(true) + $timeout; + + $response = $this->receive($timeout); + if (null === $response) { + return null; + } + + if ($response->isHeartBeat()) { + $this->nop(); + + return $this->consume( + ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime + ); + } + + return $response->toMessage($this); } /** @@ -30,6 +58,8 @@ class Consumer extends Connection public function fin(string $id): void { $this->send('FIN '.$id.PHP_EOL); + + --$this->rdy; } /** @@ -41,6 +71,8 @@ class Consumer extends Connection public function req(string $id, int $timeout): void { $this->send(sprintf('REQ %s %s', $id, $timeout).PHP_EOL); + + --$this->rdy; } /** diff --git a/src/Subscriber.php b/src/Subscriber.php index 3e28196..cb0c147 100644 --- a/src/Subscriber.php +++ b/src/Subscriber.php @@ -6,7 +6,6 @@ namespace Nsq; use Generator; use function get_debug_type; -use function microtime; use function sprintf; final class Subscriber @@ -31,7 +30,7 @@ final class Subscriber while (true) { $this->reader->rdy(1); - $command = yield $this->consume($timeout); + $command = yield $this->reader->consume($timeout); if (self::STOP === $command) { break; @@ -50,24 +49,4 @@ final class Subscriber $this->reader->disconnect(); } - - private function consume(float $timeout): ?Message - { - $deadline = microtime(true) + $timeout; - - $response = $this->reader->receive($timeout); - if (null === $response) { - return null; - } - - if ($response->isHeartBeat()) { - $this->reader->nop(); - - return $this->consume( - ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime - ); - } - - return $response->toMessage($this->reader); - } }