From be696f17b523000aebc4c1bd5b56e4bb4d0ca9e3 Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Sun, 19 Sep 2021 01:30:30 +0300 Subject: [PATCH] Lookup: reconnections --- src/Connection.php | 9 +++-- src/Consumer.php | 20 ++++++---- src/Lookup.php | 91 +++++++++++++++++++++++++++++++--------------- src/Producer.php | 12 +++--- 4 files changed, 86 insertions(+), 46 deletions(-) diff --git a/src/Connection.php b/src/Connection.php index 745dfdf..3521f6f 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -38,9 +38,12 @@ abstract class Connection private $onCloseCallback; public function __construct( - private string $address, - private ClientConfig $clientConfig, - private LoggerInterface $logger, + /** + * @readonly + */ + public string $address, + protected ClientConfig $clientConfig, + protected LoggerInterface $logger, ) { $this->stream = new NullStream(); $this->onConnectCallback = static function () { diff --git a/src/Consumer.php b/src/Consumer.php index e01e722..06fe752 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -25,17 +25,23 @@ final class Consumer extends Connection private $onMessage; public function __construct( - private string $address, - private string $topic, - private string $channel, + string $address, + /** + * @readonly + */ + public string $topic, + /** + * @readonly + */ + public string $channel, callable $onMessage, - private ClientConfig $clientConfig, - private LoggerInterface $logger, + ClientConfig $clientConfig, + LoggerInterface $logger, ) { parent::__construct( - $this->address, + $address, $clientConfig, - $this->logger, + $logger, ); $this->onMessage = $onMessage; diff --git a/src/Lookup.php b/src/Lookup.php index 04e1a0d..4554983 100644 --- a/src/Lookup.php +++ b/src/Lookup.php @@ -5,6 +5,7 @@ declare(strict_types=1); namespace Nsq; use Amp\Deferred; +use Amp\Dns\DnsException; use Amp\Http\Client\DelegateHttpClient; use Amp\Http\Client\HttpClientBuilder; use Amp\Http\Client\Request; @@ -123,38 +124,30 @@ final class Lookup $producers = $this->producers[$topic] ??= new Deferred(); if ($producers instanceof Deferred) { + /** @var array $producers */ $producers = yield $producers->promise(); } - /** @var \Nsq\Lookup\Producer $producer */ - foreach ($producers as $producer) { - $address = $producer->toTcpUri(); - $consumerKey = $topic.$address; + foreach (array_diff_key($consumers, $producers) as $address => $producer) { + unset($consumers[$address]); + } - if (\array_key_exists($consumerKey, $consumers)) { + foreach ($producers as $address => $producer) { + if (\array_key_exists($address, $consumers)) { continue; } - $promise = ($consumers[$consumerKey] = new Consumer( - $address, - $topic, - $channel, - $onMessage, - $config, - $this->logger, - ))->onClose(function () use ($consumerKey, &$consumers) { - unset($consumers[$consumerKey]); - })->connect(); - - $promise->onResolve(function (?\Throwable $e) use ($consumerKey, &$consumers) { - if (null !== $e) { - $this->logger->error($e->getMessage()); - - unset($consumers[$consumerKey]); - } - }); - - yield $promise; + $this->keepConnection( + new Consumer( + $address, + $topic, + $channel, + $onMessage, + $config, + $this->logger, + ), + $consumers, + ); } yield delay($this->config->pollingInterval); @@ -183,6 +176,45 @@ final class Lookup $this->logger->info('Unsubscribed.', compact('topic', 'channel')); } + private function keepConnection(Consumer $consumer, &$consumers): void + { + $consumers[$consumer->address] = $consumer; + + asyncCall(function () use ($consumer, &$consumers) { + while (\array_key_exists($consumer->address, $consumers)) { + try { + yield $consumer->connect(); + } catch (DnsException $e) { + $this->logger->error($e->getMessage(), ['exception' => $e]); + + unset($consumers[$consumer->address], $this->producers[$consumer->topic][$consumer->address]); + + return; + } catch (\Throwable $e) { + $this->logger->error($e->getMessage(), ['exception' => $e]); + + yield delay($this->config->pollingInterval); + + continue; + } + + while (true) { + if (!\array_key_exists($consumer->address, $consumers)) { + $consumer->close(); + + return; + } + + if (!$consumer->isConnected()) { + break; + } + + yield delay(500); + } + } + }); + } + private function watch(string $topic): void { if (\array_key_exists($topic, $this->topicWatchers)) { @@ -218,15 +250,14 @@ final class Lookup $producers = []; foreach ($responses as $response) { - if (($deferred = ($this->producers[$topic] ?? null)) instanceof Deferred) { - $deferred->resolve($response->producers); - unset($this->producers[$topic]); - } - foreach ($response->producers as $producer) { $producers[$producer->toTcpUri()] = $producer; } } + + if (($deferred = ($this->producers[$topic] ?? null)) instanceof Deferred) { + $deferred->resolve($producers); + } $this->producers[$topic] = $producers; yield delay($this->config->pollingInterval); diff --git a/src/Producer.php b/src/Producer.php index 90a42b2..0f191f8 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -16,14 +16,14 @@ use function Amp\call; final class Producer extends Connection { public function __construct( - private string $address, - private ClientConfig $clientConfig, - private LoggerInterface $logger, + string $address, + ClientConfig $clientConfig, + LoggerInterface $logger, ) { parent::__construct( - $this->address, - $this->clientConfig, - $this->logger, + $address, + $clientConfig, + $logger, ); $context = compact('address');