Explode Response to Frames

This commit is contained in:
2021-01-30 17:14:19 +03:00
parent fc6b67cc92
commit f74b82a400
24 changed files with 367 additions and 148 deletions

View File

@ -8,14 +8,15 @@ return (new PhpCsFixer\Config())
'@PhpCsFixer:risky' => true, '@PhpCsFixer:risky' => true,
'@PSR12' => true, '@PSR12' => true,
'@PSR12:risky' => true, '@PSR12:risky' => true,
'declare_strict_types' => true,
'php_unit_internal_class' => false,
'php_unit_test_class_requires_covers' => false,
'yoda_style' => true,
'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'],
'blank_line_before_statement' => [ 'blank_line_before_statement' => [
'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try'] 'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try']
], ],
'declare_strict_types' => true,
'php_unit_internal_class' => false,
'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'],
'php_unit_test_class_requires_covers' => false,
'phpdoc_to_comment' => false,
'yoda_style' => true,
]) ])
->setFinder( ->setFinder(
PhpCsFixer\Finder::create() PhpCsFixer\Finder::create()

View File

@ -65,7 +65,7 @@ $producer->dpub('topic', 'Deferred message', delay: 5000);
```php ```php
use Nsq\Consumer; use Nsq\Consumer;
use Nsq\Message; use Nsq\Protocol\Message;
use Nsq\Subscriber; use Nsq\Subscriber;
$consumer = new Consumer('tcp://nsqd:4150'); $consumer = new Consumer('tcp://nsqd:4150');

View File

@ -3,7 +3,7 @@ version: '3.7'
services: services:
nsqd: nsqd:
image: nsqio/nsq:v1.2.0 image: nsqio/nsq:v1.2.0
command: /nsqd command: /nsqd -log-level debug
ports: ports:
- 4150:4150 - 4150:4150
- 4151:4151 - 4151:4151

View File

@ -8,7 +8,14 @@ use Nsq\Config\ClientConfig;
use Nsq\Config\ConnectionConfig; use Nsq\Config\ConnectionConfig;
use Nsq\Exception\AuthenticationRequired; use Nsq\Exception\AuthenticationRequired;
use Nsq\Exception\ConnectionFail; use Nsq\Exception\ConnectionFail;
use Nsq\Exception\UnexpectedResponse; use Nsq\Exception\NsqError;
use Nsq\Exception\BadResponse;
use Nsq\Exception\NsqException;
use Nsq\Exception\NullReceived;
use Nsq\Protocol\Error;
use Nsq\Protocol\Frame;
use Nsq\Protocol\Message;
use Nsq\Protocol\Response;
use Nsq\Reconnect\ExponentialStrategy; use Nsq\Reconnect\ExponentialStrategy;
use Nsq\Reconnect\ReconnectStrategy; use Nsq\Reconnect\ReconnectStrategy;
use PHPinnacle\Buffer\ByteBuffer; use PHPinnacle\Buffer\ByteBuffer;
@ -77,20 +84,27 @@ abstract class Connection
$body = json_encode($this->clientConfig, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT); $body = json_encode($this->clientConfig, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
$response = $this->command('IDENTIFY', data: $body)->response(); $this->connectionConfig = ConnectionConfig::fromArray(
$this
$this->connectionConfig = ConnectionConfig::fromArray($response->toArray()); ->command('IDENTIFY', data: $body)
->readResponse()
->toArray()
);
if ($this->connectionConfig->snappy || $this->connectionConfig->deflate) { if ($this->connectionConfig->snappy || $this->connectionConfig->deflate) {
$this->response()->okOrFail(); $this->checkIsOK();
} }
if ($this->connectionConfig->authRequired) { if ($this->connectionConfig->authRequired) {
if (null === $this->clientConfig->authSecret) { if (null === $this->clientConfig->authSecret) {
throw new AuthenticationRequired('NSQ requires authorization, set ClientConfig::$authSecret before connecting'); throw new AuthenticationRequired();
} }
$authResponse = $this->command('AUTH', data: $this->clientConfig->authSecret)->response()->toArray(); $authResponse = $this
->command('AUTH', data: $this->clientConfig->authSecret)
->readResponse()
->toArray()
;
$this->logger->info('Authorization response: '.http_build_query($authResponse)); $this->logger->info('Authorization response: '.http_build_query($authResponse));
} }
@ -171,7 +185,7 @@ abstract class Connection
// @codeCoverageIgnoreEnd // @codeCoverageIgnoreEnd
} }
public function receive(float $timeout = null): ?Response protected function readFrame(float $timeout = null): ?Frame
{ {
$socket = $this->socket(); $socket = $this->socket();
@ -206,12 +220,23 @@ abstract class Connection
$this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL)); $this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL));
$response = new Response($buffer); $frame = match ($type = $buffer->consumeUint32()) {
0 => new Response($buffer->flush()),
1 => new Error($buffer->flush()),
2 => new Message(
timestamp: $buffer->consumeInt64(),
attempts: $buffer->consumeUint16(),
id: $buffer->consume(Bytes::BYTES_ID),
body: $buffer->flush(),
consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'),
),
default => throw new NsqException('Unexpected frame type: '.$type)
};
if ($response->isHeartBeat()) { if ($frame instanceof Response && $frame->isHeartBeat()) {
$this->command('NOP'); $this->command('NOP');
return $this->receive( return $this->readFrame(
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
); );
} }
@ -224,12 +249,35 @@ abstract class Connection
} }
// @codeCoverageIgnoreEnd // @codeCoverageIgnoreEnd
return $response; return $frame;
} }
protected function response(): Response protected function checkIsOK(): void
{ {
return $this->receive() ?? throw UnexpectedResponse::null(); $response = $this->readResponse();
if (!$response->isOk()) {
throw new BadResponse($response);
}
}
private function readResponse(): Response
{
$frame = $this->readFrame() ?? throw new NullReceived();
if ($frame instanceof Response) {
return $frame;
}
if ($frame instanceof Error) {
if ($frame->type->terminateConnection) {
$this->disconnect();
}
throw new NsqError($frame);
}
throw new NsqException('Unreachable statement.');
} }
private function socket(): Socket private function socket(): Socket

