Initial commit
This commit is contained in:
19
src/Config.php
Normal file
19
src/Config.php
Normal file
@@ -0,0 +1,19 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
/**
|
||||
* @psalm-immutable
|
||||
*/
|
||||
final class Config
|
||||
{
|
||||
public string $address;
|
||||
|
||||
public function __construct(
|
||||
string $address
|
||||
) {
|
||||
$this->address = $address;
|
||||
}
|
||||
}
|
136
src/Connection.php
Normal file
136
src/Connection.php
Normal file
@@ -0,0 +1,136 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use LogicException;
|
||||
use PHPinnacle\Buffer\ByteBuffer;
|
||||
use Socket\Raw\Factory;
|
||||
use Socket\Raw\Socket;
|
||||
use Throwable;
|
||||
use function json_encode;
|
||||
use function pack;
|
||||
use function sprintf;
|
||||
use const JSON_FORCE_OBJECT;
|
||||
use const JSON_THROW_ON_ERROR;
|
||||
use const PHP_EOL;
|
||||
|
||||
final class Connection
|
||||
{
|
||||
private const OK = 'OK';
|
||||
private const HEARTBEAT = '_heartbeat_';
|
||||
private const CLOSE_WAIT = 'CLOSE_WAIT';
|
||||
private const TYPE_RESPONSE = 0;
|
||||
private const TYPE_ERROR = 1;
|
||||
private const TYPE_MESSAGE = 2;
|
||||
private const BYTES_SIZE = 4;
|
||||
private const BYTES_TYPE = 4;
|
||||
private const BYTES_ATTEMPTS = 2;
|
||||
private const BYTES_TIMESTAMP = 8;
|
||||
private const BYTES_ID = 16;
|
||||
private const MAGIC_V2 = ' V2';
|
||||
|
||||
public Socket $socket;
|
||||
|
||||
public bool $closed = false;
|
||||
|
||||
private function __construct(Socket $socket)
|
||||
{
|
||||
$this->socket = $socket;
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-suppress UnsafeInstantiation
|
||||
*
|
||||
* @return static
|
||||
*/
|
||||
public static function connect(Config $config): self
|
||||
{
|
||||
$socket = (new Factory())->createClient($config->address);
|
||||
$socket->write(self::MAGIC_V2);
|
||||
|
||||
// @phpstan-ignore-next-line
|
||||
return new self($socket);
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-param array<string, string|numeric> $arr
|
||||
*
|
||||
* @psalm-suppress PossiblyFalseOperand
|
||||
*/
|
||||
public function identify(array $arr): string
|
||||
{
|
||||
$body = json_encode($arr, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
|
||||
$size = pack('N', \strlen($body));
|
||||
|
||||
return 'IDENTIFY '.PHP_EOL.$size.$body;
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-suppress PossiblyFalseOperand
|
||||
*/
|
||||
public function auth(string $secret): string
|
||||
{
|
||||
$size = pack('N', \strlen($secret));
|
||||
|
||||
return 'AUTH'.PHP_EOL.$size.$secret;
|
||||
}
|
||||
|
||||
public function write(string $buffer): void
|
||||
{
|
||||
if ($this->closed) {
|
||||
throw new LogicException('This connection is closed, create new one.');
|
||||
}
|
||||
|
||||
try {
|
||||
$this->socket->write($buffer);
|
||||
} catch (Throwable $e) {
|
||||
$this->closed = true;
|
||||
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
|
||||
public function read(): ?Message
|
||||
{
|
||||
$socket = $this->socket;
|
||||
|
||||
$buffer = new ByteBuffer($socket->read(self::BYTES_SIZE + self::BYTES_TYPE));
|
||||
$size = $buffer->consumeUint32();
|
||||
$type = $buffer->consumeUint32();
|
||||
|
||||
$buffer->append($socket->read($size - self::BYTES_TYPE));
|
||||
|
||||
if (self::TYPE_RESPONSE === $type) {
|
||||
$response = $buffer->consume($size - self::BYTES_TYPE);
|
||||
|
||||
if (self::OK === $response || self::CLOSE_WAIT === $response) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (self::HEARTBEAT === $response) {
|
||||
$socket->write('NOP'.PHP_EOL);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
throw new LogicException(sprintf('Unexpected response from nsq: "%s"', $response));
|
||||
}
|
||||
|
||||
if (self::TYPE_ERROR === $type) {
|
||||
throw new LogicException(sprintf('NSQ return error: "%s"', $socket->read($size)));
|
||||
}
|
||||
|
||||
if (self::TYPE_MESSAGE !== $type) {
|
||||
throw new LogicException(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $type));
|
||||
}
|
||||
|
||||
$timestamp = $buffer->consumeInt64();
|
||||
$attempts = $buffer->consumeUint16();
|
||||
$id = $buffer->consume(self::BYTES_ID);
|
||||
$body = $buffer->consume($size - self::BYTES_TYPE - self::BYTES_TIMESTAMP - self::BYTES_ATTEMPTS - self::BYTES_ID);
|
||||
|
||||
return new Message($timestamp, $attempts, $id, $body);
|
||||
}
|
||||
}
|
51
src/Envelope.php
Normal file
51
src/Envelope.php
Normal file
@@ -0,0 +1,51 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
/**
|
||||
* @psalm-immutable
|
||||
*/
|
||||
final class Envelope
|
||||
{
|
||||
public Message $message;
|
||||
|
||||
/**
|
||||
* @var callable
|
||||
*/
|
||||
private $acknowledge;
|
||||
|
||||
/**
|
||||
* @var callable
|
||||
*/
|
||||
private $requeue;
|
||||
|
||||
/**
|
||||
* @var callable
|
||||
*/
|
||||
private $touching;
|
||||
|
||||
public function __construct(Message $message, callable $ack, callable $req, callable $touch)
|
||||
{
|
||||
$this->message = $message;
|
||||
$this->acknowledge = $ack;
|
||||
$this->requeue = $req;
|
||||
$this->touching = $touch;
|
||||
}
|
||||
|
||||
public function ack(): void
|
||||
{
|
||||
\call_user_func($this->acknowledge);
|
||||
}
|
||||
|
||||
public function retry(int $timeout): void
|
||||
{
|
||||
\call_user_func($this->requeue, $timeout);
|
||||
}
|
||||
|
||||
public function touch(): void
|
||||
{
|
||||
\call_user_func($this->touching);
|
||||
}
|
||||
}
|
27
src/Message.php
Normal file
27
src/Message.php
Normal file
@@ -0,0 +1,27 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
/**
|
||||
* @psalm-immutable
|
||||
*/
|
||||
final class Message
|
||||
{
|
||||
public int $timestamp;
|
||||
|
||||
public int $attempts;
|
||||
|
||||
public string $id;
|
||||
|
||||
public string $body;
|
||||
|
||||
public function __construct(int $timestamp, int $attempts, string $id, string $body)
|
||||
{
|
||||
$this->timestamp = $timestamp;
|
||||
$this->attempts = $attempts;
|
||||
$this->id = $id;
|
||||
$this->body = $body;
|
||||
}
|
||||
}
|
99
src/Reader.php
Normal file
99
src/Reader.php
Normal file
@@ -0,0 +1,99 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Throwable;
|
||||
use function sprintf;
|
||||
use const PHP_EOL;
|
||||
|
||||
class Reader
|
||||
{
|
||||
private Connection $connection;
|
||||
|
||||
public function __construct(Connection $connection)
|
||||
{
|
||||
$this->connection = $connection;
|
||||
}
|
||||
|
||||
public function __destruct()
|
||||
{
|
||||
$this->close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to a topic/channel.
|
||||
*/
|
||||
public function sub(string $topic, string $channel): void
|
||||
{
|
||||
$buffer = sprintf('SUB %s %s', $topic, $channel).PHP_EOL;
|
||||
|
||||
$this->connection->write($buffer);
|
||||
$this->connection->read();
|
||||
}
|
||||
|
||||
/**
|
||||
* Update RDY state (indicate you are ready to receive N messages).
|
||||
*/
|
||||
public function rdy(int $count): void
|
||||
{
|
||||
$this->connection->write('RDY '.$count.PHP_EOL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Finish a message (indicate successful processing).
|
||||
*/
|
||||
public function fin(string $id): void
|
||||
{
|
||||
$this->connection->write('FIN '.$id.PHP_EOL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Re-queue a message (indicate failure to process)
|
||||
* The re-queued message is placed at the tail of the queue, equivalent to having just published it,
|
||||
* but for various implementation specific reasons that behavior should not be explicitly relied upon and may change in the future.
|
||||
* Similarly, a message that is in-flight and times out behaves identically to an explicit REQ.
|
||||
*/
|
||||
public function req(string $id, int $timeout): void
|
||||
{
|
||||
$this->connection->write(sprintf('REQ %s %s', $id, $timeout).PHP_EOL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the timeout for an in-flight message.
|
||||
*/
|
||||
public function touch(string $id): void
|
||||
{
|
||||
$this->connection->write('TOUCH '.$id.PHP_EOL);
|
||||
}
|
||||
|
||||
public function consume(?float $timeout = null): ?Message
|
||||
{
|
||||
if (false === $this->connection->socket->selectRead($timeout)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return $this->connection->read() ?? $this->consume(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanly close your connection (no more messages are sent).
|
||||
*/
|
||||
public function close(): void
|
||||
{
|
||||
if ($this->connection->closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->connection->closed = true;
|
||||
|
||||
$this->connection->socket->write('CLS'.PHP_EOL);
|
||||
$this->connection->read();
|
||||
|
||||
try {
|
||||
$this->connection->socket->close();
|
||||
} catch (Throwable $e) {
|
||||
}
|
||||
}
|
||||
}
|
74
src/Subscriber.php
Normal file
74
src/Subscriber.php
Normal file
@@ -0,0 +1,74 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Generator;
|
||||
use LogicException;
|
||||
|
||||
final class Subscriber
|
||||
{
|
||||
private Reader $reader;
|
||||
|
||||
public function __construct(Reader $reader)
|
||||
{
|
||||
$this->reader = $reader;
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Generator<int, Envelope|null, true|null, void>
|
||||
*/
|
||||
public function subscribe(string $topic, string $channel, ?float $timeout = 0): Generator
|
||||
{
|
||||
$reader = $this->reader;
|
||||
$reader->sub($topic, $channel);
|
||||
$reader->rdy(1);
|
||||
|
||||
while (true) {
|
||||
$message = $reader->consume($timeout);
|
||||
|
||||
if (null === $message) {
|
||||
if (true === yield null) {
|
||||
break;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
$finished = false;
|
||||
$envelop = new Envelope(
|
||||
$message,
|
||||
static function () use ($reader, $message, &$finished): void {
|
||||
if ($finished) {
|
||||
throw new LogicException('Can\'t ack, message already finished.');
|
||||
}
|
||||
|
||||
$finished = true;
|
||||
|
||||
$reader->fin($message->id);
|
||||
},
|
||||
static function (int $timeout) use ($reader, $message, &$finished): void {
|
||||
if ($finished) {
|
||||
throw new LogicException('Can\'t retry, message already finished.');
|
||||
}
|
||||
|
||||
$finished = true;
|
||||
|
||||
$reader->req($message->id, $timeout);
|
||||
},
|
||||
static function () use ($reader, $message): void {
|
||||
$reader->touch($message->id);
|
||||
},
|
||||
);
|
||||
|
||||
if (true === yield $envelop) {
|
||||
break;
|
||||
}
|
||||
|
||||
$reader->rdy(1);
|
||||
}
|
||||
|
||||
$reader->close();
|
||||
}
|
||||
}
|
68
src/Writer.php
Normal file
68
src/Writer.php
Normal file
@@ -0,0 +1,68 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use function array_map;
|
||||
use function implode;
|
||||
use function pack;
|
||||
use function sprintf;
|
||||
use const PHP_EOL;
|
||||
|
||||
final class Writer
|
||||
{
|
||||
private Connection $connection;
|
||||
|
||||
public function __construct(Connection $connection)
|
||||
{
|
||||
$this->connection = $connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-suppress PossiblyFalseOperand
|
||||
*/
|
||||
public function pub(string $topic, string $body): void
|
||||
{
|
||||
$size = pack('N', \strlen($body));
|
||||
|
||||
$buffer = 'PUB '.$topic.PHP_EOL.$size.$body;
|
||||
|
||||
$this->connection->write($buffer);
|
||||
$this->connection->read();
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-param array<mixed, mixed> $bodies
|
||||
*
|
||||
* @psalm-suppress PossiblyFalseOperand
|
||||
*/
|
||||
public function mpub(string $topic, array $bodies): void
|
||||
{
|
||||
$num = pack('N', \count($bodies));
|
||||
|
||||
$mb = implode('', array_map(static function ($body): string {
|
||||
return pack('N', \strlen($body)).$body;
|
||||
}, $bodies));
|
||||
|
||||
$size = pack('N', \strlen($num.$mb));
|
||||
|
||||
$buffer = 'MPUB '.$topic.PHP_EOL.$size.$num.$mb;
|
||||
|
||||
$this->connection->write($buffer);
|
||||
$this->connection->read();
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-suppress PossiblyFalseOperand
|
||||
*/
|
||||
public function dpub(string $topic, int $deferTime, string $body): void
|
||||
{
|
||||
$size = pack('N', \strlen($body));
|
||||
|
||||
$buffer = sprintf('DPUB %s %s', $topic, $deferTime).PHP_EOL.$size.$body;
|
||||
|
||||
$this->connection->write($buffer);
|
||||
$this->connection->read();
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user