Prevent recreate ByteBuffer objects
This commit is contained in:
80
src/Buffer.php
Normal file
80
src/Buffer.php
Normal file
@@ -0,0 +1,80 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use PHPinnacle\Buffer\ByteBuffer;
|
||||
use const PHP_EOL;
|
||||
|
||||
final class Buffer
|
||||
{
|
||||
private ByteBuffer $buffer;
|
||||
|
||||
public function __construct(string $initial = '')
|
||||
{
|
||||
$this->buffer = new ByteBuffer($initial);
|
||||
}
|
||||
|
||||
public function append(string $data): self
|
||||
{
|
||||
$this->buffer->append($data);
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function appendCommand(string $command): void
|
||||
{
|
||||
$this->buffer->append($command.PHP_EOL);
|
||||
}
|
||||
|
||||
public function appendData(string $data): void
|
||||
{
|
||||
$this->buffer->appendUint32(\strlen($data));
|
||||
$this->buffer->append($data);
|
||||
}
|
||||
|
||||
public function consumeSize(): int
|
||||
{
|
||||
/** @see Bytes::BYTES_SIZE */
|
||||
return $this->buffer->consumeUint32();
|
||||
}
|
||||
|
||||
public function consumeType(): int
|
||||
{
|
||||
/** @see Bytes::BYTES_TYPE */
|
||||
return $this->buffer->consumeUint32();
|
||||
}
|
||||
|
||||
public function consumeTimestamp(): int
|
||||
{
|
||||
/** @see Bytes::BYTES_TIMESTAMP */
|
||||
return $this->buffer->consumeInt64();
|
||||
}
|
||||
|
||||
public function consumeAttempts(): int
|
||||
{
|
||||
/** @see Bytes::BYTES_ATTEMPTS */
|
||||
return $this->buffer->consumeUint16();
|
||||
}
|
||||
|
||||
public function consumeId(): string
|
||||
{
|
||||
return $this->buffer->consume(Bytes::BYTES_ID);
|
||||
}
|
||||
|
||||
public function size(): int
|
||||
{
|
||||
return $this->buffer->size();
|
||||
}
|
||||
|
||||
public function bytes(): string
|
||||
{
|
||||
return $this->buffer->bytes();
|
||||
}
|
||||
|
||||
public function flush(): string
|
||||
{
|
||||
return $this->buffer->flush();
|
||||
}
|
||||
}
|
@@ -18,7 +18,6 @@ use Nsq\Protocol\Message;
|
||||
use Nsq\Protocol\Response;
|
||||
use Nsq\Reconnect\ExponentialStrategy;
|
||||
use Nsq\Reconnect\ReconnectStrategy;
|
||||
use PHPinnacle\Buffer\ByteBuffer;
|
||||
use Psr\Log\LoggerAwareTrait;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\NullLogger;
|
||||
@@ -29,7 +28,6 @@ use function addcslashes;
|
||||
use function http_build_query;
|
||||
use function implode;
|
||||
use function json_encode;
|
||||
use function pack;
|
||||
use const JSON_FORCE_OBJECT;
|
||||
use const JSON_THROW_ON_ERROR;
|
||||
use const PHP_EOL;
|
||||
@@ -45,6 +43,10 @@ abstract class Connection
|
||||
|
||||
private string $address;
|
||||
|
||||
private Buffer $input;
|
||||
|
||||
private Buffer $output;
|
||||
|
||||
private ?Socket $socket = null;
|
||||
|
||||
private ReconnectStrategy $reconnect;
|
||||
@@ -61,6 +63,9 @@ abstract class Connection
|
||||
) {
|
||||
$this->address = $address;
|
||||
|
||||
$this->input = new Buffer();
|
||||
$this->output = new Buffer();
|
||||
|
||||
$this->logger = $logger ?? new NullLogger();
|
||||
$this->reconnect = $reconnectStrategy ?? new ExponentialStrategy(logger: $this->logger);
|
||||
$this->clientConfig = $clientConfig ?? new ClientConfig();
|
||||
@@ -144,30 +149,18 @@ abstract class Connection
|
||||
*/
|
||||
protected function command(string $command, array | string $params = [], string $data = null): self
|
||||
{
|
||||
$socket = $this->socket();
|
||||
|
||||
$buffer = [] === $params ? $command : implode(' ', [$command, ...((array) $params)]);
|
||||
$buffer .= PHP_EOL;
|
||||
$this->output->appendCommand(
|
||||
[] === $params
|
||||
? $command
|
||||
: implode(' ', [$command, ...((array) $params)]),
|
||||
);
|
||||
|
||||
if (null !== $data) {
|
||||
$buffer .= pack('N', \strlen($data));
|
||||
$buffer .= $data;
|
||||
$this->output->appendData($data);
|
||||
}
|
||||
|
||||
$this->logger->debug('Send buffer: '.addcslashes($buffer, PHP_EOL));
|
||||
$this->flush();
|
||||
|
||||
try {
|
||||
$socket->write($buffer);
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->disconnect();
|
||||
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
return $this;
|
||||
}
|
||||
|
||||
@@ -187,8 +180,6 @@ abstract class Connection
|
||||
|
||||
protected function readFrame(float $timeout = null): ?Frame
|
||||
{
|
||||
$socket = $this->socket();
|
||||
|
||||
$timeout ??= $this->clientConfig->readTimeout;
|
||||
$deadline = microtime(true) + $timeout;
|
||||
|
||||
@@ -197,36 +188,29 @@ abstract class Connection
|
||||
}
|
||||
|
||||
try {
|
||||
$size = $socket->read(Bytes::BYTES_SIZE);
|
||||
$buffer = $this->read(Bytes::BYTES_SIZE);
|
||||
|
||||
if ('' === $size) {
|
||||
if ('' === $buffer->bytes()) {
|
||||
$this->disconnect();
|
||||
|
||||
throw new ConnectionFail('Probably connection lost');
|
||||
}
|
||||
|
||||
$buffer = new ByteBuffer();
|
||||
|
||||
/** @phpstan-ignore-next-line */
|
||||
$size = unpack('N', $size)[1];
|
||||
$size = $buffer->consumeSize();
|
||||
|
||||
do {
|
||||
$chunk = $socket->read($size);
|
||||
|
||||
$buffer->append($chunk);
|
||||
|
||||
$size -= \strlen($chunk);
|
||||
} while (0 < $size);
|
||||
$this->read($size);
|
||||
} while ($buffer->size() < $size);
|
||||
|
||||
$this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL));
|
||||
|
||||
$frame = match ($type = $buffer->consumeUint32()) {
|
||||
$frame = match ($type = $buffer->consumeType()) {
|
||||
0 => new Response($buffer->flush()),
|
||||
1 => new Error($buffer->flush()),
|
||||
2 => new Message(
|
||||
timestamp: $buffer->consumeInt64(),
|
||||
attempts: $buffer->consumeUint16(),
|
||||
id: $buffer->consume(Bytes::BYTES_ID),
|
||||
timestamp: $buffer->consumeTimestamp(),
|
||||
attempts: $buffer->consumeAttempts(),
|
||||
id: $buffer->consumeId(),
|
||||
body: $buffer->flush(),
|
||||
consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'),
|
||||
),
|
||||
@@ -280,6 +264,35 @@ abstract class Connection
|
||||
throw new NsqException('Unreachable statement.');
|
||||
}
|
||||
|
||||
private function read(int $size): Buffer
|
||||
{
|
||||
return $this->input->append(
|
||||
$this->socket()->read($size),
|
||||
);
|
||||
}
|
||||
|
||||
private function flush(): void
|
||||
{
|
||||
$buffer = $this->output->flush();
|
||||
|
||||
$this->logger->debug('Send buffer: '.addcslashes($buffer, PHP_EOL));
|
||||
|
||||
try {
|
||||
$this->socket()->write(
|
||||
$buffer,
|
||||
);
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->disconnect();
|
||||
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
}
|
||||
|
||||
private function socket(): Socket
|
||||
{
|
||||
if (null === $this->socket) {
|
||||
|
Reference in New Issue
Block a user