View File

@ -4,6 +4,11 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Nsq\Exception\NsqError;
use Nsq\Exception\NsqException;
use Nsq\Protocol\Error;
use Nsq\Protocol\Message;
final class Consumer extends Connection final class Consumer extends Connection
{ {
private int $rdy = 0; private int $rdy = 0;
@ -13,7 +18,26 @@ final class Consumer extends Connection
*/ */
public function sub(string $topic, string $channel): void public function sub(string $topic, string $channel): void
{ {
$this->command('SUB', [$topic, $channel])->response()->okOrFail(); $this->command('SUB', [$topic, $channel])->checkIsOK();
}
public function readMessage(): ?Message
{
$frame = $this->readFrame();
if ($frame instanceof Message || null === $frame) {
return $frame;
}
if ($frame instanceof Error) {
if ($frame->type->terminateConnection) {
$this->disconnect();
}
throw new NsqError($frame);
}
throw new NsqException('Unreachable statement.');
} }
/** /**

View File

@ -4,8 +4,10 @@ declare(strict_types=1);
namespace Nsq\Exception; namespace Nsq\Exception;
use RuntimeException; final class AuthenticationRequired extends NsqException
final class AuthenticationRequired extends RuntimeException implements NsqException
{ {
public function __construct()
{
parent::__construct('NSQ requires authorization, set ClientConfig::$authSecret before connecting');
}
} }

View File

@ -0,0 +1,15 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Nsq\Protocol\Response;
final class BadResponse extends NsqException
{
public function __construct(Response $response)
{
parent::__construct($response->msg);
}
}

View File

@ -4,10 +4,9 @@ declare(strict_types=1);
namespace Nsq\Exception; namespace Nsq\Exception;
use RuntimeException;
use Throwable; use Throwable;
final class ConnectionFail extends RuntimeException implements NsqException final class ConnectionFail extends NsqException
{ {
/** /**
* @codeCoverageIgnore * @codeCoverageIgnore

View File

@ -4,10 +4,9 @@ declare(strict_types=1);
namespace Nsq\Exception; namespace Nsq\Exception;
use Nsq\Message; use Nsq\Protocol\Message;
use RuntimeException;
final class MessageAlreadyFinished extends RuntimeException implements NsqException final class MessageAlreadyFinished extends NsqException
{ {
public static function finish(Message $message): self public static function finish(Message $message): self
{ {

View File

@ -4,8 +4,12 @@ declare(strict_types=1);
namespace Nsq\Exception; namespace Nsq\Exception;
use RuntimeException; use Nsq\Protocol\Error;
final class NsqError extends RuntimeException implements NsqException final class NsqError extends NsqException
{ {
public function __construct(Error $error)
{
parent::__construct($error->rawData);
}
} }

View File

@ -4,8 +4,8 @@ declare(strict_types=1);
namespace Nsq\Exception; namespace Nsq\Exception;
use Throwable; use RuntimeException;
interface NsqException extends Throwable class NsqException extends RuntimeException
{ {
} }

View File

@ -0,0 +1,9 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
final class NullReceived extends NsqException
{
}

View File

@ -1,18 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use RuntimeException;
final class UnexpectedResponse extends RuntimeException implements NsqException
{
/**
* @codeCoverageIgnore
*/
public static function null(): self
{
return new self('Response was expected, but null received.');
}
}

View File

@ -15,7 +15,7 @@ final class Producer extends Connection
*/ */
public function pub(string $topic, string $body): void public function pub(string $topic, string $body): void
{ {
$this->command('PUB', $topic, $body)->response()->okOrFail(); $this->command('PUB', $topic, $body)->checkIsOK();
} }
/** /**
@ -31,7 +31,7 @@ final class Producer extends Connection
return pack('N', \strlen($body)).$body; return pack('N', \strlen($body)).$body;
}, $bodies)); }, $bodies));
$this->command('MPUB', $topic, $num.$mb)->response()->okOrFail(); $this->command('MPUB', $topic, $num.$mb)->checkIsOK();
} }
/** /**
@ -39,6 +39,6 @@ final class Producer extends Connection
*/ */
public function dpub(string $topic, string $body, int $delay): void public function dpub(string $topic, string $body, int $delay): void
{ {
$this->command('DPUB', [$topic, $delay], $body)->response()->okOrFail(); $this->command('DPUB', [$topic, $delay], $body)->checkIsOK();
} }
} }

