Unset previous producers on lookup newer
This commit is contained in:
@@ -206,6 +206,7 @@ final class Lookup
|
|||||||
/** @var Lookup\Response[] $responses */
|
/** @var Lookup\Response[] $responses */
|
||||||
$responses = yield $promises;
|
$responses = yield $promises;
|
||||||
|
|
||||||
|
$producers = [];
|
||||||
foreach ($responses as $response) {
|
foreach ($responses as $response) {
|
||||||
if (($deferred = ($this->producers[$topic] ?? null)) instanceof Deferred) {
|
if (($deferred = ($this->producers[$topic] ?? null)) instanceof Deferred) {
|
||||||
$deferred->resolve($response->producers);
|
$deferred->resolve($response->producers);
|
||||||
@@ -213,9 +214,10 @@ final class Lookup
|
|||||||
}
|
}
|
||||||
|
|
||||||
foreach ($response->producers as $producer) {
|
foreach ($response->producers as $producer) {
|
||||||
$this->producers[$topic][$producer->toTcpUri()] = $producer;
|
$producers[$producer->toTcpUri()] = $producer;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
$this->producers[$topic] = $producers;
|
||||||
|
|
||||||
yield delay($this->config->pollingInterval);
|
yield delay($this->config->pollingInterval);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user