Heartbeat can receive Producer too
This commit is contained in:
@ -134,14 +134,24 @@ abstract class Connection
|
|||||||
public function receive(float $timeout = 0): ?Response
|
public function receive(float $timeout = 0): ?Response
|
||||||
{
|
{
|
||||||
$socket = $this->socket();
|
$socket = $this->socket();
|
||||||
|
$deadline = microtime(true) + $timeout;
|
||||||
|
|
||||||
if (false === $socket->selectRead($timeout)) {
|
if (false === $socket->selectRead($timeout)) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
$size = (new ByteBuffer($socket->read(Bytes::BYTES_SIZE)))->consumeUint32();
|
$size = (new ByteBuffer($socket->read(Bytes::BYTES_SIZE)))->consumeUint32();
|
||||||
|
$response = new Response(new ByteBuffer($socket->read($size)));
|
||||||
|
|
||||||
return new Response(new ByteBuffer($socket->read($size)));
|
if ($response->isHeartBeat()) {
|
||||||
|
$this->send('NOP'.PHP_EOL);
|
||||||
|
|
||||||
|
return $this->receive(
|
||||||
|
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $response;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function sendWithResponse(string $buffer): Response
|
protected function sendWithResponse(string $buffer): Response
|
||||||
|
@ -34,22 +34,7 @@ final class Consumer extends Connection
|
|||||||
|
|
||||||
public function consume(float $timeout): ?Message
|
public function consume(float $timeout): ?Message
|
||||||
{
|
{
|
||||||
$deadline = microtime(true) + $timeout;
|
return $this->receive($timeout)?->toMessage($this);
|
||||||
|
|
||||||
$response = $this->receive($timeout);
|
|
||||||
if (null === $response) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($response->isHeartBeat()) {
|
|
||||||
$this->nop();
|
|
||||||
|
|
||||||
return $this->consume(
|
|
||||||
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
return $response->toMessage($this);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -82,9 +67,4 @@ final class Consumer extends Connection
|
|||||||
{
|
{
|
||||||
$this->send('TOUCH '.$id.PHP_EOL);
|
$this->send('TOUCH '.$id.PHP_EOL);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function nop(): void
|
|
||||||
{
|
|
||||||
$this->send('NOP'.PHP_EOL);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user