23
src/Protocol/Error.php Normal file
View File

@ -0,0 +1,23 @@
<?php
declare(strict_types=1);
namespace Nsq\Protocol;
use Nsq\Bytes;
use function explode;
/**
* @psalm-immutable
*/
final class Error extends Frame
{
public ErrorType $type;
public function __construct(public string $rawData)
{
parent::__construct(\strlen($this->rawData) + Bytes::BYTES_TYPE);
$this->type = new ErrorType(explode(' ', $this->rawData)[0]);
}
}

100
src/Protocol/ErrorType.php Normal file
View File

@ -0,0 +1,100 @@
<?php
declare(strict_types=1);
namespace Nsq\Protocol;
/**
* @psalm-immutable
*/
final class ErrorType
{
/**
* A generic error type without any more hints.
*/
public const E_INVALID = true;
/**
* This error might be returned during multiple occasions. It can be returned for IDENTIFY, AUTH or MPUB messages.
* It is caused for payloads that do not meet certain requirements. For IDENTIFY and AUTH, this is usually a bug in
* the library and should be reported. For MPUB, this error can occur if the payload is larger than the maximum
* payload size specified in the nsqd config.
*/
public const E_BAD_BODY = true;
/**
* This error indicates that the topic sent to nsqd is not valid.
*/
public const E_BAD_TOPIC = true;
/**
* This error indicates that the channel sent to nsqd is not valid.
*/
public const E_BAD_CHANNEL = true;
/**
* This error is returned by nsqd if the message in the payload of a publishing operation does not meet the
* requirements of the server. This might be caused by too big payloads being sent to nsqd. You should consider
* adding a limit to the payload size or increasing it in the nsqd config.
*/
public const E_BAD_MESSAGE = true;
/**
* This error may happen if a error condition is met after validating the input on the nsqd side. This is usually a
* temporary error and can be caused by topics being added, deleted or cleared.
*/
public const E_PUB_FAILED = true;
/**
* This error may happen if a error condition is met after validating the input on the nsqd side. This is usually a
* temporary error and can be caused by topics being added, deleted or cleared.
*/
public const E_MPUB_FAILED = true;
/**
* This error may happen if a error condition is met after validating the input on the nsqd side. This is usually a
* temporary error and can be caused by topics being added, deleted or cleared.
*/
public const E_DPUB_FAILED = true;
/**
* This error may happen if a error condition is met after validating the input on the nsqd side. This can
* happen in particular for messages that are no longer queued on the server side.
*/
public const E_FIN_FAILED = false;
/**
* This error may happen if a error condition is met after validating the input on the nsqd side. This can
* happen in particular for messages that are no longer queued on the server side.
*/
public const E_REQ_FAILED = false;
/**
* This error may happen if a error condition is met after validating the input on the nsqd side. This can
* happen in particular for messages that are no longer queued on the server side.
*/
public const E_TOUCH_FAILED = false;
/**
* This error indicates that the authorization of the client failed on the server side. This might be related
* to connection issues to the authorization server. Depending on the authorization server implementation, this
* might also indicate that the given auth secret in the [ClientConfig] is not known on the server or the server
* denied authentication with the current connection properties (i.e. TLS status and IP).
*/
public const E_AUTH_FAILED = true;
/**
* This error happens if something breaks on the nsqd side while performing the authorization. This might be
* caused by bugs in nsqd, the authorization server or network issues.
*/
public const E_AUTH_ERROR = true;
/**
* This error is sent by nsqd if the client attempts an authentication, but the server does not support it. This
* should never happen using this library as authorization requests are only sent if the server supports it.
* It is safe to expect that this error is never thrown.
*/
public const E_AUTH_DISABLED = true;
/**
* This error indicates that the client related to the authorization secret set in the [ClientConfig] is not
* allowed to do the operation it tried to do.
*/
public const E_UNAUTHORIZED = true;
/**
* A boolean indicating whether or not an [Error] with this type terminates the connection or not.
*/
public bool $terminateConnection;
public function __construct(public string $type)
{
$this->terminateConnection = \constant('self::'.$this->type) ?? self::E_INVALID;
}
}

