From 381ba88f8da46c5efcc0abe388e9eefc843f32c0 Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Sun, 19 Sep 2021 02:13:11 +0300 Subject: [PATCH] Simplify discovery producer example --- examples/discovery/producer.php | 45 ++++++++++++++------------------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/examples/discovery/producer.php b/examples/discovery/producer.php index fd6cb23..feb01e5 100644 --- a/examples/discovery/producer.php +++ b/examples/discovery/producer.php @@ -12,7 +12,6 @@ use Monolog\Logger; use Monolog\Processor\PsrLogMessageProcessor; use Nsq\Config\ClientConfig; use Nsq\Config\LookupConfig; -use Nsq\Exception\NsqException; use Nsq\Lookup; use Nsq\Producer; use function Amp\asyncCall; @@ -39,44 +38,36 @@ Loop::run(static function () { $isRunning = true; asyncCall(static function () use ($lookup, $clientConfig, $logger, &$producers, &$isRunning) { - $stopList = []; - while ($isRunning) { /** @var Lookup\Producer[] $nodes */ $nodes = yield $lookup->nodes(); foreach ($nodes as $node) { - $toTcpUri = $node->toTcpUri(); + $address = $node->toTcpUri(); - if (array_key_exists($toTcpUri, $producers) || array_key_exists($toTcpUri, $stopList)) { + if (array_key_exists($address, $producers)) { continue; } - $stopList[$node->broadcastAddress] = true; - - $logger->debug('Found new nsqd: '.$toTcpUri); - - asyncCall(function () use ($node, $clientConfig, $logger, &$producers, &$stopList) { - $producer = new Producer( - $node->toTcpUri(), - clientConfig: $clientConfig, - logger: $logger, - ); - - yield $producer->connect(); - - $producers[$node->toTcpUri()] = $producer; - unset($stopList[$node->toTcpUri()]); + asyncCall(function () use ($address, $clientConfig, $logger, &$producers) { + yield ($producers[$address] = Producer::create($address, $clientConfig, $logger)) + ->onClose(function () use (&$producers, $address) { + unset($producers[$address]); + }) + ->connect() + ; }); } yield delay(5000); - yield Amp\Dns\resolver()->reloadConfig(); + yield Amp\Dns\resolver()->reloadConfig(); // for reload /etc/hosts } }); - Loop::delay(5000, function () use (&$isRunning) { + Loop::delay(5000, function () use (&$isRunning, $logger) { + $logger->info('Stopping producer.'); + $isRunning = false; }); @@ -99,10 +90,12 @@ Loop::run(static function () { $index = array_rand($producers); $producer = $producers[$index]; - try { - yield $producer->publish('local', 'This is message of count '.$counter++); - } catch (NsqException) { - unset($producers[$index]); + if (!$producer->isConnected()) { + yield delay(100); + + continue; } + + yield $producer->publish('local', 'This is message of count '.$counter++); } });