Lookup: graceful shutdown

This commit is contained in:
2021-09-19 01:57:53 +03:00
parent be696f17b5
commit 45048320a8

View File

@ -23,10 +23,12 @@ use function Amp\delay;
final class Lookup
{
/**
* @var array<string, array<string, \Nsq\Lookup\Producer[]>>
* @var array<string, array<string, Lookup\Producer[]>>
*/
private array $producers = [];
private array $consumers = [];
private array $running = [];
private array $topicWatchers = [];
@ -91,11 +93,10 @@ final class Lookup
public function stop(): void
{
foreach ($this->running as $topic => $channels) {
foreach (array_keys($channels) as $channel) {
$this->unsubscribe($topic, $channel);
}
}
$this->producers = [];
$this->consumers = [];
$this->running = [];
$this->topicWatchers = [];
$this->logger->info('Lookup stopped.');
}
@ -108,16 +109,9 @@ final class Lookup
$this->running[$topic][$channel] = true;
/** @var Consumer[] $consumers */
$consumers = [];
asyncCall(function () use ($topic, $channel, $onMessage, $config, &$consumers) {
asyncCall(function () use ($topic, $channel, $onMessage, $config) {
while (true) {
if (null === ($this->running[$topic][$channel] ?? null)) {
foreach ($consumers as $consumer) {
$consumer->close();
}
return;
}
@ -128,12 +122,12 @@ final class Lookup
$producers = yield $producers->promise();
}
foreach (array_diff_key($consumers, $producers) as $address => $producer) {
unset($consumers[$address]);
foreach (array_diff_key($this->consumers, $producers) as $address => $producer) {
unset($this->consumers[$address]);
}
foreach ($producers as $address => $producer) {
if (\array_key_exists($address, $consumers)) {
if (null !== ($this->consumers[$address][$topic][$channel] ?? null)) {
continue;
}
@ -146,7 +140,6 @@ final class Lookup
$config,
$this->logger,
),
$consumers,
);
}
@ -176,18 +169,21 @@ final class Lookup
$this->logger->info('Unsubscribed.', compact('topic', 'channel'));
}
private function keepConnection(Consumer $consumer, &$consumers): void
private function keepConnection(Consumer $consumer): void
{
$consumers[$consumer->address] = $consumer;
$this->consumers[$consumer->address][$consumer->topic][$consumer->channel] = $consumer;
asyncCall(function () use ($consumer, &$consumers) {
while (\array_key_exists($consumer->address, $consumers)) {
asyncCall(function () use ($consumer) {
while (null !== ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
try {
yield $consumer->connect();
} catch (DnsException $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
unset($consumers[$consumer->address], $this->producers[$consumer->topic][$consumer->address]);
unset(
$this->consumers[$consumer->address],
$this->producers[$consumer->topic][$consumer->address],
);
return;
} catch (\Throwable $e) {
@ -199,7 +195,7 @@ final class Lookup
}
while (true) {
if (!\array_key_exists($consumer->address, $consumers)) {
if (null === ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
$consumer->close();
return;