From ca2c2ee633f452d5a7da10d26e2ae4d0937eeb65 Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Sat, 4 Sep 2021 01:55:34 +0300 Subject: [PATCH] Lookup (#14) --- .env | 1 + README.md | 23 ++++- composer.json | 1 + docker-compose.yml | 86 +++++++++++++--- src/Config/LookupConfig.php | 13 +++ src/Exception/LookupException.php | 9 ++ src/Lookup.php | 162 ++++++++++++++++++++++++++++++ src/Lookup/Producer.php | 30 ++++++ src/Lookup/Response.php | 34 +++++++ 9 files changed, 344 insertions(+), 15 deletions(-) create mode 100644 .env create mode 100644 src/Config/LookupConfig.php create mode 100644 src/Exception/LookupException.php create mode 100644 src/Lookup.php create mode 100644 src/Lookup/Producer.php create mode 100644 src/Lookup/Response.php diff --git a/.env b/.env new file mode 100644 index 0000000..c489573 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +NSQ_VERSION=1.2.1 diff --git a/README.md b/README.md index cdb151b..3250f76 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ Features - [x] PUB - [x] SUB - [X] Feature Negotiation -- [ ] Discovery +- [X] Discovery - [ ] Backoff - [X] TLS - [ ] Deflate @@ -80,6 +80,27 @@ $consumer = Consumer::create( ); ``` +### Lookup + +```php +use Nsq\Lookup; +use Nsq\Message; + +$lookup = new Lookup('http://nsqlookupd0:4161'); +$lookup = new Lookup(['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161']); + +$callable = static function (Message $message): Generator { + yield $message->touch(); // Reset the timeout for an in-flight message + yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process) + yield $message->finish(); // Finish a message (indicate successful processing) +}; + +$lookup->subscribe(topic: 'topic', channel: 'channel', onMessage: $callable); +$lookup->subscribe(topic: 'anotherTopic', channel: 'channel', onMessage: $callable); + +$lookup->run(); +``` + ### Integrations - [Symfony](https://github.com/nsqphp/NsqBundle) diff --git a/composer.json b/composer.json index eb82fd0..8f8b6db 100644 --- a/composer.json +++ b/composer.json @@ -13,6 +13,7 @@ "require": { "php": "^8.0.1", "ext-json": "*", + "amphp/http-client": "^4.6", "amphp/socket": "^1.1", "composer/semver": "^3.2", "phpinnacle/buffer": "^1.2", diff --git a/docker-compose.yml b/docker-compose.yml index 8bdd629..e856203 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,24 +1,82 @@ version: '3.7' services: - nsqd: - image: nsqio/nsq:v1.2.0 + nsqd0: + image: nsqio/nsq:v${NSQ_VERSION} labels: - ru.grachevko.dhu: 'nsqd' - command: /nsqd -log-level debug -# command: /nsqd - ports: - - 4150:4150 - - 4151:4151 + ru.grachevko.dhu: 'nsqd0' + command: >- + nsqd + --log-level debug + --lookupd-tcp-address nsqlookupd0:4160 + --lookupd-tcp-address nsqlookupd1:4160 + --lookupd-tcp-address nsqlookupd2:4160 + --broadcast-address nsqd0 + + nsqd1: + image: nsqio/nsq:v${NSQ_VERSION} + labels: + ru.grachevko.dhu: 'nsqd1' + command: >- + nsqd + --log-level debug + --lookupd-tcp-address nsqlookupd0:4160 + --lookupd-tcp-address nsqlookupd1:4160 + --lookupd-tcp-address nsqlookupd2:4160 + --broadcast-address nsqd1 + + nsqd2: + image: nsqio/nsq:v${NSQ_VERSION} + labels: + ru.grachevko.dhu: 'nsqd2' + command: >- + nsqd + --log-level debug + --lookupd-tcp-address nsqlookupd0:4160 + --lookupd-tcp-address nsqlookupd1:4160 + --lookupd-tcp-address nsqlookupd2:4160 + --broadcast-address nsqd2 + + nsqlookupd0: + image: nsqio/nsq:v${NSQ_VERSION} + labels: + ru.grachevko.dhu: 'nsqlookupd0' + command: /nsqlookupd -log-level debug + + nsqlookupd1: + image: nsqio/nsq:v${NSQ_VERSION} + labels: + ru.grachevko.dhu: 'nsqlookupd1' + command: /nsqlookupd -log-level debug + + nsqlookupd2: + image: nsqio/nsq:v${NSQ_VERSION} + labels: + ru.grachevko.dhu: 'nsqlookupd2' + command: /nsqlookupd -log-level debug nsqadmin: - image: nsqio/nsq:v1.2.0 + image: nsqio/nsq:v${NSQ_VERSION} labels: ru.grachevko.dhu: 'nsqadmin' - command: /nsqadmin --nsqd-http-address=nsqd:4151 --http-address=0.0.0.0:4171 - ports: - - 4171:4171 + command: + - nsqadmin + - --http-address=0.0.0.0:4171 + - --lookupd-http-address=nsqlookupd0:4161 + - --lookupd-http-address=nsqlookupd1:4161 + - --lookupd-http-address=nsqlookupd2:4161 + depends_on: + - nsqlookupd0 + - nsqlookupd1 + - nsqlookupd2 tail: - image: nsqio/nsq:v1.2.0 - command: nsq_tail -channel nsq_tail -topic local -nsqd-tcp-address nsqd:4150 + image: nsqio/nsq:v${NSQ_VERSION} + command: >- + nsq_tail + --channel nsq_tail + --topic local + --lookupd-http-address nsqlookupd1:4161 + depends_on: + - nsqd1 + - nsqlookupd1 diff --git a/src/Config/LookupConfig.php b/src/Config/LookupConfig.php new file mode 100644 index 0000000..0843138 --- /dev/null +++ b/src/Config/LookupConfig.php @@ -0,0 +1,13 @@ +addresses = (array) $address; + $this->config = $config ?? new LookupConfig(); + $this->logger = $logger ?? new NullLogger(); + } + + public function run(): void + { + if (null !== $this->watcherId) { + return; + } + + $client = HttpClientBuilder::buildDefault(); + $logger = $this->logger; + + $requestHandler = static function (string $uri) use ($client, $logger): \Generator { + /** @var Response $response */ + $response = yield $client->request(new Request($uri)); + + $buffer = yield $response->getBody()->buffer(); + + try { + return Lookup\Response::fromJson($buffer); + } catch (LookupException $e) { + $logger->warning($e->getMessage()); + + return null; + } + }; + + $callback = function () use ($requestHandler): \Generator { + foreach ($this->addresses as $address) { + foreach ($this->subscriptions as $key => $subscription) { + [$topic, $channel] = \explode(':', $key); + + $promise = call($requestHandler, $address.'/lookup?topic='.$topic); + $promise->onResolve( + function (?\Throwable $e, ?Lookup\Response $response) use ( + $key, + $subscription, + $topic, + $channel + ) { + if (null !== $e) { + $this->logger->error($e->getMessage(), ['exception' => $e]); + + return; + } + + if (null === $response) { + return; + } + + foreach ($response->producers as $producer) { + $address = sprintf('%s:%s', $producer->broadcastAddress, $producer->tcpPort); + $consumerKey = $key.$address; + + if (\array_key_exists($consumerKey, $this->consumers)) { + continue; + } + + $this->logger->info('Consumer created.', \compact('address', 'topic', 'channel')); + + yield ($this->consumers[$consumerKey] = new Consumer( + $address, + $topic, + $channel, + $subscription['callable'], + $subscription['config'], + $this->logger, + ))->connect(); + } + }, + ); + + yield $promise; + } + } + }; + + Loop::defer($callback); + $this->watcherId = Loop::repeat($this->config->pollingInterval, $callback); + } + + public function stop(): void + { + if (null === $this->watcherId) { + return; + } + + $this->logger->info('Lookup stopped, cancel watcher.'); + + Loop::cancel($this->watcherId); + $this->watcherId = null; + + foreach ($this->consumers as $key => $consumer) { + $consumer->close(); + + unset($this->consumers[$key]); + } + } + + public function subscribe(string $topic, string $channel, callable $onMessage, ClientConfig $config = null): void + { + $key = $topic.':'.$channel; + + if (\array_key_exists($key, $this->subscriptions)) { + throw new \InvalidArgumentException('Subscription already exists.'); + } + + $this->subscriptions[$key] = [ + 'callable' => $onMessage, + 'config' => $config, + ]; + + $this->logger->info('Subscribed', \compact('topic', 'channel')); + } + + public function unsubscribe(string $topic, string $channel): void + { + $key = $topic.':'.$channel; + + unset($this->subscriptions[$key]); + + $this->logger->info('Unsubscribed', \compact('topic', 'channel')); + } +} diff --git a/src/Lookup/Producer.php b/src/Lookup/Producer.php new file mode 100644 index 0000000..688a19a --- /dev/null +++ b/src/Lookup/Producer.php @@ -0,0 +1,30 @@ +