16
src/Protocol/Frame.php Normal file
View File

@ -0,0 +1,16 @@
<?php
declare(strict_types=1);
namespace Nsq\Protocol;
abstract class Frame
{
public function __construct(
/**
* @psalm-readonly
*/
public int $length,
) {
}
}

View File

@ -2,11 +2,13 @@
declare(strict_types=1); declare(strict_types=1);
namespace Nsq; namespace Nsq\Protocol;
use Nsq\Bytes;
use Nsq\Consumer;
use Nsq\Exception\MessageAlreadyFinished; use Nsq\Exception\MessageAlreadyFinished;
final class Message final class Message extends Frame
{ {
/** /**
* @psalm-readonly * @psalm-readonly
@ -34,6 +36,14 @@ final class Message
public function __construct(int $timestamp, int $attempts, string $id, string $body, Consumer $consumer) public function __construct(int $timestamp, int $attempts, string $id, string $body, Consumer $consumer)
{ {
parent::__construct(
Bytes::BYTES_TYPE
+ Bytes::BYTES_TIMESTAMP
+ Bytes::BYTES_ATTEMPTS
+ Bytes::BYTES_ID
+ \strlen($body)
);
$this->timestamp = $timestamp; $this->timestamp = $timestamp;
$this->attempts = $attempts; $this->attempts = $attempts;
$this->id = $id; $this->id = $id;

41
src/Protocol/Response.php Normal file
View File

@ -0,0 +1,41 @@
<?php
declare(strict_types=1);
namespace Nsq\Protocol;
use Nsq\Bytes;
use function json_decode;
use const JSON_THROW_ON_ERROR;
/**
* @psalm-immutable
*/
final class Response extends Frame
{
public const OK = 'OK';
public const HEARTBEAT = '_heartbeat_';
public function __construct(public string $msg)
{
parent::__construct(\strlen($this->msg) + Bytes::BYTES_TYPE);
}
public function isOk(): bool
{
return self::OK === $this->msg;
}
public function isHeartBeat(): bool
{
return self::HEARTBEAT === $this->msg;
}
/**
* @return array<mixed, mixed>
*/
public function toArray(): array
{
return json_decode($this->msg, true, flags: JSON_THROW_ON_ERROR);
}
}

View File

@ -1,87 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq;
use Nsq\Exception\NsqError;
use Nsq\Exception\UnexpectedResponse;
use PHPinnacle\Buffer\ByteBuffer;
use function json_decode;
use function sprintf;
use const JSON_THROW_ON_ERROR;
final class Response
{
private const OK = 'OK';
private const HEARTBEAT = '_heartbeat_';
private const TYPE_RESPONSE = 0;
private const TYPE_ERROR = 1;
private const TYPE_MESSAGE = 2;
private int $type;
private ByteBuffer $buffer;
public function __construct(ByteBuffer $buffer)
{
$this->type = $buffer->consumeUint32();
$this->buffer = $buffer;
}
public function okOrFail(): void
{
if (self::TYPE_ERROR === $this->type) {
throw new NsqError($this->buffer->bytes());
}
if (self::TYPE_RESPONSE !== $this->type) {
// @codeCoverageIgnoreStart
throw new UnexpectedResponse(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type));
// @codeCoverageIgnoreEnd
}
if (self::OK !== $this->buffer->bytes()) {
// @codeCoverageIgnoreStart
throw new UnexpectedResponse(sprintf('OK response expected, but "%s" received.', $this->buffer->bytes()));
// @codeCoverageIgnoreEnd
}
}
public function isHeartBeat(): bool
{
return self::TYPE_RESPONSE === $this->type && self::HEARTBEAT === $this->buffer->bytes();
}
/**
* @phpstan-ignore-next-line
*/
public function toArray(): array
{
if (self::TYPE_RESPONSE !== $this->type) {
// @codeCoverageIgnoreStart
throw new UnexpectedResponse(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type));
// @codeCoverageIgnoreEnd
}
return json_decode($this->buffer->bytes(), true, flags: JSON_THROW_ON_ERROR);
}
public function toMessage(Consumer $reader): Message
{
if (self::TYPE_MESSAGE !== $this->type) {
// @codeCoverageIgnoreStart
throw new UnexpectedResponse(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type));
// @codeCoverageIgnoreEnd
}
$buffer = new ByteBuffer($this->buffer->bytes());
$timestamp = $buffer->consumeInt64();
$attempts = $buffer->consumeUint16();
$id = $buffer->consume(Bytes::BYTES_ID);
$body = $buffer->flush();
return new Message($timestamp, $attempts, $id, $body, $reader);
}
}

