From e3485416a5b098d4eeed6af06f40450b5ecaebdc Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Sat, 30 Jan 2021 18:37:38 +0300 Subject: [PATCH] Move try/catch from readFrame to read --- src/Connection.php | 96 ++++++++++++++++++++++++---------------------- 1 file changed, 51 insertions(+), 45 deletions(-) diff --git a/src/Connection.php b/src/Connection.php index a43527b..74e92d9 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -7,9 +7,9 @@ namespace Nsq; use Nsq\Config\ClientConfig; use Nsq\Config\ConnectionConfig; use Nsq\Exception\AuthenticationRequired; +use Nsq\Exception\BadResponse; use Nsq\Exception\ConnectionFail; use Nsq\Exception\NsqError; -use Nsq\Exception\BadResponse; use Nsq\Exception\NsqException; use Nsq\Exception\NullReceived; use Nsq\Protocol\Error; @@ -187,51 +187,30 @@ abstract class Connection return null; } - try { - $buffer = $this->read(Bytes::BYTES_SIZE); + $buffer = $this->read(); - if ('' === $buffer->bytes()) { - $this->disconnect(); + $this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL)); - throw new ConnectionFail('Probably connection lost'); - } + $frame = match ($type = $buffer->consumeType()) { + 0 => new Response($buffer->flush()), + 1 => new Error($buffer->flush()), + 2 => new Message( + timestamp: $buffer->consumeTimestamp(), + attempts: $buffer->consumeAttempts(), + id: $buffer->consumeId(), + body: $buffer->flush(), + consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'), + ), + default => throw new NsqException('Unexpected frame type: '.$type) + }; - $size = $buffer->consumeSize(); + if ($frame instanceof Response && $frame->isHeartBeat()) { + $this->command('NOP'); - do { - $this->read($size); - } while ($buffer->size() < $size); - - $this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL)); - - $frame = match ($type = $buffer->consumeType()) { - 0 => new Response($buffer->flush()), - 1 => new Error($buffer->flush()), - 2 => new Message( - timestamp: $buffer->consumeTimestamp(), - attempts: $buffer->consumeAttempts(), - id: $buffer->consumeId(), - body: $buffer->flush(), - consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'), - ), - default => throw new NsqException('Unexpected frame type: '.$type) - }; - - if ($frame instanceof Response && $frame->isHeartBeat()) { - $this->command('NOP'); - - return $this->readFrame( - ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime - ); - } + return $this->readFrame( + ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime + ); } - // @codeCoverageIgnoreStart - catch (Exception $e) { - $this->disconnect(); - - throw ConnectionFail::fromThrowable($e); - } - // @codeCoverageIgnoreEnd return $frame; } @@ -264,11 +243,38 @@ abstract class Connection throw new NsqException('Unreachable statement.'); } - private function read(int $size): Buffer + private function read(): Buffer { - return $this->input->append( - $this->socket()->read($size), - ); + try { + $socket = $this->socket(); + + $buffer = $this->input->append( + $socket->read(Bytes::BYTES_SIZE), + ); + + if ('' === $buffer->bytes()) { + $this->disconnect(); + + throw new ConnectionFail('Probably connection lost'); + } + + $size = $buffer->consumeSize(); + + do { + $buffer->append( + $socket->read($size), + ); + } while ($buffer->size() < $size); + + return $buffer; + } + // @codeCoverageIgnoreStart + catch (Exception $e) { + $this->disconnect(); + + throw ConnectionFail::fromThrowable($e); + } + // @codeCoverageIgnoreEnd } private function flush(): void