Reconnect
This commit is contained in:
@@ -7,6 +7,8 @@ namespace Nsq;
|
||||
use Composer\InstalledVersions;
|
||||
use Nsq\Exception\ConnectionFail;
|
||||
use Nsq\Exception\UnexpectedResponse;
|
||||
use Nsq\Reconnect\ExponentialStrategy;
|
||||
use Nsq\Reconnect\ReconnectStrategy;
|
||||
use PHPinnacle\Buffer\ByteBuffer;
|
||||
use Psr\Log\LoggerAwareTrait;
|
||||
use Psr\Log\LoggerInterface;
|
||||
@@ -14,7 +16,7 @@ use Psr\Log\NullLogger;
|
||||
use Socket\Raw\Exception;
|
||||
use Socket\Raw\Factory;
|
||||
use Socket\Raw\Socket;
|
||||
use Throwable;
|
||||
use function addcslashes;
|
||||
use function json_encode;
|
||||
use function pack;
|
||||
use const JSON_FORCE_OBJECT;
|
||||
@@ -32,7 +34,7 @@ abstract class Connection
|
||||
|
||||
private ?Socket $socket = null;
|
||||
|
||||
private bool $closed = false;
|
||||
private ReconnectStrategy $reconnect;
|
||||
|
||||
/**
|
||||
* @var array{
|
||||
@@ -51,37 +53,44 @@ abstract class Connection
|
||||
string $userAgent = null,
|
||||
int $heartbeatInterval = null,
|
||||
int $sampleRate = 0,
|
||||
ReconnectStrategy $reconnectStrategy = null,
|
||||
LoggerInterface $logger = null,
|
||||
) {
|
||||
$this->address = $address;
|
||||
|
||||
$this->features = [
|
||||
'client_id' => $clientId ?? '',
|
||||
'hostname' => $hostname ?? (static fn (mixed $host): string => \is_string($host) ? $host : '')(gethostname()),
|
||||
'hostname' => $hostname ?? (static fn (mixed $h): string => \is_string($h) ? $h : '')(gethostname()),
|
||||
'user_agent' => $userAgent ?? 'nsqphp/'.InstalledVersions::getPrettyVersion('nsq/nsq'),
|
||||
'heartbeat_interval' => $heartbeatInterval,
|
||||
'sample_rate' => $sampleRate,
|
||||
];
|
||||
|
||||
$this->logger = $logger ?? new NullLogger();
|
||||
$this->reconnect = $reconnectStrategy ?? new ExponentialStrategy(logger: $this->logger);
|
||||
}
|
||||
|
||||
public function connect(): void
|
||||
{
|
||||
try {
|
||||
$this->socket = (new Factory())->createClient($this->address);
|
||||
} catch (Exception $e) {
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
$this->reconnect->connect(function (): void {
|
||||
try {
|
||||
$this->socket = (new Factory())->createClient($this->address);
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->logger->error('Connecting to {address} failed.', ['address' => $this->address]);
|
||||
|
||||
$this->send(' V2');
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
|
||||
$body = json_encode($this->features, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
|
||||
$size = pack('N', \strlen($body));
|
||||
$this->send(' V2');
|
||||
|
||||
$this->logger->info('Feature Negotiation: '.http_build_query($this->features));
|
||||
$body = json_encode($this->features, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
|
||||
$size = pack('N', \strlen($body));
|
||||
|
||||
$this->sendWithResponse('IDENTIFY '.PHP_EOL.$size.$body)->okOrFail();
|
||||
$this->sendWithResponse('IDENTIFY '.PHP_EOL.$size.$body)->okOrFail();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -89,26 +98,26 @@ abstract class Connection
|
||||
*/
|
||||
public function disconnect(): void
|
||||
{
|
||||
if ($this->closed) {
|
||||
if (null === $this->socket) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
$this->send('CLS'.PHP_EOL);
|
||||
|
||||
if (null !== $this->socket) {
|
||||
$this->socket->close();
|
||||
}
|
||||
} catch (Throwable $e) {
|
||||
$this->socket->write('CLS'.PHP_EOL);
|
||||
$this->socket->close();
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->logger->debug($e->getMessage(), ['exception' => $e]);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
|
||||
$this->closed = true;
|
||||
$this->socket = null;
|
||||
}
|
||||
|
||||
public function isClosed(): bool
|
||||
public function isReady(): bool
|
||||
{
|
||||
return $this->closed;
|
||||
return null !== $this->socket;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -129,63 +138,95 @@ abstract class Connection
|
||||
|
||||
try {
|
||||
$socket->write($buffer);
|
||||
} catch (Exception $e) {
|
||||
$this->closed = true;
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->disconnect();
|
||||
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function hasMessage(float $timeout = 0): bool
|
||||
{
|
||||
try {
|
||||
return false !== $this->socket()->selectRead($timeout);
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->disconnect();
|
||||
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
}
|
||||
|
||||
public function receive(float $timeout = 0): ?Response
|
||||
{
|
||||
$socket = $this->socket();
|
||||
$deadline = microtime(true) + $timeout;
|
||||
|
||||
if (false === $socket->selectRead($timeout)) {
|
||||
if (!$this->hasMessage($timeout)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$size = (new ByteBuffer($socket->read(Bytes::BYTES_SIZE)))->consumeUint32();
|
||||
$response = new Response(new ByteBuffer($socket->read($size)));
|
||||
try {
|
||||
$size = $socket->read(Bytes::BYTES_SIZE);
|
||||
|
||||
if ($response->isHeartBeat()) {
|
||||
$this->send('NOP'.PHP_EOL);
|
||||
if ('' === $size) {
|
||||
$this->disconnect();
|
||||
|
||||
return $this->receive(
|
||||
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
|
||||
throw new ConnectionFail('Probably connection lost');
|
||||
}
|
||||
|
||||
$buffer = new ByteBuffer(
|
||||
$socket->read(
|
||||
// @phpstan-ignore-next-line
|
||||
unpack('N', $size)[1]
|
||||
)
|
||||
);
|
||||
|
||||
$this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL));
|
||||
|
||||
$response = new Response($buffer);
|
||||
|
||||
if ($response->isHeartBeat()) {
|
||||
$this->send('NOP'.PHP_EOL);
|
||||
|
||||
return $this->receive(
|
||||
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
|
||||
);
|
||||
}
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->disconnect();
|
||||
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
|
||||
return $response;
|
||||
}
|
||||
|
||||
protected function sendWithResponse(string $buffer): Response
|
||||
{
|
||||
$this->send($buffer);
|
||||
|
||||
$response = $this->receive(0.1);
|
||||
|
||||
if (null === $response) {
|
||||
throw new UnexpectedResponse('Response was expected, but null received.');
|
||||
}
|
||||
|
||||
return $response;
|
||||
return $this
|
||||
->send($buffer)
|
||||
->receive(1) ?? throw UnexpectedResponse::null();
|
||||
}
|
||||
|
||||
private function socket(): Socket
|
||||
{
|
||||
if ($this->closed) {
|
||||
throw new ConnectionFail('This connection is closed, create new one.');
|
||||
}
|
||||
|
||||
if (null === $this->socket) {
|
||||
$this->connect();
|
||||
}
|
||||
|
||||
return $this->socket;
|
||||
return $this->socket ?? throw new ConnectionFail('This connection is closed, create new one.');
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user