Refactor Lookup

This commit is contained in:
2021-09-15 01:26:47 +03:00
parent 5bab748952
commit 47194b30f3

View File

@@ -4,159 +4,219 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Amp\Deferred;
use Amp\Http\Client\DelegateHttpClient;
use Amp\Http\Client\HttpClientBuilder; use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request; use Amp\Http\Client\Request;
use Amp\Http\Client\Response; use Amp\Http\Client\Response;
use Amp\Loop; use Amp\Promise;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig; use Nsq\Config\LookupConfig;
use Nsq\Exception\LookupException; use Nsq\Exception\LookupException;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger; use Psr\Log\NullLogger;
use function Amp\asyncCall;
use function Amp\call; use function Amp\call;
use function Amp\delay;
final class Lookup final class Lookup
{ {
private array $addresses; /**
* @var array<string, array<string, \Nsq\Lookup\Producer[]>>
*/
private array $producers = [];
private array $subscriptions = []; private array $running = [];
private array $consumers = [];
private LookupConfig $config;
private LoggerInterface $logger;
private ?string $watcherId = null;
public function __construct( public function __construct(
private array $addresses,
private LookupConfig $config,
private LoggerInterface $logger,
private DelegateHttpClient $httpClient,
) {
}
public static function create(
string | array $address, string | array $address,
LookupConfig $config = null, LookupConfig $config = null,
LoggerInterface $logger = null, LoggerInterface $logger = null,
) { DelegateHttpClient $httpClient = null,
$this->addresses = (array) $address; ): self {
$this->config = $config ?? new LookupConfig(); return new self(
$this->logger = $logger ?? new NullLogger(); (array) $address,
$config ?? new LookupConfig(),
$logger ?? new NullLogger(),
$httpClient ?? HttpClientBuilder::buildDefault(),
);
} }
public function run(): void /**
* @return Promise<Lookup\Producer>
*/
public function nodes(): Promise
{ {
if (null !== $this->watcherId) { return call(function () {
return; $requestHandler = function (string $uri): \Generator {
} /** @var Response $response */
$response = yield $this->httpClient->request(new Request($uri.'/nodes'));
$client = HttpClientBuilder::buildDefault(); try {
$logger = $this->logger; return Lookup\Response::fromJson(yield $response->getBody()->buffer());
} catch (LookupException $e) {
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
$requestHandler = static function (string $uri) use ($client, $logger): \Generator { return null;
/** @var Response $response */ }
$response = yield $client->request(new Request($uri)); };
$buffer = yield $response->getBody()->buffer(); $promises = [];
try {
return Lookup\Response::fromJson($buffer);
} catch (LookupException $e) {
$logger->log($e->level(), $uri.' '.$e->getMessage());
return null;
}
};
$callback = function () use ($requestHandler): \Generator {
foreach ($this->addresses as $address) { foreach ($this->addresses as $address) {
foreach ($this->subscriptions as $key => $subscription) { $promises[$address] = call($requestHandler, $address);
[$topic, $channel] = \explode(':', $key); }
$promise = call($requestHandler, $address.'/lookup?topic='.$topic); $nodes = [];
$promise->onResolve( /** @var Lookup\Response $response */
function (?\Throwable $e, ?Lookup\Response $response) use ( foreach (yield $promises as $response) {
$key, foreach ($response->producers as $producer) {
$subscription, $nodes[$producer->toTcpUri()] = $producer;
$topic,
$channel
) {
if (null !== $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
return;
}
if (null === $response) {
return;
}
foreach ($response->producers as $producer) {
$address = sprintf('%s:%s', $producer->broadcastAddress, $producer->tcpPort);
$consumerKey = $key.$address;
if (\array_key_exists($consumerKey, $this->consumers)) {
continue;
}
$this->logger->info('Consumer created.', \compact('address', 'topic', 'channel'));
yield ($this->consumers[$consumerKey] = new Consumer(
$address,
$topic,
$channel,
$subscription['callable'],
$subscription['config'],
$this->logger,
))->connect();
}
},
);
yield $promise;
} }
} }
};
Loop::defer($callback); return array_values($nodes);
$this->watcherId = Loop::repeat($this->config->pollingInterval, $callback); });
} }
public function stop(): void public function stop(): void
{ {
if (null === $this->watcherId) { foreach ($this->running as $topic => $channels) {
return; foreach ($channels as $channel) {
$this->unsubscribe($topic, $channel);
}
} }
$this->logger->info('Lookup stopped, cancel watcher.'); $this->logger->info('Lookup stopped.');
Loop::cancel($this->watcherId);
$this->watcherId = null;
foreach ($this->consumers as $key => $consumer) {
$consumer->close();
unset($this->consumers[$key]);
}
} }
public function subscribe(string $topic, string $channel, callable $onMessage, ClientConfig $config = null): void public function subscribe(string $topic, string $channel, callable $onMessage, ClientConfig $config = null): void
{ {
$key = $topic.':'.$channel; if (null !== ($this->running[$topic][$channel] ?? null)) {
if (\array_key_exists($key, $this->subscriptions)) {
throw new \InvalidArgumentException('Subscription already exists.'); throw new \InvalidArgumentException('Subscription already exists.');
} }
$this->subscriptions[$key] = [ $this->running[$topic][$channel] = true;
'callable' => $onMessage,
'config' => $config,
];
$this->logger->info('Subscribed', \compact('topic', 'channel')); /** @var Consumer[] $consumers */
$consumers = [];
asyncCall(function () use ($topic, $channel, $onMessage, $config, &$consumers) {
while (true) {
if (null === ($this->running[$topic][$channel] ?? null)) {
foreach ($consumers as $consumer) {
$consumer->close();
}
return;
}
$producers = $this->producers[$topic] ??= new Deferred();
if ($producers instanceof Deferred) {
$producers = yield $producers->promise();
}
/** @var \Nsq\Lookup\Producer $producer */
foreach ($producers as $producer) {
$address = $producer->toTcpUri();
$consumerKey = $topic.$address;
if (\array_key_exists($consumerKey, $consumers)) {
continue;
}
$promise = ($consumers[$consumerKey] = new Consumer(
$address,
$topic,
$channel,
$onMessage,
$config,
$this->logger,
))->onClose(function () use ($consumerKey, &$consumers) {
unset($consumers[$consumerKey]);
})->connect();
$this->logger->debug('Consumer created.', compact('address', 'topic', 'channel'));
$promise->onResolve(function (?\Throwable $e) use ($consumerKey, &$consumers) {
if (null !== $e) {
$this->logger->error($e->getMessage());
unset($consumers[$consumerKey]);
}
});
yield $promise;
}
yield delay($this->config->pollingInterval);
}
});
$this->watch($topic);
$this->logger->info('Subscribed', compact('topic', 'channel'));
} }
public function unsubscribe(string $topic, string $channel): void public function unsubscribe(string $topic, string $channel): void
{ {
$key = $topic.':'.$channel; if (null === ($this->running[$topic][$channel] ?? null)) {
$this->logger->debug('Trying unsubscribe from non subscribed', compact('topic', 'channel'));
unset($this->subscriptions[$key]); return;
}
$this->logger->info('Unsubscribed', \compact('topic', 'channel')); unset($this->running[$topic][$channel]);
$this->logger->info('Unsubscribed', compact('topic', 'channel'));
}
private function watch(string $topic): void
{
asyncCall(function () use ($topic) {
$requestHandler = function (string $uri) use ($topic): \Generator {
/** @var Response $response */
$response = yield $this->httpClient->request(new Request($uri.'/lookup?topic='.$topic));
try {
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
} catch (LookupException $e) {
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
return null;
}
};
while (\array_key_exists($topic, $this->running)) {
$promises = [];
foreach ($this->addresses as $address) {
$promises[$address] = call($requestHandler, $address);
}
/** @var Lookup\Response[] $responses */
$responses = yield $promises;
foreach ($responses as $response) {
if (($deferred = ($this->producers[$topic] ?? null)) instanceof Deferred) {
$deferred->resolve($response->producers);
unset($this->producers[$topic]);
}
foreach ($response->producers as $producer) {
$this->producers[$topic][$producer->toTcpUri()] = $producer;
}
}
yield delay($this->config->pollingInterval);
}
});
} }
} }