Add Response object
This commit is contained in:
14
src/Bytes.php
Normal file
14
src/Bytes.php
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq;
|
||||||
|
|
||||||
|
final class Bytes
|
||||||
|
{
|
||||||
|
public const BYTES_SIZE = 4;
|
||||||
|
public const BYTES_TYPE = 4;
|
||||||
|
public const BYTES_ATTEMPTS = 2;
|
||||||
|
public const BYTES_TIMESTAMP = 8;
|
||||||
|
public const BYTES_ID = 16;
|
||||||
|
}
|
@ -13,7 +13,6 @@ use Socket\Raw\Socket;
|
|||||||
use Throwable;
|
use Throwable;
|
||||||
use function json_encode;
|
use function json_encode;
|
||||||
use function pack;
|
use function pack;
|
||||||
use function sprintf;
|
|
||||||
use const JSON_FORCE_OBJECT;
|
use const JSON_FORCE_OBJECT;
|
||||||
use const JSON_THROW_ON_ERROR;
|
use const JSON_THROW_ON_ERROR;
|
||||||
use const PHP_EOL;
|
use const PHP_EOL;
|
||||||
@ -23,18 +22,6 @@ use const PHP_EOL;
|
|||||||
*/
|
*/
|
||||||
abstract class Connection
|
abstract class Connection
|
||||||
{
|
{
|
||||||
protected const OK = 'OK';
|
|
||||||
protected const HEARTBEAT = '_heartbeat_';
|
|
||||||
protected const CLOSE_WAIT = 'CLOSE_WAIT';
|
|
||||||
protected const TYPE_RESPONSE = 0;
|
|
||||||
protected const TYPE_ERROR = 1;
|
|
||||||
protected const TYPE_MESSAGE = 2;
|
|
||||||
protected const BYTES_SIZE = 4;
|
|
||||||
protected const BYTES_TYPE = 4;
|
|
||||||
protected const BYTES_ATTEMPTS = 2;
|
|
||||||
protected const BYTES_TIMESTAMP = 8;
|
|
||||||
protected const BYTES_ID = 16;
|
|
||||||
|
|
||||||
public ?Socket $socket = null;
|
public ?Socket $socket = null;
|
||||||
|
|
||||||
protected LoggerInterface $logger;
|
protected LoggerInterface $logger;
|
||||||
@ -76,7 +63,7 @@ abstract class Connection
|
|||||||
|
|
||||||
$this->logger->info('Feature Negotiation: '.http_build_query($this->features));
|
$this->logger->info('Feature Negotiation: '.http_build_query($this->features));
|
||||||
|
|
||||||
$this->send('IDENTIFY '.PHP_EOL.$size.$body)->expectResponse(self::OK);
|
$this->send('IDENTIFY '.PHP_EOL.$size.$body)->getResponse()->okOrFail();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -89,7 +76,7 @@ abstract class Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$this->send('CLS'.PHP_EOL)->expectResponse(self::CLOSE_WAIT);
|
$this->send('CLS'.PHP_EOL);
|
||||||
|
|
||||||
if (null !== $this->socket) {
|
if (null !== $this->socket) {
|
||||||
$this->socket->close();
|
$this->socket->close();
|
||||||
@ -135,7 +122,7 @@ abstract class Connection
|
|||||||
return $this;
|
return $this;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function receive(float $timeout = 0): ?ByteBuffer
|
protected function receive(float $timeout = 0): ?Response
|
||||||
{
|
{
|
||||||
$socket = $this->socket();
|
$socket = $this->socket();
|
||||||
|
|
||||||
@ -143,32 +130,20 @@ abstract class Connection
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
$size = (new ByteBuffer($socket->read(self::BYTES_SIZE)))->consumeUint32();
|
$size = (new ByteBuffer($socket->read(Bytes::BYTES_SIZE)))->consumeUint32();
|
||||||
|
|
||||||
return new ByteBuffer($socket->read($size));
|
return new Response(new ByteBuffer($socket->read($size)));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function expectResponse(string $expected): void
|
protected function getResponse(): Response
|
||||||
{
|
{
|
||||||
$buffer = $this->receive(0.1);
|
$response = $this->receive(0.1);
|
||||||
if (null === $buffer) {
|
|
||||||
throw new Exception('Success response was expected, but null received.');
|
if (null === $response) {
|
||||||
|
throw new Exception('Response was expected, but null received.');
|
||||||
}
|
}
|
||||||
|
|
||||||
$type = $buffer->consumeUint32();
|
return $response;
|
||||||
$response = $buffer->flush();
|
|
||||||
|
|
||||||
if (self::TYPE_ERROR === $type) {
|
|
||||||
throw new Exception($response);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (self::TYPE_RESPONSE !== $type) {
|
|
||||||
throw new Exception(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $type));
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($expected !== $response) {
|
|
||||||
throw new Exception(sprintf('"%s" response expected, but "%s" received.', $expected, $response));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private function socket(): Socket
|
private function socket(): Socket
|
||||||
|
@ -13,7 +13,7 @@ class Reader extends Connection
|
|||||||
{
|
{
|
||||||
$buffer = sprintf('SUB %s %s', $topic, $channel).PHP_EOL;
|
$buffer = sprintf('SUB %s %s', $topic, $channel).PHP_EOL;
|
||||||
|
|
||||||
$this->send($buffer)->expectResponse(self::OK);
|
$this->send($buffer)->getResponse()->okOrFail();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
62
src/Response.php
Normal file
62
src/Response.php
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq;
|
||||||
|
|
||||||
|
use PHPinnacle\Buffer\ByteBuffer;
|
||||||
|
|
||||||
|
final class Response
|
||||||
|
{
|
||||||
|
private const OK = 'OK';
|
||||||
|
private const HEARTBEAT = '_heartbeat_';
|
||||||
|
private const TYPE_RESPONSE = 0;
|
||||||
|
private const TYPE_ERROR = 1;
|
||||||
|
private const TYPE_MESSAGE = 2;
|
||||||
|
|
||||||
|
private int $type;
|
||||||
|
|
||||||
|
private ByteBuffer $buffer;
|
||||||
|
|
||||||
|
public function __construct(ByteBuffer $buffer)
|
||||||
|
{
|
||||||
|
$this->type = $buffer->consumeUint32();
|
||||||
|
$this->buffer = $buffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function okOrFail(): void
|
||||||
|
{
|
||||||
|
if (self::TYPE_ERROR === $this->type) {
|
||||||
|
throw new Exception($this->buffer->bytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (self::TYPE_RESPONSE !== $this->type) {
|
||||||
|
throw new Exception(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (self::OK !== $this->buffer->bytes()) {
|
||||||
|
throw new Exception(sprintf('OK response expected, but "%s" received.', $this->buffer->bytes()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public function isHeartBeat(): bool
|
||||||
|
{
|
||||||
|
return self::TYPE_RESPONSE === $this->type && self::HEARTBEAT === $this->buffer->bytes();
|
||||||
|
}
|
||||||
|
|
||||||
|
public function toMessage(): Message
|
||||||
|
{
|
||||||
|
if (self::TYPE_MESSAGE !== $this->type) {
|
||||||
|
throw new Exception(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type));
|
||||||
|
}
|
||||||
|
|
||||||
|
$buffer = new ByteBuffer($this->buffer->bytes());
|
||||||
|
|
||||||
|
$timestamp = $buffer->consumeInt64();
|
||||||
|
$attempts = $buffer->consumeUint16();
|
||||||
|
$id = $buffer->consume(Bytes::BYTES_ID);
|
||||||
|
$body = $buffer->flush();
|
||||||
|
|
||||||
|
return new Message($timestamp, $attempts, $id, $body);
|
||||||
|
}
|
||||||
|
}
|
@ -50,17 +50,12 @@ final class Subscriber extends Reader
|
|||||||
{
|
{
|
||||||
$deadline = microtime(true) + $timeout;
|
$deadline = microtime(true) + $timeout;
|
||||||
|
|
||||||
$buffer = $this->receive($timeout);
|
$response = $this->receive($timeout);
|
||||||
if (null === $buffer) {
|
if (null === $response) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
$type = $buffer->consumeUint32();
|
if ($response->isHeartBeat()) {
|
||||||
|
|
||||||
if (self::TYPE_RESPONSE === $type) {
|
|
||||||
$response = $buffer->flush();
|
|
||||||
|
|
||||||
if (self::HEARTBEAT === $response) {
|
|
||||||
$this->send('NOP'.PHP_EOL);
|
$this->send('NOP'.PHP_EOL);
|
||||||
|
|
||||||
return $this->consume(
|
return $this->consume(
|
||||||
@ -68,22 +63,6 @@ final class Subscriber extends Reader
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new Exception(sprintf('Unexpected response: %s', $response));
|
return $response->toMessage();
|
||||||
}
|
|
||||||
|
|
||||||
if (self::TYPE_ERROR === $type) {
|
|
||||||
throw new Exception(sprintf('NSQ return error: "%s"', $buffer->flush()));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (self::TYPE_MESSAGE !== $type) {
|
|
||||||
throw new Exception(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $type));
|
|
||||||
}
|
|
||||||
|
|
||||||
$timestamp = $buffer->consumeInt64();
|
|
||||||
$attempts = $buffer->consumeUint16();
|
|
||||||
$id = $buffer->consume(self::BYTES_ID);
|
|
||||||
$body = $buffer->flush();
|
|
||||||
|
|
||||||
return new Message($timestamp, $attempts, $id, $body);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,7 @@ final class Writer extends Connection
|
|||||||
|
|
||||||
$buffer = 'PUB '.$topic.PHP_EOL.$size.$body;
|
$buffer = 'PUB '.$topic.PHP_EOL.$size.$body;
|
||||||
|
|
||||||
$this->send($buffer)->expectResponse(self::OK);
|
$this->send($buffer)->getResponse()->okOrFail();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -41,7 +41,7 @@ final class Writer extends Connection
|
|||||||
|
|
||||||
$buffer = 'MPUB '.$topic.PHP_EOL.$size.$num.$mb;
|
$buffer = 'MPUB '.$topic.PHP_EOL.$size.$num.$mb;
|
||||||
|
|
||||||
$this->send($buffer)->expectResponse(self::OK);
|
$this->send($buffer)->getResponse()->okOrFail();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -53,6 +53,6 @@ final class Writer extends Connection
|
|||||||
|
|
||||||
$buffer = sprintf('DPUB %s %s', $topic, $deferTime).PHP_EOL.$size.$body;
|
$buffer = sprintf('DPUB %s %s', $topic, $deferTime).PHP_EOL.$size.$body;
|
||||||
|
|
||||||
$this->send($buffer)->expectResponse(self::OK);
|
$this->send($buffer)->getResponse()->okOrFail();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user