handle heartbeat after reading frame
This commit is contained in:
@ -189,7 +189,7 @@ abstract class Connection
|
||||
|
||||
$buffer = $this->buffer->append($bytes);
|
||||
|
||||
return match ($type = $buffer->consumeUint32()) {
|
||||
$frame = match ($type = $buffer->consumeUint32()) {
|
||||
0 => new Response($buffer->flush()),
|
||||
1 => new Error($buffer->flush()),
|
||||
2 => new Message(
|
||||
@ -201,6 +201,14 @@ abstract class Connection
|
||||
),
|
||||
default => throw new NsqException('Unexpected frame type: '.$type)
|
||||
};
|
||||
|
||||
if ($frame instanceof Response && $frame->isHeartBeat()) {
|
||||
yield $this->command('NOP');
|
||||
|
||||
return $this->readFrame();
|
||||
}
|
||||
|
||||
return $frame;
|
||||
});
|
||||
}
|
||||
|
||||
@ -213,17 +221,11 @@ abstract class Connection
|
||||
/** @var Response $response */
|
||||
$response = yield $this->readResponse();
|
||||
|
||||
if ($response->isHeartBeat()) {
|
||||
yield $this->command('NOP');
|
||||
|
||||
return $this->checkIsOK();
|
||||
}
|
||||
|
||||
if (!$response->isOk()) {
|
||||
throw new BadResponse($response);
|
||||
}
|
||||
|
||||
$this->logger->info('Ok checked.');
|
||||
$this->logger->debug('Ok checked.');
|
||||
|
||||
return call(static function (): void {});
|
||||
});
|
||||
|
@ -63,21 +63,15 @@ final class Consumer extends Connection
|
||||
return $frame;
|
||||
}
|
||||
|
||||
if ($frame instanceof Response && $frame->isHeartBeat()) {
|
||||
yield $this->command('NOP');
|
||||
|
||||
return $this->readMessage();
|
||||
}
|
||||
|
||||
if ($frame instanceof Error) {
|
||||
if ($frame->type->terminateConnection) {
|
||||
yield $this->close();
|
||||
}
|
||||
|
||||
return new Failure(new NsqError($frame));
|
||||
throw new NsqError($frame);
|
||||
}
|
||||
|
||||
return new Failure(new NsqException('Unreachable statement.'));
|
||||
throw new NsqException('Unreachable statement.');
|
||||
});
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user