From d9bf2a443730f601f131797f42594521426a8788 Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Mon, 25 Jan 2021 03:19:54 +0300 Subject: [PATCH] Add ClientConfig and ConnectionConfig, Feature Negotiation and Authentication --- README.md | 10 +- composer.json | 1 + src/Config/ClientConfig.php | 115 +++++++++++++++++++++++ src/Config/ConnectionConfig.php | 81 ++++++++++++++++ src/Connection.php | 70 +++++++------- src/Consumer.php | 5 - src/Exception/AuthenticationRequired.php | 11 +++ src/Response.php | 17 ++++ src/Subscriber.php | 23 +---- tests/NsqTest.php | 18 +++- tests/SubscriberTest.php | 45 --------- 11 files changed, 277 insertions(+), 119 deletions(-) create mode 100644 src/Config/ClientConfig.php create mode 100644 src/Config/ConnectionConfig.php create mode 100644 src/Exception/AuthenticationRequired.php delete mode 100644 tests/SubscriberTest.php diff --git a/README.md b/README.md index 79c89ba..50401cd 100644 --- a/README.md +++ b/README.md @@ -30,13 +30,13 @@ Features - [x] PUB - [x] SUB -- [ ] Feature Negotiation +- [X] Feature Negotiation - [ ] Discovery - [ ] Backoff - [ ] TLS - [ ] Snappy - [X] Sampling -- [ ] AUTH +- [X] AUTH Usage ----- @@ -71,7 +71,7 @@ use Nsq\Subscriber; $consumer = new Consumer('tcp://nsqd:4150'); $subscriber = new Subscriber($consumer); -$generator = $subscriber->subscribe('topic', 'channel', timeout: 5); +$generator = $subscriber->subscribe('topic', 'channel'); foreach ($generator as $message) { if ($message instanceof Message) { $payload = $message->body; @@ -89,10 +89,6 @@ foreach ($generator as $message) { // We can also communicate with generator through send // for example: - // Dynamically change timeout - $generator->send(Subscriber::CHANGE_TIMEOUT); - $generator->send(10.0); // float required - // Gracefully close connection (loop will be ended) $generator->send(Subscriber::STOP); } diff --git a/composer.json b/composer.json index a63cd8b..e0a5d61 100644 --- a/composer.json +++ b/composer.json @@ -23,6 +23,7 @@ "ergebnis/composer-normalize": "9999999-dev", "friendsofphp/php-cs-fixer": "^2.18", "infection/infection": "^0.20.2", + "nyholm/nsa": "^1.2", "phpstan/phpstan": "^0.12.68", "phpstan/phpstan-phpunit": "^0.12.17", "phpstan/phpstan-strict-rules": "^0.12.9", diff --git a/src/Config/ClientConfig.php b/src/Config/ClientConfig.php new file mode 100644 index 0000000..df5b9fc --- /dev/null +++ b/src/Config/ClientConfig.php @@ -0,0 +1,115 @@ +hostname) { + $this->hostname = (static fn (mixed $h): string => \is_string($h) ? $h : '')(gethostname()); + } + + if ('' === $this->userAgent) { + $this->userAgent = 'nsqphp/'.InstalledVersions::getPrettyVersion('nsq/nsq'); + } + + if ($this->snappy && $this->deflate) { + throw new InvalidArgumentException('Client cannot enable both [snappy] and [deflate]'); + } + } + + /** + * @phpstan-ignore-next-line + */ + public function jsonSerialize(): array + { + return [ + 'client_id' => $this->clientId, + 'deflate' => $this->deflate, + 'deflate_level' => $this->deflateLevel, + 'feature_negotiation' => $this->featureNegotiation, + 'heartbeat_interval' => $this->heartbeatInterval, + 'hostname' => $this->hostname, + 'msg_timeout' => $this->msgTimeout, + 'sample_rate' => $this->sampleRate, + 'tls_v1' => $this->tls, + 'user_agent' => $this->userAgent, + ]; + } +} diff --git a/src/Config/ConnectionConfig.php b/src/Config/ConnectionConfig.php new file mode 100644 index 0000000..7145b27 --- /dev/null +++ b/src/Config/ConnectionConfig.php @@ -0,0 +1,81 @@ +address = $address; - $this->features = [ - 'client_id' => $clientId ?? '', - 'hostname' => $hostname ?? (static fn (mixed $h): string => \is_string($h) ? $h : '')(gethostname()), - 'user_agent' => $userAgent ?? 'nsqphp/'.InstalledVersions::getPrettyVersion('nsq/nsq'), - 'heartbeat_interval' => $heartbeatInterval, - 'sample_rate' => $sampleRate, - ]; - $this->logger = $logger ?? new NullLogger(); $this->reconnect = $reconnectStrategy ?? new ExponentialStrategy(logger: $this->logger); + $this->clientConfig = $clientConfig ?? new ClientConfig(); } public function connect(): void @@ -87,9 +73,27 @@ abstract class Connection $this->socket->write(' V2'); - $body = json_encode($this->features, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT); + $body = json_encode($this->clientConfig, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT); - $this->command('IDENTIFY', data: $body)->response()->okOrFail(); + $response = $this->command('IDENTIFY', data: $body)->response(); + + if ($this->clientConfig->featureNegotiation) { + $this->connectionConfig = ConnectionConfig::fromArray($response->toArray()); + } + + if ($this->connectionConfig->snappy || $this->connectionConfig->deflate) { + $this->response()->okOrFail(); + } + + if ($this->connectionConfig->authRequired) { + if (null === $this->clientConfig->authSecret) { + throw new AuthenticationRequired('NSQ requires authorization, set ClientConfig::$authSecret before connecting'); + } + + $authResponse = $this->command('AUTH', data: $this->clientConfig->authSecret)->response()->toArray(); + + $this->logger->info('Authorization response: '.http_build_query($authResponse)); + } }); } @@ -120,16 +124,6 @@ abstract class Connection return null !== $this->socket; } - /** - * @psalm-suppress PossiblyFalseOperand - */ - protected function auth(string $secret): string - { - $size = pack('N', \strlen($secret)); - - return 'AUTH'.PHP_EOL.$size.$secret; - } - /** * @param array|string $params */ @@ -176,9 +170,11 @@ abstract class Connection // @codeCoverageIgnoreEnd } - public function receive(float $timeout = 0): ?Response + public function receive(float $timeout = null): ?Response { $socket = $this->socket(); + + $timeout ??= $this->clientConfig->readTimeout; $deadline = microtime(true) + $timeout; if (!$this->hasMessage($timeout)) { @@ -226,7 +222,7 @@ abstract class Connection protected function response(): Response { - return $this->receive(1) ?? throw UnexpectedResponse::null(); + return $this->receive() ?? throw UnexpectedResponse::null(); } private function socket(): Socket diff --git a/src/Consumer.php b/src/Consumer.php index 6859ede..6a67901 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -30,11 +30,6 @@ final class Consumer extends Connection $this->rdy = $count; } - public function consume(float $timeout): ?Message - { - return $this->receive($timeout)?->toMessage($this); - } - /** * Finish a message (indicate successful processing). */ diff --git a/src/Exception/AuthenticationRequired.php b/src/Exception/AuthenticationRequired.php new file mode 100644 index 0000000..0c493a5 --- /dev/null +++ b/src/Exception/AuthenticationRequired.php @@ -0,0 +1,11 @@ +type && self::HEARTBEAT === $this->buffer->bytes(); } + /** + * @phpstan-ignore-next-line + */ + public function toArray(): array + { + if (self::TYPE_RESPONSE !== $this->type) { + // @codeCoverageIgnoreStart + throw new UnexpectedResponse(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type)); + // @codeCoverageIgnoreEnd + } + + return json_decode($this->buffer->bytes(), true, flags: JSON_THROW_ON_ERROR); + } + public function toMessage(Consumer $reader): Message { if (self::TYPE_MESSAGE !== $this->type) { diff --git a/src/Subscriber.php b/src/Subscriber.php index cda1a69..5707484 100644 --- a/src/Subscriber.php +++ b/src/Subscriber.php @@ -5,15 +5,10 @@ declare(strict_types=1); namespace Nsq; use Generator; -use InvalidArgumentException; -use function get_debug_type; -use function sprintf; final class Subscriber { public const STOP = 0; - public const CHANGE_TIMEOUT = 1; - public const TIMEOUT = 2; private Consumer $reader; @@ -25,32 +20,18 @@ final class Subscriber /** * @psalm-return Generator */ - public function subscribe(string $topic, string $channel, float $timeout = 0): Generator + public function subscribe(string $topic, string $channel): Generator { $this->reader->sub($topic, $channel); while (true) { $this->reader->rdy(1); - $command = yield $this->reader->consume($timeout); + $command = yield $this->reader->receive()?->toMessage($this->reader); if (self::STOP === $command) { break; } - - if (self::CHANGE_TIMEOUT === $command) { - $newTimeout = yield null; - - if (!\is_float($newTimeout)) { - throw new InvalidArgumentException(sprintf('Timeout must be float, "%s" given.', get_debug_type($newTimeout))); - } - - $timeout = $newTimeout; - } - - if (self::TIMEOUT === $command) { - yield $timeout; - } } $this->reader->disconnect(); diff --git a/tests/NsqTest.php b/tests/NsqTest.php index 6c3a086..18a38d8 100644 --- a/tests/NsqTest.php +++ b/tests/NsqTest.php @@ -2,10 +2,12 @@ declare(strict_types=1); +use Nsq\Config\ClientConfig; use Nsq\Consumer; use Nsq\Message; use Nsq\Producer; use Nsq\Subscriber; +use Nyholm\NSA; use PHPUnit\Framework\TestCase; final class NsqTest extends TestCase @@ -17,10 +19,13 @@ final class NsqTest extends TestCase $consumer = new Consumer( address: 'tcp://localhost:4150', - heartbeatInterval: 1000, + clientConfig: new ClientConfig( + heartbeatInterval: 1000, + readTimeout: 1, + ), ); $subscriber = new Subscriber($consumer); - $generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__, 1); + $generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__); /** @var null|Message $message */ $message = $generator->current(); @@ -65,8 +70,13 @@ final class NsqTest extends TestCase $message = $generator->current(); self::assertNull($message); - $generator->send(Subscriber::CHANGE_TIMEOUT); - $generator->send(10.0); + NSA::setProperty( + NSA::getProperty($consumer, 'clientConfig'), + 'readTimeout', + 10, + ); + + $generator->next(); /** @var null|Message $message */ $message = $generator->current(); diff --git a/tests/SubscriberTest.php b/tests/SubscriberTest.php deleted file mode 100644 index c413ed1..0000000 --- a/tests/SubscriberTest.php +++ /dev/null @@ -1,45 +0,0 @@ -subscriber = new Subscriber($consumer); - } - - public function testChangeInterval(): void - { - $generator = $this->subscriber->subscribe(__FUNCTION__, __FUNCTION__, 0.1); - - self::assertSame(0.1, $generator->send(Subscriber::TIMEOUT)); - $generator->next(); - - $generator->send(Subscriber::CHANGE_TIMEOUT); - $generator->send(0.2); - - self::assertSame(0.2, $generator->send(Subscriber::TIMEOUT)); - } - - public function testInvalidChangeInterval(): void - { - $this->expectException(InvalidArgumentException::class); - $this->expectExceptionMessage('Timeout must be float, "string" given.'); - - $generator = $this->subscriber->subscribe(__FUNCTION__, __FUNCTION__); - $generator->send(Subscriber::CHANGE_TIMEOUT); - // @phpstan-ignore-next-line - $generator->send('bla'); - } -}