Prevent send RDY every iteration

This commit is contained in:
2021-01-23 02:27:21 +03:00
parent bc276c7c57
commit 6e37bbd1ae
2 changed files with 33 additions and 22 deletions

View File

@ -6,6 +6,8 @@ namespace Nsq;
class Consumer extends Connection class Consumer extends Connection
{ {
private int $rdy = 0;
/** /**
* Subscribe to a topic/channel. * Subscribe to a topic/channel.
*/ */
@ -21,7 +23,33 @@ class Consumer extends Connection
*/ */
public function rdy(int $count): void public function rdy(int $count): void
{ {
if ($this->rdy === $count) {
return;
}
$this->send('RDY '.$count.PHP_EOL); $this->send('RDY '.$count.PHP_EOL);
$this->rdy = $count;
}
public function consume(float $timeout): ?Message
{
$deadline = microtime(true) + $timeout;
$response = $this->receive($timeout);
if (null === $response) {
return null;
}
if ($response->isHeartBeat()) {
$this->nop();
return $this->consume(
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
);
}
return $response->toMessage($this);
} }
/** /**
@ -30,6 +58,8 @@ class Consumer extends Connection
public function fin(string $id): void public function fin(string $id): void
{ {
$this->send('FIN '.$id.PHP_EOL); $this->send('FIN '.$id.PHP_EOL);
--$this->rdy;
} }
/** /**
@ -41,6 +71,8 @@ class Consumer extends Connection
public function req(string $id, int $timeout): void public function req(string $id, int $timeout): void
{ {
$this->send(sprintf('REQ %s %s', $id, $timeout).PHP_EOL); $this->send(sprintf('REQ %s %s', $id, $timeout).PHP_EOL);
--$this->rdy;
} }
/** /**

View File

@ -6,7 +6,6 @@ namespace Nsq;
use Generator; use Generator;
use function get_debug_type; use function get_debug_type;
use function microtime;
use function sprintf; use function sprintf;
final class Subscriber final class Subscriber
@ -31,7 +30,7 @@ final class Subscriber
while (true) { while (true) {
$this->reader->rdy(1); $this->reader->rdy(1);
$command = yield $this->consume($timeout); $command = yield $this->reader->consume($timeout);
if (self::STOP === $command) { if (self::STOP === $command) {
break; break;
@ -50,24 +49,4 @@ final class Subscriber
$this->reader->disconnect(); $this->reader->disconnect();
} }
private function consume(float $timeout): ?Message
{
$deadline = microtime(true) + $timeout;
$response = $this->reader->receive($timeout);
if (null === $response) {
return null;
}
if ($response->isHeartBeat()) {
$this->reader->nop();
return $this->consume(
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
);
}
return $response->toMessage($this->reader);
}
} }