diff --git a/src/Connection.php b/src/Connection.php index 8f610c5..fac6a70 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -134,14 +134,24 @@ abstract class Connection public function receive(float $timeout = 0): ?Response { $socket = $this->socket(); + $deadline = microtime(true) + $timeout; if (false === $socket->selectRead($timeout)) { return null; } $size = (new ByteBuffer($socket->read(Bytes::BYTES_SIZE)))->consumeUint32(); + $response = new Response(new ByteBuffer($socket->read($size))); - return new Response(new ByteBuffer($socket->read($size))); + if ($response->isHeartBeat()) { + $this->send('NOP'.PHP_EOL); + + return $this->receive( + ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime + ); + } + + return $response; } protected function sendWithResponse(string $buffer): Response diff --git a/src/Consumer.php b/src/Consumer.php index 54787e8..495bb55 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -34,22 +34,7 @@ final class Consumer extends Connection public function consume(float $timeout): ?Message { - $deadline = microtime(true) + $timeout; - - $response = $this->receive($timeout); - if (null === $response) { - return null; - } - - if ($response->isHeartBeat()) { - $this->nop(); - - return $this->consume( - ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime - ); - } - - return $response->toMessage($this); + return $this->receive($timeout)?->toMessage($this); } /** @@ -82,9 +67,4 @@ final class Consumer extends Connection { $this->send('TOUCH '.$id.PHP_EOL); } - - public function nop(): void - { - $this->send('NOP'.PHP_EOL); - } }