This commit is contained in:
2021-02-04 00:34:07 +03:00
parent 875cb8b542
commit 15296f4b61
28 changed files with 695 additions and 975 deletions

View File

@@ -4,11 +4,17 @@ declare(strict_types=1);
namespace Nsq;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\ZlibInputStream;
use Amp\ByteStream\ZlibOutputStream;
use Amp\Failure;
use Amp\Promise;
use Amp\Socket\Socket;
use Nsq\Config\ClientConfig;
use Nsq\Config\ConnectionConfig;
use Nsq\Exception\AuthenticationRequired;
use Nsq\Exception\BadResponse;
use Nsq\Exception\ConnectionFail;
use Nsq\Exception\NotConnected;
use Nsq\Exception\NsqError;
use Nsq\Exception\NsqException;
@@ -16,201 +22,230 @@ use Nsq\Protocol\Error;
use Nsq\Protocol\Frame;
use Nsq\Protocol\Message;
use Nsq\Protocol\Response;
use Nsq\Socket\DeflateSocket;
use Nsq\Socket\RawSocket;
use Nsq\Socket\SnappySocket;
use Psr\Log\LoggerAwareTrait;
use Nsq\Stream\NsqInputStream;
use Nsq\Stream\NullStream;
use Nsq\Stream\SnappyInputStream;
use Nsq\Stream\SnappyOutputStream;
use PHPinnacle\Buffer\ByteBuffer;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use function Amp\call;
use function Amp\Socket\connect;
/**
* @internal
*/
abstract class Connection
{
use LoggerAwareTrait;
private ?Socket $socket = null;
private InputStream $inputStream;
private OutputStream $outputStream;
private ByteBuffer $buffer;
protected ?ConnectionConfig $connectionConfig = null;
protected ClientConfig $clientConfig;
private NsqSocket $socket;
protected LoggerInterface $logger;
private ConnectionConfig $connectionConfig;
private bool $closed = false;
public function __construct(
final public function __construct(
private string $address,
ClientConfig $clientConfig = null,
LoggerInterface $logger = null,
?LoggerInterface $logger = null,
) {
$this->logger = $logger ?? new NullLogger();
$this->buffer = new ByteBuffer();
$this->inputStream = $this->outputStream = new NullStream();
$this->clientConfig = $clientConfig ?? new ClientConfig();
$this->logger = $logger ?? new NullLogger();
}
$socket = new RawSocket($this->address, $this->logger);
$socket->write(' V2');
/**
* @return Promise<void>
*/
public function connect(): Promise
{
return call(function (): \Generator {
$this->socket = $this->outputStream = yield connect($this->address);
$this->inputStream = new NsqInputStream($this->socket);
$this->socket = new NsqSocket($socket);
yield $this->outputStream->write(' V2');
$this->connectionConfig = ConnectionConfig::fromArray(
$this
->command('IDENTIFY', data: $this->clientConfig->toString())
->readResponse()
->toArray()
);
yield $this->command('IDENTIFY', data: $this->clientConfig->toString());
/** @var Response $response */
$response = yield $this->readResponse();
$this->connectionConfig = ConnectionConfig::fromArray($response->toArray());
if ($this->connectionConfig->snappy) {
$this->socket = new NsqSocket(
new SnappySocket(
$socket,
$this->logger,
),
);
if ($this->connectionConfig->snappy) {
$this->inputStream = new NsqInputStream(
new SnappyInputStream($this->inputStream, $this->logger),
);
$this->outputStream = new SnappyOutputStream($this->outputStream);
$this->checkIsOK();
}
if ($this->connectionConfig->deflate) {
$this->socket = new NsqSocket(
new DeflateSocket(
$socket,
),
);
$this->checkIsOK();
}
if ($this->connectionConfig->authRequired) {
if (null === $this->clientConfig->authSecret) {
throw new AuthenticationRequired();
$this->checkIsOK();
}
$authResponse = $this
->command('AUTH', data: $this->clientConfig->authSecret)
->readResponse()
->toArray()
;
if ($this->connectionConfig->deflate) {
$this->inputStream = new NsqInputStream(
new ZlibInputStream($this->socket, ZLIB_ENCODING_DEFLATE, [
'level' => $this->connectionConfig->deflateLevel,
]),
);
$this->outputStream = new ZlibOutputStream($this->socket, ZLIB_ENCODING_DEFLATE, [
'level' => $this->connectionConfig->deflateLevel,
]);
$this->logger->info('Authorization response: '.http_build_query($authResponse));
}
$this->checkIsOK();
}
if ($this->connectionConfig->authRequired) {
if (null === $this->clientConfig->authSecret) {
yield $this->close();
throw new AuthenticationRequired();
}
yield $this->command('AUTH', data: $this->clientConfig->authSecret);
$response = yield $this->readResponse();
$this->logger->info('Authorization response: '.http_build_query($response->toArray()));
}
});
}
/**
* Cleanly close your connection (no more messages are sent).
*
* @return Promise<void>
*/
public function close(): void
public function close(): Promise
{
if ($this->closed) {
return;
if (null === $this->socket) {
return new Failure(new NotConnected());
}
try {
$this->command('CLS');
$this->socket->close();
} catch (\Throwable $e) {
}
return call(function (): \Generator {
yield $this->command('CLS');
$this->closed = true;
if (null !== $this->socket) {
$this->socket->close();
$this->socket = null;
}
});
}
public function isClosed(): bool
{
return $this->closed;
return null === $this->socket;
}
/**
* @param array<int, int|string>|string $params
*
* @return Promise<void>
*/
protected function command(string $command, array | string $params = [], string $data = null): self
protected function command(string $command, array | string $params = [], string $data = null): Promise
{
if ($this->closed) {
throw new NotConnected('Connection closed.');
if (null === $this->socket) {
return new Failure(new NotConnected());
}
$command = [] === $params
? $command
: implode(' ', [$command, ...((array) $params)]);
$this->logger->info('Command [{command}] with data [{data}]', ['command' => $command, 'data' => $data ?? 'null']);
$buffer = $this->buffer->append($command.PHP_EOL);
$this->socket->write($command, $data);
if (null !== $data) {
$buffer->appendUint32(\strlen($data));
$buffer->append($data);
}
return $this;
$this->logger->debug('Sending: {bytes}', ['bytes' => $buffer->bytes()]);
return $this->outputStream->write($buffer->flush());
}
public function hasMessage(float $timeout): bool
/**
* @return Promise<Frame>
*/
protected function readFrame(): Promise
{
if ($this->closed) {
throw new NotConnected('Connection closed.');
}
return call(function (): \Generator {
$bytes = yield $this->inputStream->read();
try {
return false !== $this->socket->wait($timeout);
} catch (ConnectionFail $e) {
$this->close();
$this->logger->debug('Receiving: {bytes}', ['bytes' => $bytes]);
throw $e;
}
}
protected function readFrame(): Frame
{
if ($this->closed) {
throw new NotConnected('Connection closed.');
}
$buffer = $this->socket->read();
$this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL));
return match ($type = $buffer->consumeType()) {
0 => new Response($buffer->flush()),
1 => new Error($buffer->flush()),
2 => new Message(
timestamp: $buffer->consumeTimestamp(),
attempts: $buffer->consumeAttempts(),
id: $buffer->consumeId(),
body: $buffer->flush(),
consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'),
),
default => throw new NsqException('Unexpected frame type: '.$type)
};
}
protected function checkIsOK(): void
{
$response = $this->readResponse();
if ($response->isHeartBeat()) {
$this->command('NOP');
$this->checkIsOK();
return;
}
if (!$response->isOk()) {
throw new BadResponse($response);
}
$this->logger->info('Ok checked.');
}
private function readResponse(): Response
{
$frame = $this->readFrame();
if ($frame instanceof Response) {
return $frame;
}
if ($frame instanceof Error) {
if ($frame->type->terminateConnection) {
$this->close();
if (null === $bytes) {
throw new NotConnected();
}
throw new NsqError($frame);
}
$buffer = $this->buffer->append($bytes);
throw new NsqException('Unreachable statement.');
return match ($type = $buffer->consumeUint32()) {
0 => new Response($buffer->flush()),
1 => new Error($buffer->flush()),
2 => new Message(
timestamp: $buffer->consumeInt64(),
attempts: $buffer->consumeUint16(),
id: $buffer->consume(Bytes::BYTES_ID),
body: $buffer->flush(),
consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'),
),
default => throw new NsqException('Unexpected frame type: '.$type)
};
});
}
/**
* @return Promise<void>
*/
protected function checkIsOK(): Promise
{
return call(function (): \Generator {
/** @var Response $response */
$response = yield $this->readResponse();
if ($response->isHeartBeat()) {
yield $this->command('NOP');
return $this->checkIsOK();
}
if (!$response->isOk()) {
throw new BadResponse($response);
}
$this->logger->info('Ok checked.');
return call(static function (): void {});
});
}
/**
* @return Promise<Response>
*/
private function readResponse(): Promise
{
return call(function (): \Generator {
$frame = yield $this->readFrame();
if ($frame instanceof Error) {
if ($frame->type->terminateConnection) {
$this->close();
}
throw new NsqError($frame);
}
if (!$frame instanceof Response) {
throw new NsqException('Unreachable statement.');
}
return $frame;
});
}
}