From ebf82cf47842f136d18d428b6ef0d2183e67acc7 Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Sun, 24 Jan 2021 20:51:01 +0300 Subject: [PATCH] Reconnect --- src/Connection.php | 133 +++++++++++++++++--------- src/Exception/UnexpectedResponse.php | 7 ++ src/Reconnect/ExponentialStrategy.php | 64 +++++++++++++ src/Reconnect/RealTimeProvider.php | 13 +++ src/Reconnect/ReconnectStrategy.php | 15 +++ src/Reconnect/TimeProvider.php | 10 ++ src/Response.php | 6 ++ tests/ExponentialStrategyTest.php | 90 +++++++++++++++++ tests/NsqTest.php | 4 +- 9 files changed, 294 insertions(+), 48 deletions(-) create mode 100644 src/Reconnect/ExponentialStrategy.php create mode 100644 src/Reconnect/RealTimeProvider.php create mode 100644 src/Reconnect/ReconnectStrategy.php create mode 100644 src/Reconnect/TimeProvider.php create mode 100644 tests/ExponentialStrategyTest.php diff --git a/src/Connection.php b/src/Connection.php index 7b41152..f53764d 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -7,6 +7,8 @@ namespace Nsq; use Composer\InstalledVersions; use Nsq\Exception\ConnectionFail; use Nsq\Exception\UnexpectedResponse; +use Nsq\Reconnect\ExponentialStrategy; +use Nsq\Reconnect\ReconnectStrategy; use PHPinnacle\Buffer\ByteBuffer; use Psr\Log\LoggerAwareTrait; use Psr\Log\LoggerInterface; @@ -14,7 +16,7 @@ use Psr\Log\NullLogger; use Socket\Raw\Exception; use Socket\Raw\Factory; use Socket\Raw\Socket; -use Throwable; +use function addcslashes; use function json_encode; use function pack; use const JSON_FORCE_OBJECT; @@ -32,7 +34,7 @@ abstract class Connection private ?Socket $socket = null; - private bool $closed = false; + private ReconnectStrategy $reconnect; /** * @var array{ @@ -51,37 +53,44 @@ abstract class Connection string $userAgent = null, int $heartbeatInterval = null, int $sampleRate = 0, + ReconnectStrategy $reconnectStrategy = null, LoggerInterface $logger = null, ) { $this->address = $address; $this->features = [ 'client_id' => $clientId ?? '', - 'hostname' => $hostname ?? (static fn (mixed $host): string => \is_string($host) ? $host : '')(gethostname()), + 'hostname' => $hostname ?? (static fn (mixed $h): string => \is_string($h) ? $h : '')(gethostname()), 'user_agent' => $userAgent ?? 'nsqphp/'.InstalledVersions::getPrettyVersion('nsq/nsq'), 'heartbeat_interval' => $heartbeatInterval, 'sample_rate' => $sampleRate, ]; $this->logger = $logger ?? new NullLogger(); + $this->reconnect = $reconnectStrategy ?? new ExponentialStrategy(logger: $this->logger); } public function connect(): void { - try { - $this->socket = (new Factory())->createClient($this->address); - } catch (Exception $e) { - throw ConnectionFail::fromThrowable($e); - } + $this->reconnect->connect(function (): void { + try { + $this->socket = (new Factory())->createClient($this->address); + } + // @codeCoverageIgnoreStart + catch (Exception $e) { + $this->logger->error('Connecting to {address} failed.', ['address' => $this->address]); - $this->send(' V2'); + throw ConnectionFail::fromThrowable($e); + } + // @codeCoverageIgnoreEnd - $body = json_encode($this->features, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT); - $size = pack('N', \strlen($body)); + $this->send(' V2'); - $this->logger->info('Feature Negotiation: '.http_build_query($this->features)); + $body = json_encode($this->features, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT); + $size = pack('N', \strlen($body)); - $this->sendWithResponse('IDENTIFY '.PHP_EOL.$size.$body)->okOrFail(); + $this->sendWithResponse('IDENTIFY '.PHP_EOL.$size.$body)->okOrFail(); + }); } /** @@ -89,26 +98,26 @@ abstract class Connection */ public function disconnect(): void { - if ($this->closed) { + if (null === $this->socket) { return; } try { - $this->send('CLS'.PHP_EOL); - - if (null !== $this->socket) { - $this->socket->close(); - } - } catch (Throwable $e) { + $this->socket->write('CLS'.PHP_EOL); + $this->socket->close(); + } + // @codeCoverageIgnoreStart + catch (Exception $e) { $this->logger->debug($e->getMessage(), ['exception' => $e]); } + // @codeCoverageIgnoreEnd - $this->closed = true; + $this->socket = null; } - public function isClosed(): bool + public function isReady(): bool { - return $this->closed; + return null !== $this->socket; } /** @@ -129,63 +138,95 @@ abstract class Connection try { $socket->write($buffer); - } catch (Exception $e) { - $this->closed = true; + } + // @codeCoverageIgnoreStart + catch (Exception $e) { + $this->disconnect(); $this->logger->error($e->getMessage(), ['exception' => $e]); throw ConnectionFail::fromThrowable($e); } + // @codeCoverageIgnoreEnd return $this; } + public function hasMessage(float $timeout = 0): bool + { + try { + return false !== $this->socket()->selectRead($timeout); + } + // @codeCoverageIgnoreStart + catch (Exception $e) { + $this->disconnect(); + + throw ConnectionFail::fromThrowable($e); + } + // @codeCoverageIgnoreEnd + } + public function receive(float $timeout = 0): ?Response { $socket = $this->socket(); $deadline = microtime(true) + $timeout; - if (false === $socket->selectRead($timeout)) { + if (!$this->hasMessage($timeout)) { return null; } - $size = (new ByteBuffer($socket->read(Bytes::BYTES_SIZE)))->consumeUint32(); - $response = new Response(new ByteBuffer($socket->read($size))); + try { + $size = $socket->read(Bytes::BYTES_SIZE); - if ($response->isHeartBeat()) { - $this->send('NOP'.PHP_EOL); + if ('' === $size) { + $this->disconnect(); - return $this->receive( - ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime + throw new ConnectionFail('Probably connection lost'); + } + + $buffer = new ByteBuffer( + $socket->read( + // @phpstan-ignore-next-line + unpack('N', $size)[1] + ) ); + + $this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL)); + + $response = new Response($buffer); + + if ($response->isHeartBeat()) { + $this->send('NOP'.PHP_EOL); + + return $this->receive( + ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime + ); + } } + // @codeCoverageIgnoreStart + catch (Exception $e) { + $this->disconnect(); + + throw ConnectionFail::fromThrowable($e); + } + // @codeCoverageIgnoreEnd return $response; } protected function sendWithResponse(string $buffer): Response { - $this->send($buffer); - - $response = $this->receive(0.1); - - if (null === $response) { - throw new UnexpectedResponse('Response was expected, but null received.'); - } - - return $response; + return $this + ->send($buffer) + ->receive(1) ?? throw UnexpectedResponse::null(); } private function socket(): Socket { - if ($this->closed) { - throw new ConnectionFail('This connection is closed, create new one.'); - } - if (null === $this->socket) { $this->connect(); } - return $this->socket; + return $this->socket ?? throw new ConnectionFail('This connection is closed, create new one.'); } } diff --git a/src/Exception/UnexpectedResponse.php b/src/Exception/UnexpectedResponse.php index c02dfe0..3cab4cf 100644 --- a/src/Exception/UnexpectedResponse.php +++ b/src/Exception/UnexpectedResponse.php @@ -8,4 +8,11 @@ use RuntimeException; final class UnexpectedResponse extends RuntimeException implements NsqException { + /** + * @codeCoverageIgnore + */ + public static function null(): self + { + return new self('Response was expected, but null received.'); + } } diff --git a/src/Reconnect/ExponentialStrategy.php b/src/Reconnect/ExponentialStrategy.php new file mode 100644 index 0000000..13a2680 --- /dev/null +++ b/src/Reconnect/ExponentialStrategy.php @@ -0,0 +1,64 @@ +delay = 0; + $this->timeProvider = $timeProvider ?? new RealTimeProvider(); + $this->nextTryAfter = $this->timeProvider->time(); + $this->logger = $logger ?? new NullLogger(); + } + + /** + * {@inheritDoc} + */ + public function connect(callable $callable): void + { + $currentTime = $this->timeProvider->time(); + + if ($currentTime < $this->nextTryAfter) { + throw new ConnectionFail('Time to reconnect has not yet come'); + } + + try { + $callable(); + } catch (Throwable $e) { + $nextDelay = 0 === $this->delay ? $this->minDelay : $this->delay * 2; + $this->delay = $nextDelay > $this->maxDelay ? $this->maxDelay : $nextDelay; + $this->nextTryAfter = $currentTime + $this->delay; + + $this->logger->info(sprintf('Reconnect #%s after %ss', ++$this->attempt, $this->delay)); + + throw $e; + } + + $this->delay = 0; + $this->attempt = 0; + } +} diff --git a/src/Reconnect/RealTimeProvider.php b/src/Reconnect/RealTimeProvider.php new file mode 100644 index 0000000..c984a47 --- /dev/null +++ b/src/Reconnect/RealTimeProvider.php @@ -0,0 +1,13 @@ +type) { + // @codeCoverageIgnoreStart throw new UnexpectedResponse(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type)); + // @codeCoverageIgnoreEnd } if (self::OK !== $this->buffer->bytes()) { + // @codeCoverageIgnoreStart throw new UnexpectedResponse(sprintf('OK response expected, but "%s" received.', $this->buffer->bytes())); + // @codeCoverageIgnoreEnd } } @@ -49,7 +53,9 @@ final class Response public function toMessage(Consumer $reader): Message { if (self::TYPE_MESSAGE !== $this->type) { + // @codeCoverageIgnoreStart throw new UnexpectedResponse(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type)); + // @codeCoverageIgnoreEnd } $buffer = new ByteBuffer($this->buffer->bytes()); diff --git a/tests/ExponentialStrategyTest.php b/tests/ExponentialStrategyTest.php new file mode 100644 index 0000000..cee0065 --- /dev/null +++ b/tests/ExponentialStrategyTest.php @@ -0,0 +1,90 @@ +connect(static function (): void { + }); + }; + $failConnect = static function (int $time = null) use ($strategy, $timeProvider): void { + $timeProvider($time); + + try { + $strategy->connect(function (): void { + throw new ConnectionFail('Time come but failed'); + }); + } catch (ConnectionFail $e) { + self::assertSame('Time come but failed', $e->getMessage()); + + return; + } + + self::fail('Expecting exception with message "Time come but failed"'); + }; + $timeNotCome = static function (int $time = null) use ($strategy, $timeProvider): void { + $timeProvider($time); + + try { + $strategy->connect(function (): void { + throw new ConnectionFail(''); + }); + } catch (ConnectionFail $e) { + self::assertSame('Time to reconnect has not yet come', $e->getMessage()); + + return; + } + + self::fail('Was expecting exception with message "Time to reconnect has not yet come"'); + }; + + $failConnect(0); + $timeNotCome(7); + $failConnect(8); + $timeNotCome(22); + $timeNotCome(13); + $failConnect(24); + $successConnect(56); + $failConnect(); + $timeNotCome(); + $timeNotCome(63); + $failConnect(64); + + $this->expectException(ConnectionFail::class); + $this->expectExceptionMessage('Time to reconnect has not yet come'); + + $successConnect(); + } +} + +class FakeTimeProvider implements TimeProvider +{ + public int $time = 0; + + public function time(): int + { + return $this->time; + } + + public function __invoke(int $time = null): void + { + $this->time = $time ?? $this->time; + } +} diff --git a/tests/NsqTest.php b/tests/NsqTest.php index 307da74..6c3a086 100644 --- a/tests/NsqTest.php +++ b/tests/NsqTest.php @@ -75,8 +75,8 @@ final class NsqTest extends TestCase $message->touch(); $message->finish(); - self::assertFalse($consumer->isClosed()); + self::assertTrue($consumer->isReady()); $generator->send(Subscriber::STOP); - self::assertTrue($consumer->isClosed()); + self::assertFalse($consumer->isReady()); } }