Move try/catch from readFrame to read
This commit is contained in:
@@ -7,9 +7,9 @@ namespace Nsq;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Config\ConnectionConfig;
|
||||
use Nsq\Exception\AuthenticationRequired;
|
||||
use Nsq\Exception\BadResponse;
|
||||
use Nsq\Exception\ConnectionFail;
|
||||
use Nsq\Exception\NsqError;
|
||||
use Nsq\Exception\BadResponse;
|
||||
use Nsq\Exception\NsqException;
|
||||
use Nsq\Exception\NullReceived;
|
||||
use Nsq\Protocol\Error;
|
||||
@@ -187,51 +187,30 @@ abstract class Connection
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
$buffer = $this->read(Bytes::BYTES_SIZE);
|
||||
$buffer = $this->read();
|
||||
|
||||
if ('' === $buffer->bytes()) {
|
||||
$this->disconnect();
|
||||
$this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL));
|
||||
|
||||
throw new ConnectionFail('Probably connection lost');
|
||||
}
|
||||
$frame = match ($type = $buffer->consumeType()) {
|
||||
0 => new Response($buffer->flush()),
|
||||
1 => new Error($buffer->flush()),
|
||||
2 => new Message(
|
||||
timestamp: $buffer->consumeTimestamp(),
|
||||
attempts: $buffer->consumeAttempts(),
|
||||
id: $buffer->consumeId(),
|
||||
body: $buffer->flush(),
|
||||
consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'),
|
||||
),
|
||||
default => throw new NsqException('Unexpected frame type: '.$type)
|
||||
};
|
||||
|
||||
$size = $buffer->consumeSize();
|
||||
if ($frame instanceof Response && $frame->isHeartBeat()) {
|
||||
$this->command('NOP');
|
||||
|
||||
do {
|
||||
$this->read($size);
|
||||
} while ($buffer->size() < $size);
|
||||
|
||||
$this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL));
|
||||
|
||||
$frame = match ($type = $buffer->consumeType()) {
|
||||
0 => new Response($buffer->flush()),
|
||||
1 => new Error($buffer->flush()),
|
||||
2 => new Message(
|
||||
timestamp: $buffer->consumeTimestamp(),
|
||||
attempts: $buffer->consumeAttempts(),
|
||||
id: $buffer->consumeId(),
|
||||
body: $buffer->flush(),
|
||||
consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'),
|
||||
),
|
||||
default => throw new NsqException('Unexpected frame type: '.$type)
|
||||
};
|
||||
|
||||
if ($frame instanceof Response && $frame->isHeartBeat()) {
|
||||
$this->command('NOP');
|
||||
|
||||
return $this->readFrame(
|
||||
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
|
||||
);
|
||||
}
|
||||
return $this->readFrame(
|
||||
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
|
||||
);
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->disconnect();
|
||||
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
|
||||
return $frame;
|
||||
}
|
||||
@@ -264,11 +243,38 @@ abstract class Connection
|
||||
throw new NsqException('Unreachable statement.');
|
||||
}
|
||||
|
||||
private function read(int $size): Buffer
|
||||
private function read(): Buffer
|
||||
{
|
||||
return $this->input->append(
|
||||
$this->socket()->read($size),
|
||||
);
|
||||
try {
|
||||
$socket = $this->socket();
|
||||
|
||||
$buffer = $this->input->append(
|
||||
$socket->read(Bytes::BYTES_SIZE),
|
||||
);
|
||||
|
||||
if ('' === $buffer->bytes()) {
|
||||
$this->disconnect();
|
||||
|
||||
throw new ConnectionFail('Probably connection lost');
|
||||
}
|
||||
|
||||
$size = $buffer->consumeSize();
|
||||
|
||||
do {
|
||||
$buffer->append(
|
||||
$socket->read($size),
|
||||
);
|
||||
} while ($buffer->size() < $size);
|
||||
|
||||
return $buffer;
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->disconnect();
|
||||
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
}
|
||||
|
||||
private function flush(): void
|
||||
|
Reference in New Issue
Block a user