View File

@ -5,6 +5,7 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Generator; use Generator;
use Nsq\Protocol\Message;
final class Subscriber final class Subscriber
{ {
@ -27,7 +28,7 @@ final class Subscriber
while (true) { while (true) {
$this->reader->rdy(1); $this->reader->rdy(1);
$command = yield $this->reader->receive()?->toMessage($this->reader); $command = yield $this->reader->readMessage();
if (self::STOP === $command) { if (self::STOP === $command) {
break; break;

View File

@ -4,7 +4,7 @@ declare(strict_types=1);
use Nsq\Consumer; use Nsq\Consumer;
use Nsq\Exception\MessageAlreadyFinished; use Nsq\Exception\MessageAlreadyFinished;
use Nsq\Message; use Nsq\Protocol\Message;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
final class MessageTest extends TestCase final class MessageTest extends TestCase

View File

@ -4,8 +4,8 @@ declare(strict_types=1);
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Consumer; use Nsq\Consumer;
use Nsq\Message;
use Nsq\Producer; use Nsq\Producer;
use Nsq\Protocol\Message;
use Nsq\Subscriber; use Nsq\Subscriber;
use Nyholm\NSA; use Nyholm\NSA;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
@ -20,7 +20,7 @@ final class NsqTest extends TestCase
$consumer = new Consumer( $consumer = new Consumer(
address: 'tcp://localhost:4150', address: 'tcp://localhost:4150',
clientConfig: new ClientConfig( clientConfig: new ClientConfig(
heartbeatInterval: 1000, heartbeatInterval: 3000,
readTimeout: 1, readTimeout: 1,
), ),
); );

View File

@ -0,0 +1,32 @@
<?php
declare(strict_types=1);
namespace Protocol;
use Generator;
use Nsq\Protocol\ErrorType;
use PHPUnit\Framework\TestCase;
use ReflectionClass;
final class ErrorTypeTest extends TestCase
{
/**
* @dataProvider data
*/
public function testConstructor(string $type, bool $isConnectionTerminated): void
{
$errorType = new ErrorType($type);
self::assertSame($isConnectionTerminated, $errorType->terminateConnection);
}
/**
* @return Generator<string, array<int, bool|string>>
*/
public function data(): Generator
{
foreach ((new ReflectionClass(ErrorType::class))->getConstants() as $constant => $isTerminated) {
yield $constant => [$constant, $isTerminated];
}
}
}