Wrap Socket

This commit is contained in:
2021-01-30 20:07:28 +03:00
parent e3485416a5
commit 72dca5c73b
10 changed files with 288 additions and 214 deletions

View File

@ -5,7 +5,6 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use PHPinnacle\Buffer\ByteBuffer; use PHPinnacle\Buffer\ByteBuffer;
use const PHP_EOL;
final class Buffer final class Buffer
{ {
@ -23,17 +22,6 @@ final class Buffer
return $this; return $this;
} }
public function appendCommand(string $command): void
{
$this->buffer->append($command.PHP_EOL);
}
public function appendData(string $data): void
{
$this->buffer->appendUint32(\strlen($data));
$this->buffer->append($data);
}
public function consumeSize(): int public function consumeSize(): int
{ {
/** @see Bytes::BYTES_SIZE */ /** @see Bytes::BYTES_SIZE */

View File

@ -8,6 +8,9 @@ use Composer\InstalledVersions;
use InvalidArgumentException; use InvalidArgumentException;
use JsonSerializable; use JsonSerializable;
use function gethostname; use function gethostname;
use function json_encode;
use const JSON_FORCE_OBJECT;
use const JSON_THROW_ON_ERROR;
/** /**
* This class is used for configuring the clients for nsq. Immutable properties must be set when creating the object and * This class is used for configuring the clients for nsq. Immutable properties must be set when creating the object and
@ -136,4 +139,9 @@ final class ClientConfig implements JsonSerializable
'user_agent' => $this->userAgent, 'user_agent' => $this->userAgent,
]; ];
} }
public function toString(): string
{
return json_encode($this, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
}
} }

View File

@ -9,89 +9,54 @@ use Nsq\Config\ConnectionConfig;
use Nsq\Exception\AuthenticationRequired; use Nsq\Exception\AuthenticationRequired;
use Nsq\Exception\BadResponse; use Nsq\Exception\BadResponse;
use Nsq\Exception\ConnectionFail; use Nsq\Exception\ConnectionFail;
use Nsq\Exception\NotConnected;
use Nsq\Exception\NsqError; use Nsq\Exception\NsqError;
use Nsq\Exception\NsqException; use Nsq\Exception\NsqException;
use Nsq\Exception\NullReceived;
use Nsq\Protocol\Error; use Nsq\Protocol\Error;
use Nsq\Protocol\Frame; use Nsq\Protocol\Frame;
use Nsq\Protocol\Message; use Nsq\Protocol\Message;
use Nsq\Protocol\Response; use Nsq\Protocol\Response;
use Nsq\Reconnect\ExponentialStrategy; use Nsq\Socket\RawSocket;
use Nsq\Reconnect\ReconnectStrategy;
use Psr\Log\LoggerAwareTrait; use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger; use Psr\Log\NullLogger;
use Socket\Raw\Exception; use Throwable;
use Socket\Raw\Factory;
use Socket\Raw\Socket;
use function addcslashes; use function addcslashes;
use function http_build_query; use function http_build_query;
use function implode; use function implode;
use function json_encode;
use const JSON_FORCE_OBJECT;
use const JSON_THROW_ON_ERROR;
use const PHP_EOL; use const PHP_EOL;
/** /**
* @internal * @internal
*
* @property ConnectionConfig $connectionConfig
*/ */
abstract class Connection abstract class Connection
{ {
use LoggerAwareTrait; use LoggerAwareTrait;
private string $address; protected ClientConfig $clientConfig;
private Buffer $input; private NsqSocket $socket;
private Buffer $output; private ConnectionConfig $connectionConfig;
private ?Socket $socket = null; private bool $closed = false;
private ReconnectStrategy $reconnect;
private ClientConfig $clientConfig;
private ?ConnectionConfig $connectionConfig = null;
public function __construct( public function __construct(
string $address, private string $address,
ClientConfig $clientConfig = null, ClientConfig $clientConfig = null,
ReconnectStrategy $reconnectStrategy = null,
LoggerInterface $logger = null, LoggerInterface $logger = null,
) { ) {
$this->address = $address;
$this->input = new Buffer();
$this->output = new Buffer();
$this->logger = $logger ?? new NullLogger(); $this->logger = $logger ?? new NullLogger();
$this->reconnect = $reconnectStrategy ?? new ExponentialStrategy(logger: $this->logger);
$this->clientConfig = $clientConfig ?? new ClientConfig(); $this->clientConfig = $clientConfig ?? new ClientConfig();
}
public function connect(): void $socket = new RawSocket($this->address, $this->logger);
{ $socket->write(' V2');
$this->reconnect->connect(function (): void {
try {
$this->socket = (new Factory())->createClient($this->address);
}
// @codeCoverageIgnoreStart
catch (Exception $e) {
$this->logger->error('Connecting to {address} failed.', ['address' => $this->address]);
throw ConnectionFail::fromThrowable($e); $this->socket = new NsqSocket($socket);
}
// @codeCoverageIgnoreEnd
$this->socket->write(' V2');
$body = json_encode($this->clientConfig, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
$this->connectionConfig = ConnectionConfig::fromArray( $this->connectionConfig = ConnectionConfig::fromArray(
$this $this
->command('IDENTIFY', data: $body) ->command('IDENTIFY', data: $this->clientConfig->toString())
->readResponse() ->readResponse()
->toArray() ->toArray()
); );
@ -113,35 +78,29 @@ abstract class Connection
$this->logger->info('Authorization response: '.http_build_query($authResponse)); $this->logger->info('Authorization response: '.http_build_query($authResponse));
} }
});
} }
/** /**
* Cleanly close your connection (no more messages are sent). * Cleanly close your connection (no more messages are sent).
*/ */
public function disconnect(): void public function close(): void
{ {
if (null === $this->socket) { if ($this->closed) {
return; return;
} }
try { try {
$this->socket->write('CLS'.PHP_EOL); $this->command('CLS');
$this->socket->close(); $this->socket->close();
} } catch (Throwable) {
// @codeCoverageIgnoreStart
catch (Exception $e) {
$this->logger->debug($e->getMessage(), ['exception' => $e]);
}
// @codeCoverageIgnoreEnd
$this->socket = null;
$this->connectionConfig = null;
} }
public function isReady(): bool $this->closed = true;
}
public function isClosed(): bool
{ {
return null !== $this->socket; return $this->closed;
} }
/** /**
@ -149,49 +108,45 @@ abstract class Connection
*/ */
protected function command(string $command, array | string $params = [], string $data = null): self protected function command(string $command, array | string $params = [], string $data = null): self
{ {
$this->output->appendCommand( if ($this->closed) {
[] === $params throw new NotConnected('Connection closed.');
? $command
: implode(' ', [$command, ...((array) $params)]),
);
if (null !== $data) {
$this->output->appendData($data);
} }
$this->flush(); $command = [] === $params
? $command
: implode(' ', [$command, ...((array) $params)]);
$this->socket->write($command, $data);
return $this; return $this;
} }
public function hasMessage(float $timeout = 0): bool public function hasMessage(float $timeout): bool
{ {
if ($this->closed) {
throw new NotConnected('Connection closed.');
}
try { try {
return false !== $this->socket()->selectRead($timeout); return false !== $this->socket->wait($timeout);
} } catch (ConnectionFail $e) {
// @codeCoverageIgnoreStart $this->close();
catch (Exception $e) {
$this->disconnect();
throw ConnectionFail::fromThrowable($e); throw $e;
} }
// @codeCoverageIgnoreEnd
} }
protected function readFrame(float $timeout = null): ?Frame protected function readFrame(): Frame
{ {
$timeout ??= $this->clientConfig->readTimeout; if ($this->closed) {
$deadline = microtime(true) + $timeout; throw new NotConnected('Connection closed.');
if (!$this->hasMessage($timeout)) {
return null;
} }
$buffer = $this->read(); $buffer = $this->socket->read();
$this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL)); $this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL));
$frame = match ($type = $buffer->consumeType()) { return match ($type = $buffer->consumeType()) {
0 => new Response($buffer->flush()), 0 => new Response($buffer->flush()),
1 => new Error($buffer->flush()), 1 => new Error($buffer->flush()),
2 => new Message( 2 => new Message(
@ -203,16 +158,6 @@ abstract class Connection
), ),
default => throw new NsqException('Unexpected frame type: '.$type) default => throw new NsqException('Unexpected frame type: '.$type)
}; };
if ($frame instanceof Response && $frame->isHeartBeat()) {
$this->command('NOP');
return $this->readFrame(
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
);
}
return $frame;
} }
protected function checkIsOK(): void protected function checkIsOK(): void
@ -226,7 +171,7 @@ abstract class Connection
private function readResponse(): Response private function readResponse(): Response
{ {
$frame = $this->readFrame() ?? throw new NullReceived(); $frame = $this->readFrame();
if ($frame instanceof Response) { if ($frame instanceof Response) {
return $frame; return $frame;
@ -234,7 +179,7 @@ abstract class Connection
if ($frame instanceof Error) { if ($frame instanceof Error) {
if ($frame->type->terminateConnection) { if ($frame->type->terminateConnection) {
$this->disconnect(); $this->close();
} }
throw new NsqError($frame); throw new NsqError($frame);
@ -242,69 +187,4 @@ abstract class Connection
throw new NsqException('Unreachable statement.'); throw new NsqException('Unreachable statement.');
} }
private function read(): Buffer
{
try {
$socket = $this->socket();
$buffer = $this->input->append(
$socket->read(Bytes::BYTES_SIZE),
);
if ('' === $buffer->bytes()) {
$this->disconnect();
throw new ConnectionFail('Probably connection lost');
}
$size = $buffer->consumeSize();
do {
$buffer->append(
$socket->read($size),
);
} while ($buffer->size() < $size);
return $buffer;
}
// @codeCoverageIgnoreStart
catch (Exception $e) {
$this->disconnect();
throw ConnectionFail::fromThrowable($e);
}
// @codeCoverageIgnoreEnd
}
private function flush(): void
{
$buffer = $this->output->flush();
$this->logger->debug('Send buffer: '.addcslashes($buffer, PHP_EOL));
try {
$this->socket()->write(
$buffer,
);
}
// @codeCoverageIgnoreStart
catch (Exception $e) {
$this->disconnect();
$this->logger->error($e->getMessage(), ['exception' => $e]);
throw ConnectionFail::fromThrowable($e);
}
// @codeCoverageIgnoreEnd
}
private function socket(): Socket
{
if (null === $this->socket) {
$this->connect();
}
return $this->socket ?? throw new ConnectionFail('This connection is closed, create new one.');
}
} }

View File

@ -10,8 +10,9 @@ use Nsq\Exception\NsqError;
use Nsq\Exception\NsqException; use Nsq\Exception\NsqException;
use Nsq\Protocol\Error; use Nsq\Protocol\Error;
use Nsq\Protocol\Message; use Nsq\Protocol\Message;
use Nsq\Reconnect\ReconnectStrategy; use Nsq\Protocol\Response;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use function microtime;
final class Consumer extends Connection final class Consumer extends Connection
{ {
@ -22,17 +23,9 @@ final class Consumer extends Connection
private string $channel, private string $channel,
string $address, string $address,
ClientConfig $clientConfig = null, ClientConfig $clientConfig = null,
ReconnectStrategy $reconnectStrategy = null,
LoggerInterface $logger = null LoggerInterface $logger = null
) { ) {
parent::__construct($address, $clientConfig, $reconnectStrategy, $logger); parent::__construct($address, $clientConfig, $logger);
}
public function connect(): void
{
parent::connect();
$this->command('SUB', [$this->topic, $this->channel])->checkIsOK();
} }
/** /**
@ -40,30 +33,48 @@ final class Consumer extends Connection
*/ */
public function generator(): Generator public function generator(): Generator
{ {
$this->command('SUB', [$this->topic, $this->channel])->checkIsOK();
while (true) { while (true) {
$this->rdy(1); $this->rdy(1);
$command = yield $this->readMessage(); $timeout = $this->clientConfig->readTimeout;
do {
$deadline = microtime(true) + $timeout;
$message = $this->hasMessage($timeout) ? $this->readMessage() : null;
$timeout = ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime;
} while (0 < $timeout && null === $message);
$command = yield $message;
if (0 === $command) { if (0 === $command) {
break; break;
} }
} }
$this->disconnect(); $this->close();
} }
public function readMessage(): ?Message public function readMessage(): ?Message
{ {
$frame = $this->readFrame(); $frame = $this->readFrame();
if ($frame instanceof Message || null === $frame) { if ($frame instanceof Message) {
return $frame; return $frame;
} }
if ($frame instanceof Response && $frame->isHeartBeat()) {
$this->command('NOP');
return null;
}
if ($frame instanceof Error) { if ($frame instanceof Error) {
if ($frame->type->terminateConnection) { if ($frame->type->terminateConnection) {
$this->disconnect(); $this->close();
} }
throw new NsqError($frame); throw new NsqError($frame);

View File

@ -0,0 +1,9 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
final class NotConnected extends NsqException
{
}

73
src/NsqSocket.php Normal file
View File

@ -0,0 +1,73 @@
<?php
declare(strict_types=1);
namespace Nsq;
use Nsq\Exception\ConnectionFail;
use Nsq\Socket\Socket;
use PHPinnacle\Buffer\ByteBuffer;
use Throwable;
use const PHP_EOL;
final class NsqSocket
{
private Buffer $input;
private ByteBuffer $output;
public function __construct(
private Socket $socket,
) {
$this->input = new Buffer();
$this->output = new ByteBuffer();
}
public function write(string $command, string $data = null): void
{
$this->output->append($command.PHP_EOL);
if (null !== $data) {
$this->output->appendUint32(\strlen($data));
$this->output->append($data);
}
$this->socket->write($this->output->flush());
}
public function wait(float $timeout): bool
{
return $this->socket->selectRead($timeout);
}
public function read(): Buffer
{
$buffer = $this->input;
$buffer->append(
$this->socket->read(Bytes::BYTES_SIZE),
);
if ('' === $buffer->bytes()) {
throw new ConnectionFail('Probably connection closed.');
}
$size = $buffer->consumeSize();
do {
$buffer->append(
$this->socket->read($size - $buffer->size()),
);
} while ($buffer->size() < $size);
return $buffer;
}
public function close(): void
{
try {
$this->socket->close();
} catch (Throwable) {
}
}
}

View File

@ -8,11 +8,11 @@ use function array_map;
use function implode; use function implode;
use function pack; use function pack;
/**
* @psalm-suppress PropertyNotSetInConstructor
*/
final class Producer extends Connection final class Producer extends Connection
{ {
/**
* @psalm-suppress PossiblyFalseOperand
*/
public function pub(string $topic, string $body): void public function pub(string $topic, string $body): void
{ {
$this->command('PUB', $topic, $body)->checkIsOK(); $this->command('PUB', $topic, $body)->checkIsOK();
@ -34,9 +34,6 @@ final class Producer extends Connection
$this->command('MPUB', $topic, $num.$mb)->checkIsOK(); $this->command('MPUB', $topic, $num.$mb)->checkIsOK();
} }
/**
* @psalm-suppress PossiblyFalseOperand
*/
public function dpub(string $topic, string $body, int $delay): void public function dpub(string $topic, string $body, int $delay): void
{ {
$this->command('DPUB', [$topic, $delay], $body)->checkIsOK(); $this->command('DPUB', [$topic, $delay], $body)->checkIsOK();

81
src/Socket/RawSocket.php Normal file
View File

@ -0,0 +1,81 @@
<?php
declare(strict_types=1);
namespace Nsq\Socket;
use Nsq\Exception\ConnectionFail;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Socket\Raw\Exception;
use Socket\Raw\Factory;
use Socket\Raw\Socket as ClueSocket;
use Throwable;
final class RawSocket implements Socket
{
private ClueSocket $socket;
private LoggerInterface $logger;
public function __construct(string $address, LoggerInterface $logger = null)
{
$this->socket = (new Factory())->createClient($address);
$this->logger = $logger ?? new NullLogger();
}
/**
* {@inheritDoc}
*/
public function selectRead(float $timeout): bool
{
try {
return false !== $this->socket->selectRead($timeout);
} // @codeCoverageIgnoreStart
catch (Exception $e) {
throw ConnectionFail::fromThrowable($e);
}
// @codeCoverageIgnoreEnd
}
/**
* {@inheritDoc}
*/
public function close(): void
{
try {
$this->socket->close();
} catch (Throwable) {
}
}
/**
* {@inheritDoc}
*/
public function write(string $data): void
{
try {
$this->socket->write($data);
} // @codeCoverageIgnoreStart
catch (Exception $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
throw ConnectionFail::fromThrowable($e);
}
// @codeCoverageIgnoreEnd
}
/**
* {@inheritDoc}
*/
public function read(int $length): string
{
try {
return $this->socket->read($length);
} // @codeCoverageIgnoreStart
catch (Exception $e) {
throw ConnectionFail::fromThrowable($e);
}
// @codeCoverageIgnoreEnd
}
}

27
src/Socket/Socket.php Normal file
View File

@ -0,0 +1,27 @@
<?php
declare(strict_types=1);
namespace Nsq\Socket;
use Nsq\Exception\ConnectionFail;
interface Socket
{
public function close(): void;
/**
* @throws ConnectionFail
*/
public function selectRead(float $timeout): bool;
/**
* @throws ConnectionFail
*/
public function write(string $data): void;
/**
* @throws ConnectionFail
*/
public function read(int $length): string;
}

View File

@ -85,8 +85,8 @@ final class NsqTest extends TestCase
$message->touch(); $message->touch();
$message->finish(); $message->finish();
self::assertTrue($consumer->isReady()); self::assertFalse($consumer->isClosed());
$generator->send(0); $generator->send(0);
self::assertFalse($consumer->isReady()); self::assertTrue($consumer->isClosed());
} }
} }