From 72dca5c73b40bae74d3dde36248de41e9d83cd10 Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Sat, 30 Jan 2021 20:07:28 +0300 Subject: [PATCH] Wrap Socket --- src/Buffer.php | 12 -- src/Config/ClientConfig.php | 8 ++ src/Connection.php | 240 +++++++++------------------------ src/Consumer.php | 39 ++++-- src/Exception/NotConnected.php | 9 ++ src/NsqSocket.php | 73 ++++++++++ src/Producer.php | 9 +- src/Socket/RawSocket.php | 81 +++++++++++ src/Socket/Socket.php | 27 ++++ tests/NsqTest.php | 4 +- 10 files changed, 288 insertions(+), 214 deletions(-) create mode 100644 src/Exception/NotConnected.php create mode 100644 src/NsqSocket.php create mode 100644 src/Socket/RawSocket.php create mode 100644 src/Socket/Socket.php diff --git a/src/Buffer.php b/src/Buffer.php index e7a0446..7cfad1c 100644 --- a/src/Buffer.php +++ b/src/Buffer.php @@ -5,7 +5,6 @@ declare(strict_types=1); namespace Nsq; use PHPinnacle\Buffer\ByteBuffer; -use const PHP_EOL; final class Buffer { @@ -23,17 +22,6 @@ final class Buffer return $this; } - public function appendCommand(string $command): void - { - $this->buffer->append($command.PHP_EOL); - } - - public function appendData(string $data): void - { - $this->buffer->appendUint32(\strlen($data)); - $this->buffer->append($data); - } - public function consumeSize(): int { /** @see Bytes::BYTES_SIZE */ diff --git a/src/Config/ClientConfig.php b/src/Config/ClientConfig.php index 8117233..db7cd20 100644 --- a/src/Config/ClientConfig.php +++ b/src/Config/ClientConfig.php @@ -8,6 +8,9 @@ use Composer\InstalledVersions; use InvalidArgumentException; use JsonSerializable; use function gethostname; +use function json_encode; +use const JSON_FORCE_OBJECT; +use const JSON_THROW_ON_ERROR; /** * This class is used for configuring the clients for nsq. Immutable properties must be set when creating the object and @@ -136,4 +139,9 @@ final class ClientConfig implements JsonSerializable 'user_agent' => $this->userAgent, ]; } + + public function toString(): string + { + return json_encode($this, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT); + } } diff --git a/src/Connection.php b/src/Connection.php index 74e92d9..7b9041f 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -9,139 +9,98 @@ use Nsq\Config\ConnectionConfig; use Nsq\Exception\AuthenticationRequired; use Nsq\Exception\BadResponse; use Nsq\Exception\ConnectionFail; +use Nsq\Exception\NotConnected; use Nsq\Exception\NsqError; use Nsq\Exception\NsqException; -use Nsq\Exception\NullReceived; use Nsq\Protocol\Error; use Nsq\Protocol\Frame; use Nsq\Protocol\Message; use Nsq\Protocol\Response; -use Nsq\Reconnect\ExponentialStrategy; -use Nsq\Reconnect\ReconnectStrategy; +use Nsq\Socket\RawSocket; use Psr\Log\LoggerAwareTrait; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; -use Socket\Raw\Exception; -use Socket\Raw\Factory; -use Socket\Raw\Socket; +use Throwable; use function addcslashes; use function http_build_query; use function implode; -use function json_encode; -use const JSON_FORCE_OBJECT; -use const JSON_THROW_ON_ERROR; use const PHP_EOL; /** * @internal - * - * @property ConnectionConfig $connectionConfig */ abstract class Connection { use LoggerAwareTrait; - private string $address; + protected ClientConfig $clientConfig; - private Buffer $input; + private NsqSocket $socket; - private Buffer $output; + private ConnectionConfig $connectionConfig; - private ?Socket $socket = null; - - private ReconnectStrategy $reconnect; - - private ClientConfig $clientConfig; - - private ?ConnectionConfig $connectionConfig = null; + private bool $closed = false; public function __construct( - string $address, + private string $address, ClientConfig $clientConfig = null, - ReconnectStrategy $reconnectStrategy = null, LoggerInterface $logger = null, ) { - $this->address = $address; - - $this->input = new Buffer(); - $this->output = new Buffer(); - $this->logger = $logger ?? new NullLogger(); - $this->reconnect = $reconnectStrategy ?? new ExponentialStrategy(logger: $this->logger); $this->clientConfig = $clientConfig ?? new ClientConfig(); - } - public function connect(): void - { - $this->reconnect->connect(function (): void { - try { - $this->socket = (new Factory())->createClient($this->address); - } - // @codeCoverageIgnoreStart - catch (Exception $e) { - $this->logger->error('Connecting to {address} failed.', ['address' => $this->address]); + $socket = new RawSocket($this->address, $this->logger); + $socket->write(' V2'); - throw ConnectionFail::fromThrowable($e); - } - // @codeCoverageIgnoreEnd + $this->socket = new NsqSocket($socket); - $this->socket->write(' V2'); + $this->connectionConfig = ConnectionConfig::fromArray( + $this + ->command('IDENTIFY', data: $this->clientConfig->toString()) + ->readResponse() + ->toArray() + ); - $body = json_encode($this->clientConfig, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT); + if ($this->connectionConfig->snappy || $this->connectionConfig->deflate) { + $this->checkIsOK(); + } - $this->connectionConfig = ConnectionConfig::fromArray( - $this - ->command('IDENTIFY', data: $body) - ->readResponse() - ->toArray() - ); - - if ($this->connectionConfig->snappy || $this->connectionConfig->deflate) { - $this->checkIsOK(); + if ($this->connectionConfig->authRequired) { + if (null === $this->clientConfig->authSecret) { + throw new AuthenticationRequired(); } - if ($this->connectionConfig->authRequired) { - if (null === $this->clientConfig->authSecret) { - throw new AuthenticationRequired(); - } + $authResponse = $this + ->command('AUTH', data: $this->clientConfig->authSecret) + ->readResponse() + ->toArray() + ; - $authResponse = $this - ->command('AUTH', data: $this->clientConfig->authSecret) - ->readResponse() - ->toArray() - ; - - $this->logger->info('Authorization response: '.http_build_query($authResponse)); - } - }); + $this->logger->info('Authorization response: '.http_build_query($authResponse)); + } } /** * Cleanly close your connection (no more messages are sent). */ - public function disconnect(): void + public function close(): void { - if (null === $this->socket) { + if ($this->closed) { return; } try { - $this->socket->write('CLS'.PHP_EOL); + $this->command('CLS'); $this->socket->close(); + } catch (Throwable) { } - // @codeCoverageIgnoreStart - catch (Exception $e) { - $this->logger->debug($e->getMessage(), ['exception' => $e]); - } - // @codeCoverageIgnoreEnd - $this->socket = null; - $this->connectionConfig = null; + $this->closed = true; } - public function isReady(): bool + public function isClosed(): bool { - return null !== $this->socket; + return $this->closed; } /** @@ -149,49 +108,45 @@ abstract class Connection */ protected function command(string $command, array | string $params = [], string $data = null): self { - $this->output->appendCommand( - [] === $params - ? $command - : implode(' ', [$command, ...((array) $params)]), - ); - - if (null !== $data) { - $this->output->appendData($data); + if ($this->closed) { + throw new NotConnected('Connection closed.'); } - $this->flush(); + $command = [] === $params + ? $command + : implode(' ', [$command, ...((array) $params)]); + + $this->socket->write($command, $data); return $this; } - public function hasMessage(float $timeout = 0): bool + public function hasMessage(float $timeout): bool { - try { - return false !== $this->socket()->selectRead($timeout); + if ($this->closed) { + throw new NotConnected('Connection closed.'); } - // @codeCoverageIgnoreStart - catch (Exception $e) { - $this->disconnect(); - throw ConnectionFail::fromThrowable($e); + try { + return false !== $this->socket->wait($timeout); + } catch (ConnectionFail $e) { + $this->close(); + + throw $e; } - // @codeCoverageIgnoreEnd } - protected function readFrame(float $timeout = null): ?Frame + protected function readFrame(): Frame { - $timeout ??= $this->clientConfig->readTimeout; - $deadline = microtime(true) + $timeout; - - if (!$this->hasMessage($timeout)) { - return null; + if ($this->closed) { + throw new NotConnected('Connection closed.'); } - $buffer = $this->read(); + $buffer = $this->socket->read(); $this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL)); - $frame = match ($type = $buffer->consumeType()) { + return match ($type = $buffer->consumeType()) { 0 => new Response($buffer->flush()), 1 => new Error($buffer->flush()), 2 => new Message( @@ -203,16 +158,6 @@ abstract class Connection ), default => throw new NsqException('Unexpected frame type: '.$type) }; - - if ($frame instanceof Response && $frame->isHeartBeat()) { - $this->command('NOP'); - - return $this->readFrame( - ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime - ); - } - - return $frame; } protected function checkIsOK(): void @@ -226,7 +171,7 @@ abstract class Connection private function readResponse(): Response { - $frame = $this->readFrame() ?? throw new NullReceived(); + $frame = $this->readFrame(); if ($frame instanceof Response) { return $frame; @@ -234,7 +179,7 @@ abstract class Connection if ($frame instanceof Error) { if ($frame->type->terminateConnection) { - $this->disconnect(); + $this->close(); } throw new NsqError($frame); @@ -242,69 +187,4 @@ abstract class Connection throw new NsqException('Unreachable statement.'); } - - private function read(): Buffer - { - try { - $socket = $this->socket(); - - $buffer = $this->input->append( - $socket->read(Bytes::BYTES_SIZE), - ); - - if ('' === $buffer->bytes()) { - $this->disconnect(); - - throw new ConnectionFail('Probably connection lost'); - } - - $size = $buffer->consumeSize(); - - do { - $buffer->append( - $socket->read($size), - ); - } while ($buffer->size() < $size); - - return $buffer; - } - // @codeCoverageIgnoreStart - catch (Exception $e) { - $this->disconnect(); - - throw ConnectionFail::fromThrowable($e); - } - // @codeCoverageIgnoreEnd - } - - private function flush(): void - { - $buffer = $this->output->flush(); - - $this->logger->debug('Send buffer: '.addcslashes($buffer, PHP_EOL)); - - try { - $this->socket()->write( - $buffer, - ); - } - // @codeCoverageIgnoreStart - catch (Exception $e) { - $this->disconnect(); - - $this->logger->error($e->getMessage(), ['exception' => $e]); - - throw ConnectionFail::fromThrowable($e); - } - // @codeCoverageIgnoreEnd - } - - private function socket(): Socket - { - if (null === $this->socket) { - $this->connect(); - } - - return $this->socket ?? throw new ConnectionFail('This connection is closed, create new one.'); - } } diff --git a/src/Consumer.php b/src/Consumer.php index 9a10813..838cf6a 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -10,8 +10,9 @@ use Nsq\Exception\NsqError; use Nsq\Exception\NsqException; use Nsq\Protocol\Error; use Nsq\Protocol\Message; -use Nsq\Reconnect\ReconnectStrategy; +use Nsq\Protocol\Response; use Psr\Log\LoggerInterface; +use function microtime; final class Consumer extends Connection { @@ -22,17 +23,9 @@ final class Consumer extends Connection private string $channel, string $address, ClientConfig $clientConfig = null, - ReconnectStrategy $reconnectStrategy = null, LoggerInterface $logger = null ) { - parent::__construct($address, $clientConfig, $reconnectStrategy, $logger); - } - - public function connect(): void - { - parent::connect(); - - $this->command('SUB', [$this->topic, $this->channel])->checkIsOK(); + parent::__construct($address, $clientConfig, $logger); } /** @@ -40,30 +33,48 @@ final class Consumer extends Connection */ public function generator(): Generator { + $this->command('SUB', [$this->topic, $this->channel])->checkIsOK(); + while (true) { $this->rdy(1); - $command = yield $this->readMessage(); + $timeout = $this->clientConfig->readTimeout; + + do { + $deadline = microtime(true) + $timeout; + + $message = $this->hasMessage($timeout) ? $this->readMessage() : null; + + $timeout = ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime; + } while (0 < $timeout && null === $message); + + $command = yield $message; if (0 === $command) { break; } } - $this->disconnect(); + $this->close(); } public function readMessage(): ?Message { $frame = $this->readFrame(); - if ($frame instanceof Message || null === $frame) { + if ($frame instanceof Message) { return $frame; } + if ($frame instanceof Response && $frame->isHeartBeat()) { + $this->command('NOP'); + + return null; + } + if ($frame instanceof Error) { if ($frame->type->terminateConnection) { - $this->disconnect(); + $this->close(); } throw new NsqError($frame); diff --git a/src/Exception/NotConnected.php b/src/Exception/NotConnected.php new file mode 100644 index 0000000..e184835 --- /dev/null +++ b/src/Exception/NotConnected.php @@ -0,0 +1,9 @@ +input = new Buffer(); + $this->output = new ByteBuffer(); + } + + public function write(string $command, string $data = null): void + { + $this->output->append($command.PHP_EOL); + + if (null !== $data) { + $this->output->appendUint32(\strlen($data)); + $this->output->append($data); + } + + $this->socket->write($this->output->flush()); + } + + public function wait(float $timeout): bool + { + return $this->socket->selectRead($timeout); + } + + public function read(): Buffer + { + $buffer = $this->input; + + $buffer->append( + $this->socket->read(Bytes::BYTES_SIZE), + ); + + if ('' === $buffer->bytes()) { + throw new ConnectionFail('Probably connection closed.'); + } + + $size = $buffer->consumeSize(); + + do { + $buffer->append( + $this->socket->read($size - $buffer->size()), + ); + } while ($buffer->size() < $size); + + return $buffer; + } + + public function close(): void + { + try { + $this->socket->close(); + } catch (Throwable) { + } + } +} diff --git a/src/Producer.php b/src/Producer.php index 44a3984..a226fb2 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -8,11 +8,11 @@ use function array_map; use function implode; use function pack; +/** + * @psalm-suppress PropertyNotSetInConstructor + */ final class Producer extends Connection { - /** - * @psalm-suppress PossiblyFalseOperand - */ public function pub(string $topic, string $body): void { $this->command('PUB', $topic, $body)->checkIsOK(); @@ -34,9 +34,6 @@ final class Producer extends Connection $this->command('MPUB', $topic, $num.$mb)->checkIsOK(); } - /** - * @psalm-suppress PossiblyFalseOperand - */ public function dpub(string $topic, string $body, int $delay): void { $this->command('DPUB', [$topic, $delay], $body)->checkIsOK(); diff --git a/src/Socket/RawSocket.php b/src/Socket/RawSocket.php new file mode 100644 index 0000000..e661682 --- /dev/null +++ b/src/Socket/RawSocket.php @@ -0,0 +1,81 @@ +socket = (new Factory())->createClient($address); + $this->logger = $logger ?? new NullLogger(); + } + + /** + * {@inheritDoc} + */ + public function selectRead(float $timeout): bool + { + try { + return false !== $this->socket->selectRead($timeout); + } // @codeCoverageIgnoreStart + catch (Exception $e) { + throw ConnectionFail::fromThrowable($e); + } + // @codeCoverageIgnoreEnd + } + + /** + * {@inheritDoc} + */ + public function close(): void + { + try { + $this->socket->close(); + } catch (Throwable) { + } + } + + /** + * {@inheritDoc} + */ + public function write(string $data): void + { + try { + $this->socket->write($data); + } // @codeCoverageIgnoreStart + catch (Exception $e) { + $this->logger->error($e->getMessage(), ['exception' => $e]); + + throw ConnectionFail::fromThrowable($e); + } + // @codeCoverageIgnoreEnd + } + + /** + * {@inheritDoc} + */ + public function read(int $length): string + { + try { + return $this->socket->read($length); + } // @codeCoverageIgnoreStart + catch (Exception $e) { + throw ConnectionFail::fromThrowable($e); + } + // @codeCoverageIgnoreEnd + } +} diff --git a/src/Socket/Socket.php b/src/Socket/Socket.php new file mode 100644 index 0000000..20b0e49 --- /dev/null +++ b/src/Socket/Socket.php @@ -0,0 +1,27 @@ +touch(); $message->finish(); - self::assertTrue($consumer->isReady()); + self::assertFalse($consumer->isClosed()); $generator->send(0); - self::assertFalse($consumer->isReady()); + self::assertTrue($consumer->isClosed()); } }