Simplify discovery producer example

This commit is contained in:
2021-09-19 02:13:11 +03:00
parent 43ab797ee0
commit 381ba88f8d

View File

@@ -12,7 +12,6 @@ use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor; use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig; use Nsq\Config\LookupConfig;
use Nsq\Exception\NsqException;
use Nsq\Lookup; use Nsq\Lookup;
use Nsq\Producer; use Nsq\Producer;
use function Amp\asyncCall; use function Amp\asyncCall;
@@ -39,44 +38,36 @@ Loop::run(static function () {
$isRunning = true; $isRunning = true;
asyncCall(static function () use ($lookup, $clientConfig, $logger, &$producers, &$isRunning) { asyncCall(static function () use ($lookup, $clientConfig, $logger, &$producers, &$isRunning) {
$stopList = [];
while ($isRunning) { while ($isRunning) {
/** @var Lookup\Producer[] $nodes */ /** @var Lookup\Producer[] $nodes */
$nodes = yield $lookup->nodes(); $nodes = yield $lookup->nodes();
foreach ($nodes as $node) { foreach ($nodes as $node) {
$toTcpUri = $node->toTcpUri(); $address = $node->toTcpUri();
if (array_key_exists($toTcpUri, $producers) || array_key_exists($toTcpUri, $stopList)) { if (array_key_exists($address, $producers)) {
continue; continue;
} }
$stopList[$node->broadcastAddress] = true; asyncCall(function () use ($address, $clientConfig, $logger, &$producers) {
yield ($producers[$address] = Producer::create($address, $clientConfig, $logger))
$logger->debug('Found new nsqd: '.$toTcpUri); ->onClose(function () use (&$producers, $address) {
unset($producers[$address]);
asyncCall(function () use ($node, $clientConfig, $logger, &$producers, &$stopList) { })
$producer = new Producer( ->connect()
$node->toTcpUri(), ;
clientConfig: $clientConfig,
logger: $logger,
);
yield $producer->connect();
$producers[$node->toTcpUri()] = $producer;
unset($stopList[$node->toTcpUri()]);
}); });
} }
yield delay(5000); yield delay(5000);
yield Amp\Dns\resolver()->reloadConfig(); yield Amp\Dns\resolver()->reloadConfig(); // for reload /etc/hosts
} }
}); });
Loop::delay(5000, function () use (&$isRunning) { Loop::delay(5000, function () use (&$isRunning, $logger) {
$logger->info('Stopping producer.');
$isRunning = false; $isRunning = false;
}); });
@@ -99,10 +90,12 @@ Loop::run(static function () {
$index = array_rand($producers); $index = array_rand($producers);
$producer = $producers[$index]; $producer = $producers[$index];
try { if (!$producer->isConnected()) {
yield $producer->publish('local', 'This is message of count '.$counter++); yield delay(100);
} catch (NsqException) {
unset($producers[$index]); continue;
} }
yield $producer->publish('local', 'This is message of count '.$counter++);
} }
}); });