diff --git a/src/Buffer.php b/src/Buffer.php new file mode 100644 index 0000000..e7a0446 --- /dev/null +++ b/src/Buffer.php @@ -0,0 +1,80 @@ +buffer = new ByteBuffer($initial); + } + + public function append(string $data): self + { + $this->buffer->append($data); + + return $this; + } + + public function appendCommand(string $command): void + { + $this->buffer->append($command.PHP_EOL); + } + + public function appendData(string $data): void + { + $this->buffer->appendUint32(\strlen($data)); + $this->buffer->append($data); + } + + public function consumeSize(): int + { + /** @see Bytes::BYTES_SIZE */ + return $this->buffer->consumeUint32(); + } + + public function consumeType(): int + { + /** @see Bytes::BYTES_TYPE */ + return $this->buffer->consumeUint32(); + } + + public function consumeTimestamp(): int + { + /** @see Bytes::BYTES_TIMESTAMP */ + return $this->buffer->consumeInt64(); + } + + public function consumeAttempts(): int + { + /** @see Bytes::BYTES_ATTEMPTS */ + return $this->buffer->consumeUint16(); + } + + public function consumeId(): string + { + return $this->buffer->consume(Bytes::BYTES_ID); + } + + public function size(): int + { + return $this->buffer->size(); + } + + public function bytes(): string + { + return $this->buffer->bytes(); + } + + public function flush(): string + { + return $this->buffer->flush(); + } +} diff --git a/src/Connection.php b/src/Connection.php index 55bf29d..a43527b 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -18,7 +18,6 @@ use Nsq\Protocol\Message; use Nsq\Protocol\Response; use Nsq\Reconnect\ExponentialStrategy; use Nsq\Reconnect\ReconnectStrategy; -use PHPinnacle\Buffer\ByteBuffer; use Psr\Log\LoggerAwareTrait; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -29,7 +28,6 @@ use function addcslashes; use function http_build_query; use function implode; use function json_encode; -use function pack; use const JSON_FORCE_OBJECT; use const JSON_THROW_ON_ERROR; use const PHP_EOL; @@ -45,6 +43,10 @@ abstract class Connection private string $address; + private Buffer $input; + + private Buffer $output; + private ?Socket $socket = null; private ReconnectStrategy $reconnect; @@ -61,6 +63,9 @@ abstract class Connection ) { $this->address = $address; + $this->input = new Buffer(); + $this->output = new Buffer(); + $this->logger = $logger ?? new NullLogger(); $this->reconnect = $reconnectStrategy ?? new ExponentialStrategy(logger: $this->logger); $this->clientConfig = $clientConfig ?? new ClientConfig(); @@ -144,30 +149,18 @@ abstract class Connection */ protected function command(string $command, array | string $params = [], string $data = null): self { - $socket = $this->socket(); - - $buffer = [] === $params ? $command : implode(' ', [$command, ...((array) $params)]); - $buffer .= PHP_EOL; + $this->output->appendCommand( + [] === $params + ? $command + : implode(' ', [$command, ...((array) $params)]), + ); if (null !== $data) { - $buffer .= pack('N', \strlen($data)); - $buffer .= $data; + $this->output->appendData($data); } - $this->logger->debug('Send buffer: '.addcslashes($buffer, PHP_EOL)); + $this->flush(); - try { - $socket->write($buffer); - } - // @codeCoverageIgnoreStart - catch (Exception $e) { - $this->disconnect(); - - $this->logger->error($e->getMessage(), ['exception' => $e]); - - throw ConnectionFail::fromThrowable($e); - } - // @codeCoverageIgnoreEnd return $this; } @@ -187,8 +180,6 @@ abstract class Connection protected function readFrame(float $timeout = null): ?Frame { - $socket = $this->socket(); - $timeout ??= $this->clientConfig->readTimeout; $deadline = microtime(true) + $timeout; @@ -197,36 +188,29 @@ abstract class Connection } try { - $size = $socket->read(Bytes::BYTES_SIZE); + $buffer = $this->read(Bytes::BYTES_SIZE); - if ('' === $size) { + if ('' === $buffer->bytes()) { $this->disconnect(); throw new ConnectionFail('Probably connection lost'); } - $buffer = new ByteBuffer(); - - /** @phpstan-ignore-next-line */ - $size = unpack('N', $size)[1]; + $size = $buffer->consumeSize(); do { - $chunk = $socket->read($size); - - $buffer->append($chunk); - - $size -= \strlen($chunk); - } while (0 < $size); + $this->read($size); + } while ($buffer->size() < $size); $this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL)); - $frame = match ($type = $buffer->consumeUint32()) { + $frame = match ($type = $buffer->consumeType()) { 0 => new Response($buffer->flush()), 1 => new Error($buffer->flush()), 2 => new Message( - timestamp: $buffer->consumeInt64(), - attempts: $buffer->consumeUint16(), - id: $buffer->consume(Bytes::BYTES_ID), + timestamp: $buffer->consumeTimestamp(), + attempts: $buffer->consumeAttempts(), + id: $buffer->consumeId(), body: $buffer->flush(), consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'), ), @@ -280,6 +264,35 @@ abstract class Connection throw new NsqException('Unreachable statement.'); } + private function read(int $size): Buffer + { + return $this->input->append( + $this->socket()->read($size), + ); + } + + private function flush(): void + { + $buffer = $this->output->flush(); + + $this->logger->debug('Send buffer: '.addcslashes($buffer, PHP_EOL)); + + try { + $this->socket()->write( + $buffer, + ); + } + // @codeCoverageIgnoreStart + catch (Exception $e) { + $this->disconnect(); + + $this->logger->error($e->getMessage(), ['exception' => $e]); + + throw ConnectionFail::fromThrowable($e); + } + // @codeCoverageIgnoreEnd + } + private function socket(): Socket { if (null === $this->socket) {