21 Commits

Author SHA1 Message Date
443473f53d handle heartbeat after reading frame 2021-02-05 00:08:06 +03:00
57b7715c2e simplify command 2021-02-04 23:09:47 +03:00
b126094e74 Close connection on __destruct 2021-02-04 23:07:41 +03:00
15296f4b61 amphp 2021-02-04 22:57:46 +03:00
875cb8b542 Mark message methods as internal 2021-02-01 03:08:14 +03:00
b2b444d1ef cs: don't import globals 2021-02-01 02:49:53 +03:00
f6ef057e40 Snappy throw exception if extension not installed 2021-02-01 02:40:00 +03:00
db988a0914 Fix receiving partial message from nsq 2021-02-01 02:38:19 +03:00
3c0ec5574d add ru.grachevko.dhu labels 2021-02-01 02:33:38 +03:00
b395e1e3ee Fix receiving double heartbeat messages 2021-02-01 01:20:13 +03:00
801fdfe547 Snappy fix receiving partial message 2021-02-01 01:17:39 +03:00
3d8f5be2d0 Snappy 2021-02-01 00:39:07 +03:00
070b980003 Use Buffer in Producer::mpub 2021-01-31 18:11:37 +03:00
a66f622cf6 Expect heartbeat in checkIsOk 2021-01-31 18:11:24 +03:00
72dca5c73b Wrap Socket 2021-01-31 17:26:07 +03:00
e3485416a5 Move try/catch from readFrame to read 2021-01-30 18:40:34 +03:00
930314f1ac Convert comments to phpdoc 2021-01-30 18:26:31 +03:00
e1cca2d3eb Remove Subscriber 2021-01-30 18:22:34 +03:00
1a24efacfb NSQ Connection can subscribe only for one topic 2021-01-30 18:13:50 +03:00
a7b847146a Prevent recreate ByteBuffer objects 2021-01-30 18:05:36 +03:00
f74b82a400 Explode Response to Frames 2021-01-30 17:40:17 +03:00
39 changed files with 1119 additions and 813 deletions

View File

@@ -8,14 +8,19 @@ return (new PhpCsFixer\Config())
'@PhpCsFixer:risky' => true, '@PhpCsFixer:risky' => true,
'@PSR12' => true, '@PSR12' => true,
'@PSR12:risky' => true, '@PSR12:risky' => true,
'declare_strict_types' => true, 'braces' => [
'php_unit_internal_class' => false, 'allow_single_line_closure' => true,
'php_unit_test_class_requires_covers' => false,
'yoda_style' => true,
'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'],
'blank_line_before_statement' => [
'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try']
], ],
'blank_line_before_statement' => [
'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try'],
],
'declare_strict_types' => true,
'global_namespace_import' => ['import_classes' => false, 'import_constants' => false, 'import_functions' => false],
'php_unit_internal_class' => false,
'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'],
'php_unit_test_class_requires_covers' => false,
'phpdoc_to_comment' => false,
'yoda_style' => true,
]) ])
->setFinder( ->setFinder(
PhpCsFixer\Finder::create() PhpCsFixer\Finder::create()

View File

@@ -34,14 +34,15 @@ Features
- [ ] Discovery - [ ] Discovery
- [ ] Backoff - [ ] Backoff
- [ ] TLS - [ ] TLS
- [ ] Snappy - [ ] Deflate
- [X] Snappy
- [X] Sampling - [X] Sampling
- [X] AUTH - [X] AUTH
Usage Usage
----- -----
### Publish ### Producer
```php ```php
use Nsq\Producer; use Nsq\Producer;
@@ -61,17 +62,21 @@ $producer->mpub('topic', [
$producer->dpub('topic', 'Deferred message', delay: 5000); $producer->dpub('topic', 'Deferred message', delay: 5000);
``` ```
### Subscription ### Consumer
```php ```php
use Nsq\Consumer; use Nsq\Consumer;
use Nsq\Message; use Nsq\Protocol\Message;
use Nsq\Subscriber;
$consumer = new Consumer('tcp://nsqd:4150'); $consumer = new Consumer(
$subscriber = new Subscriber($consumer); topic: 'topic',
channel: 'channel',
address: 'tcp://nsqd:4150',
);
// Simple blocking loop based on generator
$generator = $consumer->generator();
$generator = $subscriber->subscribe('topic', 'channel');
foreach ($generator as $message) { foreach ($generator as $message) {
if ($message instanceof Message) { if ($message instanceof Message) {
$payload = $message->body; $payload = $message->body;
@@ -85,12 +90,9 @@ foreach ($generator as $message) {
// In case of nothing received during timeout generator will return NULL // In case of nothing received during timeout generator will return NULL
// Here we can do something between messages, like pcntl_signal_dispatch() // Here we can do something between messages, like pcntl_signal_dispatch()
// We can also communicate with generator through send
// for example:
// Gracefully close connection (loop will be ended) // Gracefully close connection (loop will be ended)
$generator->send(Subscriber::STOP); $generator->send(0);
} }
``` ```

View File

@@ -13,12 +13,13 @@
"require": { "require": {
"php": "^8.0.1", "php": "^8.0.1",
"ext-json": "*", "ext-json": "*",
"clue/socket-raw": "^1.5", "amphp/socket": "^1.1",
"composer/semver": "^3.2", "composer/semver": "^3.2",
"phpinnacle/buffer": "^1.2", "phpinnacle/buffer": "^1.2",
"psr/log": "^1.1" "psr/log": "^1.1"
}, },
"require-dev": { "require-dev": {
"amphp/log": "^1.1",
"dg/bypass-finals": "^1.3", "dg/bypass-finals": "^1.3",
"ergebnis/composer-normalize": "9999999-dev", "ergebnis/composer-normalize": "9999999-dev",
"friendsofphp/php-cs-fixer": "^2.18", "friendsofphp/php-cs-fixer": "^2.18",

View File

@@ -3,13 +3,18 @@ version: '3.7'
services: services:
nsqd: nsqd:
image: nsqio/nsq:v1.2.0 image: nsqio/nsq:v1.2.0
command: /nsqd labels:
ru.grachevko.dhu: 'nsqd'
command: /nsqd -log-level debug
# command: /nsqd
ports: ports:
- 4150:4150 - 4150:4150
- 4151:4151 - 4151:4151
nsqadmin: nsqadmin:
image: nsqio/nsq:v1.2.0 image: nsqio/nsq:v1.2.0
labels:
ru.grachevko.dhu: 'nsqadmin'
command: /nsqadmin --nsqd-http-address=nsqd:4151 --http-address=0.0.0.0:4171 command: /nsqadmin --nsqd-http-address=nsqd:4151 --http-address=0.0.0.0:4171
ports: ports:
- 4171:4171 - 4171:4171

49
examples/consumer.php Normal file
View File

@@ -0,0 +1,49 @@
<?php
declare(strict_types=1);
require __DIR__.'/../vendor/autoload.php';
use Amp\ByteStream;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig;
use Nsq\Consumer;
use Nsq\Protocol\Message;
use function Amp\call;
Loop::run(static function () {
$handler = new StreamHandler(ByteStream\getStdout());
$handler->setFormatter(new ConsoleFormatter());
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
$consumer = new Consumer(
'tcp://localhost:4150',
clientConfig: new ClientConfig(
deflate: false,
snappy: false,
),
logger: $logger,
);
yield $consumer->connect();
yield $consumer->listen(
topic: 'local',
channel: 'local',
onMessage: static function (Message $message) use ($logger): Promise {
return call(function () use ($message, $logger): Generator {
$logger->info('Received: {body}', ['body' => $message->body]);
yield $message->finish();
return new Success(false);
});
}
);
});

33
examples/producer.php Normal file
View File

@@ -0,0 +1,33 @@
<?php
declare(strict_types=1);
require __DIR__.'/../vendor/autoload.php';
use Amp\ByteStream;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Loop;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig;
use Nsq\Producer;
Loop::run(static function () {
$handler = new StreamHandler(ByteStream\getStdout());
$handler->setFormatter(new ConsoleFormatter());
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
$producer = new Producer(
'tcp://localhost:4150',
clientConfig: new ClientConfig(
deflate: false,
snappy: false,
),
logger: $logger,
);
yield $producer->connect();
yield $producer->pub(topic: 'topic', body: 'Message body!');
});

View File

@@ -5,9 +5,6 @@ declare(strict_types=1);
namespace Nsq\Config; namespace Nsq\Config;
use Composer\InstalledVersions; use Composer\InstalledVersions;
use InvalidArgumentException;
use JsonSerializable;
use function gethostname;
/** /**
* This class is used for configuring the clients for nsq. Immutable properties must be set when creating the object and * This class is used for configuring the clients for nsq. Immutable properties must be set when creating the object and
@@ -16,25 +13,34 @@ use function gethostname;
* *
* @psalm-immutable * @psalm-immutable
*/ */
final class ClientConfig implements JsonSerializable final class ClientConfig implements \JsonSerializable
{ {
/** @psalm-suppress ImpureFunctionCall */ /**
* @psalm-suppress ImpureFunctionCall
*/
public function __construct( public function __construct(
/* /**
* The secret used for authorization, if the server requires it. This value will be ignored if the server * The secret used for authorization, if the server requires it. This value will be ignored if the server
* does not require authorization. * does not require authorization.
*/ */
public ?string $authSecret = null, public ?string $authSecret = null,
// The timeout for establishing a connection in seconds. /**
* The timeout for establishing a connection in seconds.
*/
public int $connectTimeout = 10, public int $connectTimeout = 10,
// An identifier used to disambiguate this client (i.e. something specific to the consumer) /**
* An identifier used to disambiguate this client (i.e. something specific to the consumer).
*/
public string $clientId = '', public string $clientId = '',
// Enable deflate compression for this connection. A client cannot enable both [snappy] and [deflate]. /**
* Enable deflate compression for this connection. A client cannot enable both [snappy] and [deflate].
*/
public bool $deflate = false, public bool $deflate = false,
/*
/**
* Configure the deflate compression level for this connection. * Configure the deflate compression level for this connection.
* *
* Valid range: `1 <= deflate_level <= configured_max` * Valid range: `1 <= deflate_level <= configured_max`
@@ -43,42 +49,54 @@ final class ClientConfig implements JsonSerializable
*/ */
public int $deflateLevel = 6, public int $deflateLevel = 6,
/* /**
* Milliseconds between heartbeats. * Milliseconds between heartbeats.
* *
* Valid range: `1000 <= heartbeat_interval <= configured_max` (`-1` disables heartbeats) * Valid range: `1000 <= heartbeat_interval <= configured_max` (`-1` disables heartbeats)
*/ */
public int $heartbeatInterval = 30000, public int $heartbeatInterval = 30000,
// The hostname where the client is deployed /**
* The hostname where the client is deployed.
*/
public string $hostname = '', public string $hostname = '',
// Configure the server-side message timeout in milliseconds for messages delivered to this client. /**
* Configure the server-side message timeout in milliseconds for messages delivered to this client.
*/
public int $msgTimeout = 60000, public int $msgTimeout = 60000,
/* /**
* The sample rate for incoming data to deliver a percentage of all messages received to this connection. * The sample rate for incoming data to deliver a percentage of all messages received to this connection.
* This only applies to subscribing connections. The valid range is between 0 and 99, where 0 means that all * This only applies to subscribing connections. The valid range is between 0 and 99, where 0 means that all
* data is sent (this is the default). 1 means that 1% of the data is sent. * data is sent (this is the default). 1 means that 1% of the data is sent.
*/ */
public int $sampleRate = 0, public int $sampleRate = 0,
/* /**
* Boolean used to indicate that the client supports feature negotiation. If the server is capable, * Boolean used to indicate that the client supports feature negotiation. If the server is capable,
* it will send back a JSON payload of supported features and metadata. * it will send back a JSON payload of supported features and metadata.
*/ */
public bool $featureNegotiation = true, public bool $featureNegotiation = true,
// Enable TLS for this connection /**
* Enable TLS for this connection.
*/
public bool $tls = false, public bool $tls = false,
// Enable snappy compression for this connection. A client cannot enable both [snappy] and [deflate]. /**
* Enable snappy compression for this connection. A client cannot enable both [snappy] and [deflate].
*/
public bool $snappy = false, public bool $snappy = false,
// The read timeout for connection sockets and for awaiting responses from nsq. /**
* The read timeout for connection sockets and for awaiting responses from nsq.
*/
public int $readTimeout = 5, public int $readTimeout = 5,
// A string identifying the agent for this client in the spirit of HTTP. /**
* A string identifying the agent for this client in the spirit of HTTP.
*/
public string $userAgent = '', public string $userAgent = '',
) { ) {
$this->featureNegotiation = true; // Always enabled $this->featureNegotiation = true; // Always enabled
@@ -92,7 +110,7 @@ final class ClientConfig implements JsonSerializable
} }
if ($this->snappy && $this->deflate) { if ($this->snappy && $this->deflate) {
throw new InvalidArgumentException('Client cannot enable both [snappy] and [deflate]'); throw new \InvalidArgumentException('Client cannot enable both [snappy] and [deflate]');
} }
} }
@@ -115,4 +133,9 @@ final class ClientConfig implements JsonSerializable
'user_agent' => $this->userAgent, 'user_agent' => $this->userAgent,
]; ];
} }
public function toString(): string
{
return json_encode($this, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
}
} }

View File

@@ -12,47 +12,72 @@ namespace Nsq\Config;
final class ConnectionConfig final class ConnectionConfig
{ {
public function __construct( public function __construct(
// Whether or not authorization is required by nsqd. /**
* Whether or not authorization is required by nsqd.
*/
public bool $authRequired, public bool $authRequired,
// Whether deflate compression is enabled for this connection or not. /**
* Whether deflate compression is enabled for this connection or not.
*/
public bool $deflate, public bool $deflate,
// The deflate level. This value can be ignored if [deflate] is `false`.
/**
* The deflate level. This value can be ignored if [deflate] is `false`.
*/
public int $deflateLevel, public int $deflateLevel,
// The maximum deflate level supported by the server. /**
* The maximum deflate level supported by the server.
*/
public int $maxDeflateLevel, public int $maxDeflateLevel,
// The maximum value for message timeout. /**
* The maximum value for message timeout.
*/
public int $maxMsgTimeout, public int $maxMsgTimeout,
/*
/**
* Each nsqd is configurable with a max-rdy-count. If the consumer sends a RDY count that is outside * Each nsqd is configurable with a max-rdy-count. If the consumer sends a RDY count that is outside
* of the acceptable range its connection will be forcefully closed. * of the acceptable range its connection will be forcefully closed.
*/ */
public int $maxRdyCount, public int $maxRdyCount,
// The effective message timeout. /**
* The effective message timeout.
*/
public int $msgTimeout, public int $msgTimeout,
// The size in bytes of the buffer nsqd will use when writing to this client. /**
* The size in bytes of the buffer nsqd will use when writing to this client.
*/
public int $outputBufferSize, public int $outputBufferSize,
// The timeout after which any data that nsqd has buffered will be flushed to this client.
/**
* The timeout after which any data that nsqd has buffered will be flushed to this client.
*/
public int $outputBufferTimeout, public int $outputBufferTimeout,
/* /**
* The sample rate for incoming data to deliver a percentage of all messages received to this connection. * The sample rate for incoming data to deliver a percentage of all messages received to this connection.
* This only applies to subscribing connections. The valid range is between 0 and 99, where 0 means that all * This only applies to subscribing connections. The valid range is between 0 and 99, where 0 means that all
* data is sent (this is the default). 1 means that 1% of the data is sent. * data is sent (this is the default). 1 means that 1% of the data is sent.
*/ */
public int $sampleRate, public int $sampleRate,
// Whether snappy compression is enabled for this connection or not. /**
* Whether snappy compression is enabled for this connection or not.
*/
public bool $snappy, public bool $snappy,
// Whether TLS is enabled for this connection or not. /**
* Whether TLS is enabled for this connection or not.
*/
public bool $tls, public bool $tls,
// The nsqd version. /**
* The nsqd version.
*/
public string $version, public string $version,
) { ) {
} }

View File

@@ -4,340 +4,254 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\ZlibInputStream;
use Amp\ByteStream\ZlibOutputStream;
use Amp\Failure;
use Amp\Promise;
use Amp\Socket\Socket;
use Amp\Success;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Config\ConnectionConfig; use Nsq\Config\ConnectionConfig;
use Nsq\Exception\AuthenticationRequired; use Nsq\Exception\AuthenticationRequired;
use Nsq\Exception\ConnectionFail; use Nsq\Exception\BadResponse;
use Nsq\Exception\UnexpectedResponse; use Nsq\Exception\NotConnected;
use Nsq\Reconnect\ExponentialStrategy; use Nsq\Exception\NsqError;
use Nsq\Reconnect\ReconnectStrategy; use Nsq\Exception\NsqException;
use Nsq\Protocol\Error;
use Nsq\Protocol\Frame;
use Nsq\Protocol\Message;
use Nsq\Protocol\Response;
use Nsq\Stream\NsqInputStream;
use Nsq\Stream\NullStream;
use Nsq\Stream\SnappyInputStream;
use Nsq\Stream\SnappyOutputStream;
use PHPinnacle\Buffer\ByteBuffer; use PHPinnacle\Buffer\ByteBuffer;
use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger; use Psr\Log\NullLogger;
use Socket\Raw\Exception; use function Amp\call;
use Socket\Raw\Factory; use function Amp\Socket\connect;
use Socket\Raw\Socket;
use function addcslashes;
use function hash;
use function http_build_query;
use function implode;
use function json_encode;
use function pack;
use function snappy_compress;
use function unpack;
use const JSON_FORCE_OBJECT;
use const JSON_THROW_ON_ERROR;
use const PHP_EOL;
/** /**
* @internal * @internal
*
* @property ConnectionConfig $connectionConfig
*/ */
abstract class Connection abstract class Connection
{ {
use LoggerAwareTrait;
private string $address;
private ?Socket $socket = null; private ?Socket $socket = null;
private ReconnectStrategy $reconnect; private InputStream $inputStream;
private ClientConfig $clientConfig; private OutputStream $outputStream;
private ?ConnectionConfig $connectionConfig = null; private ByteBuffer $buffer;
public function __construct( protected ?ConnectionConfig $connectionConfig = null;
string $address,
protected ClientConfig $clientConfig;
protected LoggerInterface $logger;
final public function __construct(
private string $address,
ClientConfig $clientConfig = null, ClientConfig $clientConfig = null,
ReconnectStrategy $reconnectStrategy = null, ?LoggerInterface $logger = null,
LoggerInterface $logger = null,
) { ) {
$this->address = $address; $this->buffer = new ByteBuffer();
$this->inputStream = $this->outputStream = new NullStream();
$this->logger = $logger ?? new NullLogger();
$this->reconnect = $reconnectStrategy ?? new ExponentialStrategy(logger: $this->logger);
$this->clientConfig = $clientConfig ?? new ClientConfig(); $this->clientConfig = $clientConfig ?? new ClientConfig();
$this->logger = $logger ?? new NullLogger();
} }
public function connect(): void public function __destruct()
{ {
$this->reconnect->connect(function (): void { $this->close();
try { }
$this->socket = (new Factory())->createClient($this->address);
}
// @codeCoverageIgnoreStart
catch (Exception $e) {
$this->logger->error('Connecting to {address} failed.', ['address' => $this->address]);
throw ConnectionFail::fromThrowable($e); /**
} * @return Promise<void>
// @codeCoverageIgnoreEnd */
public function connect(): Promise
{
return call(function (): \Generator {
$this->socket = $this->outputStream = yield connect($this->address);
$this->inputStream = new NsqInputStream($this->socket);
$this->socket->write(' V2'); yield $this->outputStream->write(' V2');
$body = json_encode($this->clientConfig, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
$response = $this->command('IDENTIFY', data: $body)->response();
yield $this->command('IDENTIFY', data: $this->clientConfig->toString());
/** @var Response $response */
$response = yield $this->readResponse();
$this->connectionConfig = ConnectionConfig::fromArray($response->toArray()); $this->connectionConfig = ConnectionConfig::fromArray($response->toArray());
if ($this->connectionConfig->snappy || $this->connectionConfig->deflate) { if ($this->connectionConfig->snappy) {
$this->response()->okOrFail(); $this->inputStream = new NsqInputStream(
new SnappyInputStream($this->inputStream, $this->logger),
);
$this->outputStream = new SnappyOutputStream($this->outputStream);
$this->checkIsOK();
}
if ($this->connectionConfig->deflate) {
$this->inputStream = new NsqInputStream(
new ZlibInputStream($this->socket, ZLIB_ENCODING_DEFLATE, [
'level' => $this->connectionConfig->deflateLevel,
]),
);
$this->outputStream = new ZlibOutputStream($this->socket, ZLIB_ENCODING_DEFLATE, [
'level' => $this->connectionConfig->deflateLevel,
]);
$this->checkIsOK();
} }
if ($this->connectionConfig->authRequired) { if ($this->connectionConfig->authRequired) {
if (null === $this->clientConfig->authSecret) { if (null === $this->clientConfig->authSecret) {
throw new AuthenticationRequired('NSQ requires authorization, set ClientConfig::$authSecret before connecting'); yield $this->close();
throw new AuthenticationRequired();
} }
$authResponse = $this->command('AUTH', data: $this->clientConfig->authSecret)->response()->toArray(); yield $this->command('AUTH', data: $this->clientConfig->authSecret);
$response = yield $this->readResponse();
$this->logger->info('Authorization response: '.http_build_query($authResponse)); $this->logger->info('Authorization response: '.http_build_query($response->toArray()));
} }
}); });
} }
/** /**
* Cleanly close your connection (no more messages are sent). * Cleanly close your connection (no more messages are sent).
*
* @return Promise<void>
*/ */
public function disconnect(): void public function close(): Promise
{ {
if (null === $this->socket) { if (null === $this->socket) {
return; return new Success();
} }
try { return call(function (): \Generator {
$this->socket->write('CLS'.PHP_EOL); yield $this->command('CLS');
$this->socket->close();
}
// @codeCoverageIgnoreStart
catch (Exception $e) {
$this->logger->debug($e->getMessage(), ['exception' => $e]);
}
// @codeCoverageIgnoreEnd
$this->socket = null; if (null !== $this->socket) {
$this->connectionConfig = null; $this->socket->close();
$this->socket = null;
}
});
} }
public function isReady(): bool public function isClosed(): bool
{ {
return null !== $this->socket; return null === $this->socket;
} }
/** /**
* @param array<int, int|string>|string $params * @param array<int, int|string>|string $params
*
* @return Promise<void>
*/ */
protected function command(string $command, array | string $params = [], string $data = null): self protected function command(string $command, array | string $params = [], string $data = null): Promise
{
$socket = $this->socket();
$buffer = [] === $params ? $command : implode(' ', [$command, ...((array) $params)]);
$buffer .= PHP_EOL;
if (null !== $data) {
$buffer .= pack('N', \strlen($data));
$buffer .= $data;
}
$this->logger->debug('Prepare send uncompressed buffer: {bytes}', ['bytes' => addcslashes($buffer, PHP_EOL)]);
if ($this->connectionConfig?->snappy) {
$identifierFrame = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
$compressedFrame = 0x00;
$uncompressedFrame = 0x01;
$chunk = snappy_compress($buffer);
[$chunk, $compressFrame] = match (\strlen($chunk) < \strlen($buffer)) {
true => [$chunk, $compressedFrame],
false => [$buffer, $uncompressedFrame],
};
$size = \strlen($chunk) + 4;
$buffer = new ByteBuffer();
foreach ([...$identifierFrame, $compressFrame, $size, $size >> 8, $size >> 16] as $byte) {
$buffer->appendUint8($byte);
}
$crc32c = hash('crc32c', $data, true);
$crc32c = unpack('V', $crc32c)[1];
$unsignedRightShift = static function ($a, $b) {
if ($b >= 32 || $b < -32) {
$m = (int) ($b / 32);
$b -= ($m * 32);
}
if ($b < 0) {
$b = 32 + $b;
}
if (0 === $b) {
return (($a >> 1) & 0x7fffffff) * 2 + (($a >> $b) & 1);
}
if ($a < 0) {
$a >>= 1;
$a &= 2147483647;
$a |= 0x40000000;
$a >>= ($b - 1);
} else {
$a >>= $b;
}
return $a;
};
$checksum = $unsignedRightShift((($crc32c >> 15) | ($crc32c << 17)) + 0xa282ead8, 0);
$buffer->appendUint32($checksum);
$buffer->append($chunk);
$buffer = $buffer->bytes();
}
$this->logger->debug('Prepare send compressed buffer: {bytes}', ['bytes' => addcslashes($buffer, PHP_EOL)]);
try {
$socket->write($buffer);
}
// @codeCoverageIgnoreStart
catch (Exception $e) {
$this->disconnect();
$this->logger->error($e->getMessage(), ['exception' => $e]);
throw ConnectionFail::fromThrowable($e);
}
// @codeCoverageIgnoreEnd
return $this;
}
public function hasMessage(float $timeout = 0): bool
{
try {
return false !== $this->socket()->selectRead($timeout);
}
// @codeCoverageIgnoreStart
catch (Exception $e) {
$this->disconnect();
throw ConnectionFail::fromThrowable($e);
}
// @codeCoverageIgnoreEnd
}
public function receive(float $timeout = null): ?Response
{
$socket = $this->socket();
$timeout ??= $this->clientConfig->readTimeout;
$deadline = microtime(true) + $timeout;
if (!$this->hasMessage($timeout)) {
return null;
}
try {
$size = $socket->read(Bytes::BYTES_SIZE);
if ('' === $size) {
$this->disconnect();
throw new ConnectionFail('Probably connection lost');
}
if ($this->connectionConfig?->snappy) {
$buffer = new ByteBuffer();
$snappyBuffer = new ByteBuffer($size);
while (true) {
$typeByte = \ord($snappyBuffer->consume(1));
$size = \ord($snappyBuffer->consume(1)) + (\ord($snappyBuffer->consume(1)) << 8) + (\ord($snappyBuffer->consume(1)) << 16);
$type = match ($typeByte) {
0xff => 'identifier',
0x00 => 'compressed',
0x01 => 'uncompressed',
0xfe => 'padding',
};
$this->logger->debug('Received snappy chunk: {type}, size: {size}', [
'type' => $type,
'size' => $size,
]);
switch ($typeByte) {
case 0xff: // 'identifier',
$socket->read($size);
$snappyBuffer->append($socket->read(4));
continue 2;
case 0x00: // 'compressed',
case 0x01: // 'uncompressed',
$uncompressed = $socket->read($size);
$this->logger->debug('Received uncompressed bytes: {bytes}', ['bytes' => $uncompressed]);
$buffer->append($uncompressed);
$buffer->consume(4); // slice snappy prefix
$buffer->consumeUint32(); // slice size
break 2;
case 0xfe:// 'padding',
}
}
} else {
$this->logger->debug('Size bytes received: "{bytes}"', ['bytes' => $size]);
$buffer = new ByteBuffer($size);
$size = $buffer->consumeUint32();
do {
$chunk = $socket->read($size);
$buffer->append($chunk);
$size -= \strlen($chunk);
} while (0 < $size);
}
$this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL));
$response = new Response($buffer);
if ($response->isHeartBeat()) {
$this->command('NOP');
return $this->receive(
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
);
}
}
// @codeCoverageIgnoreStart
catch (Exception $e) {
$this->disconnect();
throw ConnectionFail::fromThrowable($e);
}
// @codeCoverageIgnoreEnd
return $response;
}
protected function response(): Response
{
return $this->receive() ?? throw UnexpectedResponse::null();
}
private function socket(): Socket
{ {
if (null === $this->socket) { if (null === $this->socket) {
$this->connect(); return new Failure(new NotConnected());
} }
return $this->socket ?? throw new ConnectionFail('This connection is closed, create new one.'); $command = implode(' ', [$command, ...((array) $params)]);
$buffer = $this->buffer->append($command.PHP_EOL);
if (null !== $data) {
$buffer->appendUint32(\strlen($data));
$buffer->append($data);
}
$this->logger->debug('Sending: {bytes}', ['bytes' => $buffer->bytes()]);
return $this->outputStream->write($buffer->flush());
}
/**
* @return Promise<Frame>
*/
protected function readFrame(): Promise
{
return call(function (): \Generator {
$bytes = yield $this->inputStream->read();
$this->logger->debug('Receiving: {bytes}', ['bytes' => $bytes]);
if (null === $bytes) {
throw new NotConnected();
}
$buffer = $this->buffer->append($bytes);
$frame = match ($type = $buffer->consumeUint32()) {
0 => new Response($buffer->flush()),
1 => new Error($buffer->flush()),
2 => new Message(
timestamp: $buffer->consumeInt64(),
attempts: $buffer->consumeUint16(),
id: $buffer->consume(Bytes::BYTES_ID),
body: $buffer->flush(),
consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'),
),
default => throw new NsqException('Unexpected frame type: '.$type)
};
if ($frame instanceof Response && $frame->isHeartBeat()) {
yield $this->command('NOP');
return $this->readFrame();
}
return $frame;
});
}
/**
* @return Promise<void>
*/
protected function checkIsOK(): Promise
{
return call(function (): \Generator {
/** @var Response $response */
$response = yield $this->readResponse();
if (!$response->isOk()) {
throw new BadResponse($response);
}
$this->logger->debug('Ok checked.');
return call(static function (): void {});
});
}
/**
* @return Promise<Response>
*/
private function readResponse(): Promise
{
return call(function (): \Generator {
$frame = yield $this->readFrame();
if ($frame instanceof Error) {
if ($frame->type->terminateConnection) {
$this->close();
}
throw new NsqError($frame);
}
if (!$frame instanceof Response) {
throw new NsqException('Unreachable statement.');
}
return $frame;
});
} }
} }

View File

@@ -4,60 +4,139 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Amp\Failure;
use Amp\Promise;
use Amp\Success;
use Nsq\Exception\NsqError;
use Nsq\Exception\NsqException;
use Nsq\Protocol\Error;
use Nsq\Protocol\Message;
use Nsq\Protocol\Response;
use function Amp\asyncCall;
use function Amp\call;
final class Consumer extends Connection final class Consumer extends Connection
{ {
private int $rdy = 0; private int $rdy = 0;
/** /**
* Subscribe to a topic/channel. * @return Promise<void>
*/ */
public function sub(string $topic, string $channel): void public function listen(
string $topic,
string $channel,
callable $onMessage,
): Promise {
return call(function () use ($topic, $channel, $onMessage): \Generator {
yield $this->command('SUB', [$topic, $channel]);
yield $this->checkIsOK();
asyncCall(function () use ($onMessage): \Generator {
yield $this->rdy(2500);
while ($message = yield $this->readMessage()) {
$command = yield $onMessage($message);
if (true === $command) {
break;
}
if ($this->rdy < 1000) {
yield $this->rdy(2500);
}
}
return new Success();
});
});
}
/**
* @return Promise<Message>
*/
public function readMessage(): Promise
{ {
$this->command('SUB', [$topic, $channel])->response()->okOrFail(); return call(function (): \Generator {
$frame = yield $this->readFrame();
if ($frame instanceof Message) {
return $frame;
}
if ($frame instanceof Error) {
if ($frame->type->terminateConnection) {
yield $this->close();
}
throw new NsqError($frame);
}
throw new NsqException('Unreachable statement.');
});
} }
/** /**
* Update RDY state (indicate you are ready to receive N messages). * Update RDY state (indicate you are ready to receive N messages).
*
* @return Promise<void>
*/ */
public function rdy(int $count): void public function rdy(int $count): Promise
{ {
if ($this->rdy === $count) { if ($this->rdy === $count) {
return; return call(static function (): void {});
} }
$this->command('RDY', (string) $count);
$this->rdy = $count; $this->rdy = $count;
return $this->command('RDY', (string) $count);
} }
/** /**
* Finish a message (indicate successful processing). * Finish a message (indicate successful processing).
*
* @return Promise<void>
*
* @internal
*/ */
public function fin(string $id): void public function fin(string $id): Promise
{ {
$this->command('FIN', $id); $promise = $this->command('FIN', $id);
$promise->onResolve(function (): void {
--$this->rdy;
});
--$this->rdy; return $promise;
} }
/** /**
* Re-queue a message (indicate failure to process) * Re-queue a message (indicate failure to process) The re-queued message is placed at the tail of the queue,
* The re-queued message is placed at the tail of the queue, equivalent to having just published it, * equivalent to having just published it, but for various implementation specific reasons that behavior should not
* but for various implementation specific reasons that behavior should not be explicitly relied upon and may change in the future. * be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
* Similarly, a message that is in-flight and times out behaves identically to an explicit REQ. * behaves identically to an explicit REQ.
*
* @return Promise<void>
*
* @internal
*/ */
public function req(string $id, int $timeout): void public function req(string $id, int $timeout): Promise
{ {
$this->command('REQ', [$id, $timeout]); $promise = $this->command('REQ', [$id, $timeout]);
$promise->onResolve(function (): void {
--$this->rdy;
});
--$this->rdy; return $promise;
} }
/** /**
* Reset the timeout for an in-flight message. * Reset the timeout for an in-flight message.
*
* @return Promise<void>
*
* @internal
*/ */
public function touch(string $id): void public function touch(string $id): Promise
{ {
$this->command('TOUCH', $id); return $this->command('TOUCH', $id);
} }
} }

View File

@@ -4,8 +4,10 @@ declare(strict_types=1);
namespace Nsq\Exception; namespace Nsq\Exception;
use RuntimeException; final class AuthenticationRequired extends NsqException
final class AuthenticationRequired extends RuntimeException implements NsqException
{ {
public function __construct()
{
parent::__construct('NSQ requires authorization, set ClientConfig::$authSecret before connecting');
}
} }

View File

@@ -0,0 +1,15 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Nsq\Protocol\Response;
final class BadResponse extends NsqException
{
public function __construct(Response $response)
{
parent::__construct($response->msg);
}
}

View File

@@ -4,15 +4,12 @@ declare(strict_types=1);
namespace Nsq\Exception; namespace Nsq\Exception;
use RuntimeException; final class ConnectionFail extends NsqException
use Throwable;
final class ConnectionFail extends RuntimeException implements NsqException
{ {
/** /**
* @codeCoverageIgnore * @codeCoverageIgnore
*/ */
public static function fromThrowable(Throwable $throwable): self public static function fromThrowable(\Throwable $throwable): self
{ {
return new self($throwable->getMessage(), (int) $throwable->getCode(), $throwable); return new self($throwable->getMessage(), (int) $throwable->getCode(), $throwable);
} }

View File

@@ -4,10 +4,9 @@ declare(strict_types=1);
namespace Nsq\Exception; namespace Nsq\Exception;
use Nsq\Message; use Nsq\Protocol\Message;
use RuntimeException;
final class MessageAlreadyFinished extends RuntimeException implements NsqException final class MessageAlreadyFinished extends NsqException
{ {
public static function finish(Message $message): self public static function finish(Message $message): self
{ {

View File

@@ -0,0 +1,9 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
final class NotConnected extends NsqException
{
}

View File

@@ -4,8 +4,12 @@ declare(strict_types=1);
namespace Nsq\Exception; namespace Nsq\Exception;
use RuntimeException; use Nsq\Protocol\Error;
final class NsqError extends RuntimeException implements NsqException final class NsqError extends NsqException
{ {
public function __construct(Error $error)
{
parent::__construct($error->rawData);
}
} }

View File

@@ -4,8 +4,6 @@ declare(strict_types=1);
namespace Nsq\Exception; namespace Nsq\Exception;
use Throwable; class NsqException extends \RuntimeException
interface NsqException extends Throwable
{ {
} }

View File

@@ -1,18 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use RuntimeException;
final class UnexpectedResponse extends RuntimeException implements NsqException
{
/**
* @codeCoverageIgnore
*/
public static function null(): self
{
return new self('Response was expected, but null received.');
}
}

View File

@@ -4,41 +4,56 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use function array_map; use Amp\Promise;
use function implode; use PHPinnacle\Buffer\ByteBuffer;
use function pack; use function Amp\call;
/**
* @psalm-suppress PropertyNotSetInConstructor
*/
final class Producer extends Connection final class Producer extends Connection
{ {
/** /**
* @psalm-suppress PossiblyFalseOperand * @return Promise<void>
*/ */
public function pub(string $topic, string $body): void public function pub(string $topic, string $body): Promise
{ {
$this->command('PUB', $topic, $body)->response()->okOrFail(); return call(function () use ($topic, $body): \Generator {
yield $this->command('PUB', $topic, $body);
yield $this->checkIsOK();
});
} }
/** /**
* @psalm-param array<mixed, mixed> $bodies * @psalm-param array<int, mixed> $bodies
* *
* @psalm-suppress PossiblyFalseOperand * @return Promise<void>
*/ */
public function mpub(string $topic, array $bodies): void public function mpub(string $topic, array $bodies): Promise
{ {
$num = pack('N', \count($bodies)); return call(function () use ($topic, $bodies): \Generator {
$buffer = new ByteBuffer();
$mb = implode('', array_map(static function ($body): string { $buffer->appendUint32(\count($bodies));
return pack('N', \strlen($body)).$body;
}, $bodies));
$this->command('MPUB', $topic, $num.$mb)->response()->okOrFail(); foreach ($bodies as $body) {
$buffer->appendUint32(\strlen($body));
$buffer->append($body);
}
yield $this->command('MPUB', $topic, $buffer->flush());
yield $this->checkIsOK();
});
} }
/** /**
* @psalm-suppress PossiblyFalseOperand * @return Promise<void>
*/ */
public function dpub(string $topic, string $body, int $delay): void public function dpub(string $topic, string $body, int $delay): Promise
{ {
$this->command('DPUB', [$topic, $delay], $body)->response()->okOrFail(); return call(function () use ($topic, $body, $delay): \Generator {
yield $this->command('DPUB', [$topic, $delay], $body);
yield $this->checkIsOK();
});
} }
} }

22
src/Protocol/Error.php Normal file
View File

@@ -0,0 +1,22 @@
<?php
declare(strict_types=1);
namespace Nsq\Protocol;
use Nsq\Bytes;
/**
* @psalm-immutable
*/
final class Error extends Frame
{
public ErrorType $type;
public function __construct(public string $rawData)
{
parent::__construct(\strlen($this->rawData) + Bytes::BYTES_TYPE);
$this->type = new ErrorType(explode(' ', $this->rawData)[0]);
}
}

100
src/Protocol/ErrorType.php Normal file
View File

@@ -0,0 +1,100 @@
<?php
declare(strict_types=1);
namespace Nsq\Protocol;
/**
* @psalm-immutable
*/
final class ErrorType
{
/**
* A generic error type without any more hints.
*/
public const E_INVALID = true;
/**
* This error might be returned during multiple occasions. It can be returned for IDENTIFY, AUTH or MPUB messages.
* It is caused for payloads that do not meet certain requirements. For IDENTIFY and AUTH, this is usually a bug in
* the library and should be reported. For MPUB, this error can occur if the payload is larger than the maximum
* payload size specified in the nsqd config.
*/
public const E_BAD_BODY = true;
/**
* This error indicates that the topic sent to nsqd is not valid.
*/
public const E_BAD_TOPIC = true;
/**
* This error indicates that the channel sent to nsqd is not valid.
*/
public const E_BAD_CHANNEL = true;
/**
* This error is returned by nsqd if the message in the payload of a publishing operation does not meet the
* requirements of the server. This might be caused by too big payloads being sent to nsqd. You should consider
* adding a limit to the payload size or increasing it in the nsqd config.
*/
public const E_BAD_MESSAGE = true;
/**
* This error may happen if a error condition is met after validating the input on the nsqd side. This is usually a
* temporary error and can be caused by topics being added, deleted or cleared.
*/
public const E_PUB_FAILED = true;
/**
* This error may happen if a error condition is met after validating the input on the nsqd side. This is usually a
* temporary error and can be caused by topics being added, deleted or cleared.
*/
public const E_MPUB_FAILED = true;
/**
* This error may happen if a error condition is met after validating the input on the nsqd side. This is usually a
* temporary error and can be caused by topics being added, deleted or cleared.
*/
public const E_DPUB_FAILED = true;
/**
* This error may happen if a error condition is met after validating the input on the nsqd side. This can
* happen in particular for messages that are no longer queued on the server side.
*/
public const E_FIN_FAILED = false;
/**
* This error may happen if a error condition is met after validating the input on the nsqd side. This can
* happen in particular for messages that are no longer queued on the server side.
*/
public const E_REQ_FAILED = false;
/**
* This error may happen if a error condition is met after validating the input on the nsqd side. This can
* happen in particular for messages that are no longer queued on the server side.
*/
public const E_TOUCH_FAILED = false;
/**
* This error indicates that the authorization of the client failed on the server side. This might be related
* to connection issues to the authorization server. Depending on the authorization server implementation, this
* might also indicate that the given auth secret in the [ClientConfig] is not known on the server or the server
* denied authentication with the current connection properties (i.e. TLS status and IP).
*/
public const E_AUTH_FAILED = true;
/**
* This error happens if something breaks on the nsqd side while performing the authorization. This might be
* caused by bugs in nsqd, the authorization server or network issues.
*/
public const E_AUTH_ERROR = true;
/**
* This error is sent by nsqd if the client attempts an authentication, but the server does not support it. This
* should never happen using this library as authorization requests are only sent if the server supports it.
* It is safe to expect that this error is never thrown.
*/
public const E_AUTH_DISABLED = true;
/**
* This error indicates that the client related to the authorization secret set in the [ClientConfig] is not
* allowed to do the operation it tried to do.
*/
public const E_UNAUTHORIZED = true;
/**
* A boolean indicating whether or not an [Error] with this type terminates the connection or not.
*/
public bool $terminateConnection;
public function __construct(public string $type)
{
$this->terminateConnection = \constant('self::'.$this->type) ?? self::E_INVALID;
}
}

16
src/Protocol/Frame.php Normal file
View File

@@ -0,0 +1,16 @@
<?php
declare(strict_types=1);
namespace Nsq\Protocol;
abstract class Frame
{
public function __construct(
/**
* @psalm-readonly
*/
public int $length,
) {
}
}

View File

@@ -2,11 +2,15 @@
declare(strict_types=1); declare(strict_types=1);
namespace Nsq; namespace Nsq\Protocol;
use Amp\Failure;
use Amp\Promise;
use Nsq\Bytes;
use Nsq\Consumer;
use Nsq\Exception\MessageAlreadyFinished; use Nsq\Exception\MessageAlreadyFinished;
final class Message final class Message extends Frame
{ {
/** /**
* @psalm-readonly * @psalm-readonly
@@ -34,6 +38,14 @@ final class Message
public function __construct(int $timestamp, int $attempts, string $id, string $body, Consumer $consumer) public function __construct(int $timestamp, int $attempts, string $id, string $body, Consumer $consumer)
{ {
parent::__construct(
Bytes::BYTES_TYPE
+ Bytes::BYTES_TIMESTAMP
+ Bytes::BYTES_ATTEMPTS
+ Bytes::BYTES_ID
+ \strlen($body)
);
$this->timestamp = $timestamp; $this->timestamp = $timestamp;
$this->attempts = $attempts; $this->attempts = $attempts;
$this->id = $id; $this->id = $id;
@@ -47,32 +59,43 @@ final class Message
return $this->finished; return $this->finished;
} }
public function finish(): void /**
* @return Promise<void>
*/
public function finish(): Promise
{ {
if ($this->finished) { if ($this->finished) {
throw MessageAlreadyFinished::finish($this); return new Failure(MessageAlreadyFinished::finish($this));
} }
$this->consumer->fin($this->id);
$this->finished = true; $this->finished = true;
return $this->consumer->fin($this->id);
} }
public function requeue(int $timeout): void /**
* @return Promise<void>
*/
public function requeue(int $timeout): Promise
{ {
if ($this->finished) { if ($this->finished) {
throw MessageAlreadyFinished::requeue($this); return new Failure(MessageAlreadyFinished::requeue($this));
} }
$this->consumer->req($this->id, $timeout);
$this->finished = true; $this->finished = true;
return $this->consumer->req($this->id, $timeout);
} }
public function touch(): void /**
* @return Promise<void>
*/
public function touch(): Promise
{ {
if ($this->finished) { if ($this->finished) {
throw MessageAlreadyFinished::touch($this); return new Failure(MessageAlreadyFinished::touch($this));
} }
$this->consumer->touch($this->id); return $this->consumer->touch($this->id);
} }
} }

39
src/Protocol/Response.php Normal file
View File

@@ -0,0 +1,39 @@
<?php
declare(strict_types=1);
namespace Nsq\Protocol;
use Nsq\Bytes;
/**
* @psalm-immutable
*/
final class Response extends Frame
{
public const OK = 'OK';
public const HEARTBEAT = '_heartbeat_';
public function __construct(public string $msg)
{
parent::__construct(\strlen($this->msg) + Bytes::BYTES_TYPE);
}
public function isOk(): bool
{
return self::OK === $this->msg;
}
public function isHeartBeat(): bool
{
return self::HEARTBEAT === $this->msg;
}
/**
* @return array<mixed, mixed>
*/
public function toArray(): array
{
return json_decode($this->msg, true, flags: JSON_THROW_ON_ERROR);
}
}

View File

@@ -1,63 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Reconnect;
use Nsq\Exception\ConnectionFail;
use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Throwable;
final class ExponentialStrategy implements ReconnectStrategy
{
use LoggerAwareTrait;
private int $delay;
private int $nextTryAfter;
private int $attempt = 0;
private TimeProvider $timeProvider;
public function __construct(
private int $minDelay = 8,
private int $maxDelay = 32,
TimeProvider $timeProvider = null,
LoggerInterface $logger = null,
) {
$this->delay = 0;
$this->timeProvider = $timeProvider ?? new RealTimeProvider();
$this->nextTryAfter = $this->timeProvider->time();
$this->logger = $logger ?? new NullLogger();
}
/**
* {@inheritDoc}
*/
public function connect(callable $callable): void
{
$currentTime = $this->timeProvider->time();
if ($currentTime < $this->nextTryAfter) {
throw new ConnectionFail('Time to reconnect has not yet come');
}
try {
$callable();
} catch (Throwable $e) {
$nextDelay = 0 === $this->delay ? $this->minDelay : $this->delay * 2;
$this->delay = $nextDelay > $this->maxDelay ? $this->maxDelay : $nextDelay;
$this->nextTryAfter = $currentTime + $this->delay;
$this->logger->warning('Reconnect #{attempt} after {delay}s', ['attempt' => ++$this->attempt, 'delay' => $this->delay]);
throw $e;
}
$this->delay = 0;
$this->attempt = 0;
}
}

View File

@@ -1,13 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Reconnect;
final class RealTimeProvider implements TimeProvider
{
public function time(): int
{
return time();
}
}

View File

@@ -1,15 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Reconnect;
use Nsq\Exception\ConnectionFail;
interface ReconnectStrategy
{
/**
* @throws ConnectionFail
*/
public function connect(callable $callable): void;
}

View File

@@ -1,10 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Reconnect;
interface TimeProvider
{
public function time(): int;
}

View File

@@ -1,87 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq;
use Nsq\Exception\NsqError;
use Nsq\Exception\UnexpectedResponse;
use PHPinnacle\Buffer\ByteBuffer;
use function json_decode;
use function sprintf;
use const JSON_THROW_ON_ERROR;
final class Response
{
private const OK = 'OK';
private const HEARTBEAT = '_heartbeat_';
private const TYPE_RESPONSE = 0;
private const TYPE_ERROR = 1;
private const TYPE_MESSAGE = 2;
private int $type;
private ByteBuffer $buffer;
public function __construct(ByteBuffer $buffer)
{
$this->type = $buffer->consumeUint32();
$this->buffer = $buffer;
}
public function okOrFail(): void
{
if (self::TYPE_ERROR === $this->type) {
throw new NsqError($this->buffer->bytes());
}
if (self::TYPE_RESPONSE !== $this->type) {
// @codeCoverageIgnoreStart
throw new UnexpectedResponse(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type));
// @codeCoverageIgnoreEnd
}
if (self::OK !== $this->buffer->bytes()) {
// @codeCoverageIgnoreStart
throw new UnexpectedResponse(sprintf('OK response expected, but "%s" received.', $this->buffer->bytes()));
// @codeCoverageIgnoreEnd
}
}
public function isHeartBeat(): bool
{
return self::TYPE_RESPONSE === $this->type && self::HEARTBEAT === $this->buffer->bytes();
}
/**
* @phpstan-ignore-next-line
*/
public function toArray(): array
{
if (self::TYPE_RESPONSE !== $this->type) {
// @codeCoverageIgnoreStart
throw new UnexpectedResponse(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type));
// @codeCoverageIgnoreEnd
}
return json_decode($this->buffer->bytes(), true, flags: JSON_THROW_ON_ERROR);
}
public function toMessage(Consumer $reader): Message
{
if (self::TYPE_MESSAGE !== $this->type) {
// @codeCoverageIgnoreStart
throw new UnexpectedResponse(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type));
// @codeCoverageIgnoreEnd
}
$buffer = new ByteBuffer($this->buffer->bytes());
$timestamp = $buffer->consumeInt64();
$attempts = $buffer->consumeUint16();
$id = $buffer->consume(Bytes::BYTES_ID);
$body = $buffer->flush();
return new Message($timestamp, $attempts, $id, $body, $reader);
}
}

View File

@@ -0,0 +1,57 @@
<?php
declare(strict_types=1);
namespace Nsq\Stream;
use Amp\ByteStream\InputStream;
use Amp\Promise;
use Nsq\Bytes;
use Nsq\Exception\NotConnected;
use PHPinnacle\Buffer\ByteBuffer;
use function Amp\call;
final class NsqInputStream implements InputStream
{
private ByteBuffer $buffer;
public function __construct(
private InputStream $inputStream,
) {
$this->buffer = new ByteBuffer();
}
/**
* {@inheritDoc}
*/
public function read(): Promise
{
return call(function (): \Generator {
$buffer = $this->buffer;
while ($buffer->size() < Bytes::BYTES_SIZE) {
$bytes = yield $this->inputStream->read();
if (null === $bytes) {
throw new NotConnected();
}
$buffer->append($bytes);
}
$size = $buffer->consumeUint32();
while ($buffer->size() < $size) {
$bytes = yield $this->inputStream->read();
if (null === $bytes) {
throw new NotConnected();
}
$buffer->append($bytes);
}
return $buffer->consume($size);
});
}
}

42
src/Stream/NullStream.php Normal file
View File

@@ -0,0 +1,42 @@
<?php
declare(strict_types=1);
namespace Nsq\Stream;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\Failure;
use Amp\Promise;
use Nsq\Exception\NotConnected;
final class NullStream implements InputStream, OutputStream
{
/**
* {@inheritDoc}
*/
public function read(): Promise
{
return new Failure(new NotConnected());
}
/**
* {@inheritDoc}
*
* @return Promise<void>
*/
public function write(string $data): Promise
{
return new Failure(new NotConnected());
}
/**
* {@inheritDoc}
*
* @return Promise<void>
*/
public function end(string $finalData = ''): Promise
{
return new Failure(new NotConnected());
}
}

View File

@@ -0,0 +1,106 @@
<?php
declare(strict_types=1);
namespace Nsq\Stream;
use Amp\ByteStream\InputStream;
use Amp\Promise;
use Nsq\Exception\NotConnected;
use PHPinnacle\Buffer\ByteBuffer;
use Psr\Log\LoggerInterface;
use function Amp\call;
final class SnappyInputStream implements InputStream
{
private ByteBuffer $buffer;
public function __construct(
private InputStream $inputStream,
private LoggerInterface $logger,
) {
if (!\function_exists('snappy_uncompress')) {
throw new \LogicException('Snappy extension not installed.');
}
$this->buffer = new ByteBuffer();
}
/**
* {@inheritDoc}
*/
public function read(): Promise
{
return call(function (): \Generator {
$buffer = $this->buffer;
while ($buffer->size() < 4) {
$bytes = yield $this->inputStream->read();
if (null === $bytes) {
throw new NotConnected();
}
$buffer->append($bytes);
}
/** @phpstan-ignore-next-line */
$chunkType = unpack('V', $buffer->consume(4))[1];
$size = $chunkType >> 8;
$chunkType &= 0xff;
$this->logger->debug('Snappy receive chunk [{chunk}], size [{size}]', [
'chunk' => $chunkType,
'size' => $size,
]);
while ($buffer->size() < $size) {
$bytes = yield $this->inputStream->read();
if (null === $bytes) {
throw new NotConnected();
}
$buffer->append($bytes);
}
switch ($chunkType) {
case 0xff:
$this->logger->debug('Snappy identifier chunk');
$buffer->discard(6); // discard identifier body
break;
case 0x00: // 'compressed',
$this->logger->debug('Snappy compressed chunk');
$data = $buffer
->discard(4) // discard checksum
->consume($size)
;
$this->logger->debug('Snappy compressed data [{data}]', ['data' => $data]);
return snappy_uncompress($data);
case 0x01: // 'uncompressed',
$this->logger->debug('Snappy uncompressed chunk');
$data = $buffer
->discard(4) // discard checksum
->consume($size)
;
$this->logger->debug('Snappy uncompressed data [{data}]', ['data' => $data]);
return $data;
case 0xfe:// 'padding',
$this->logger->debug('Snappy padding chunk');
$buffer->discard($size); // TODO ?
}
return $this->read();
});
}
}

View File

@@ -0,0 +1,74 @@
<?php
declare(strict_types=1);
namespace Nsq\Stream;
use Amp\ByteStream\OutputStream;
use Amp\Promise;
use PHPinnacle\Buffer\ByteBuffer;
final class SnappyOutputStream implements OutputStream
{
private ByteBuffer $buffer;
public function __construct(
private OutputStream $outputStream,
) {
if (!\function_exists('snappy_compress')) {
throw new \LogicException('Snappy extension not installed.');
}
$this->buffer = new ByteBuffer();
}
/**
* {@inheritDoc}
*
* @return Promise<void>
*/
public function write(string $data): Promise
{
$identifierFrame = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
$compressedFrame = 0x00;
$uncompressedFrame = 0x01; // 11
$maxChunkLength = 65536;
$buffer = $this->buffer;
foreach ($identifierFrame as $bite) {
$buffer->appendUint8($bite);
}
foreach (str_split($data, $maxChunkLength) as $chunk) {
$compressedChunk = snappy_compress($chunk);
[$chunk, $chunkType] = \strlen($compressedChunk) <= 0.875 * \strlen($data)
? [$compressedChunk, $compressedFrame]
: [$data, $uncompressedFrame];
/** @var string $checksum */
$checksum = hash('crc32c', $data, true);
/** @phpstan-ignore-next-line */
$checksum = unpack('N', $checksum)[1];
$maskedChecksum = (($checksum >> 15) | ($checksum << 17)) + 0xa282ead8 & 0xffffffff;
$size = (\strlen($chunk) + 4) << 8;
$buffer->append(pack('V', $chunkType + $size));
$buffer->append(pack('V', $maskedChecksum));
$buffer->append($chunk);
}
return $this->outputStream->write($buffer->flush());
}
/**
* {@inheritDoc}
*
* @return Promise<void>
*/
public function end(string $finalData = ''): Promise
{
return $this->outputStream->end($finalData);
}
}

View File

@@ -1,39 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq;
use Generator;
final class Subscriber
{
public const STOP = 0;
private Consumer $reader;
public function __construct(Consumer $reader)
{
$this->reader = $reader;
}
/**
* @psalm-return Generator<int, Message|float|null, int|float|null, void>
*/
public function subscribe(string $topic, string $channel): Generator
{
$this->reader->sub($topic, $channel);
while (true) {
$this->reader->rdy(1);
$command = yield $this->reader->receive()?->toMessage($this->reader);
if (self::STOP === $command) {
break;
}
}
$this->reader->disconnect();
}
}

View File

@@ -1,90 +0,0 @@
<?php
declare(strict_types=1);
use Nsq\Exception\ConnectionFail;
use Nsq\Reconnect\ExponentialStrategy;
use Nsq\Reconnect\TimeProvider;
use PHPUnit\Framework\TestCase;
final class ExponentialStrategyTest extends TestCase
{
public function testTimeNotYetCome(): void
{
$timeProvider = new FakeTimeProvider();
$strategy = new ExponentialStrategy(
minDelay: 8,
maxDelay: 32,
timeProvider: $timeProvider,
);
$successConnect = static function (int $time = null) use ($strategy, $timeProvider): void {
$timeProvider($time);
$strategy->connect(static function (): void {
});
};
$failConnect = static function (int $time = null) use ($strategy, $timeProvider): void {
$timeProvider($time);
try {
$strategy->connect(function (): void {
throw new ConnectionFail('Time come but failed');
});
} catch (ConnectionFail $e) {
self::assertSame('Time come but failed', $e->getMessage());
return;
}
self::fail('Expecting exception with message "Time come but failed"');
};
$timeNotCome = static function (int $time = null) use ($strategy, $timeProvider): void {
$timeProvider($time);
try {
$strategy->connect(function (): void {
throw new ConnectionFail('');
});
} catch (ConnectionFail $e) {
self::assertSame('Time to reconnect has not yet come', $e->getMessage());
return;
}
self::fail('Was expecting exception with message "Time to reconnect has not yet come"');
};
$failConnect(0);
$timeNotCome(7);
$failConnect(8);
$timeNotCome(22);
$timeNotCome(13);
$failConnect(24);
$successConnect(56);
$failConnect();
$timeNotCome();
$timeNotCome(63);
$failConnect(64);
$this->expectException(ConnectionFail::class);
$this->expectExceptionMessage('Time to reconnect has not yet come');
$successConnect();
}
}
class FakeTimeProvider implements TimeProvider
{
public int $time = 0;
public function time(): int
{
return $this->time;
}
public function __invoke(int $time = null): void
{
$this->time = $time ?? $this->time;
}
}

View File

@@ -2,10 +2,12 @@
declare(strict_types=1); declare(strict_types=1);
use Amp\Success;
use Nsq\Consumer; use Nsq\Consumer;
use Nsq\Exception\MessageAlreadyFinished; use Nsq\Exception\MessageAlreadyFinished;
use Nsq\Message; use Nsq\Protocol\Message;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use function Amp\Promise\wait;
final class MessageTest extends TestCase final class MessageTest extends TestCase
{ {
@@ -16,14 +18,14 @@ final class MessageTest extends TestCase
{ {
self::assertFalse($message->isFinished()); self::assertFalse($message->isFinished());
$message->finish(); wait($message->finish());
self::assertTrue($message->isFinished()); self::assertTrue($message->isFinished());
$this->expectException(MessageAlreadyFinished::class); $this->expectException(MessageAlreadyFinished::class);
$this->expectExceptionMessage('Can\'t finish message as it already finished.'); $this->expectExceptionMessage('Can\'t finish message as it already finished.');
$message->finish(); wait($message->finish());
} }
/** /**
@@ -33,14 +35,14 @@ final class MessageTest extends TestCase
{ {
self::assertFalse($message->isFinished()); self::assertFalse($message->isFinished());
$message->requeue(1); wait($message->requeue(1));
self::assertTrue($message->isFinished()); self::assertTrue($message->isFinished());
$this->expectException(MessageAlreadyFinished::class); $this->expectException(MessageAlreadyFinished::class);
$this->expectExceptionMessage('Can\'t requeue message as it already finished.'); $this->expectExceptionMessage('Can\'t requeue message as it already finished.');
$message->requeue(5); wait($message->requeue(5));
} }
/** /**
@@ -50,12 +52,12 @@ final class MessageTest extends TestCase
{ {
self::assertFalse($message->isFinished()); self::assertFalse($message->isFinished());
$message->finish(); wait($message->finish());
$this->expectException(MessageAlreadyFinished::class); $this->expectException(MessageAlreadyFinished::class);
$this->expectExceptionMessage('Can\'t touch message as it already finished.'); $this->expectExceptionMessage('Can\'t touch message as it already finished.');
$message->touch(); wait($message->touch());
} }
/** /**
@@ -63,6 +65,11 @@ final class MessageTest extends TestCase
*/ */
public function messages(): Generator public function messages(): Generator
{ {
yield [new Message(0, 0, 'id', 'body', $this->createStub(Consumer::class))]; $consumer = $this->createMock(Consumer::class);
$consumer->method('fin')->willReturn(new Success());
$consumer->method('touch')->willReturn(new Success());
$consumer->method('req')->willReturn(new Success());
yield [new Message(0, 0, 'id', 'body', $consumer)];
} }
} }

View File

@@ -3,90 +3,37 @@
declare(strict_types=1); declare(strict_types=1);
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Consumer;
use Nsq\Message;
use Nsq\Producer;
use Nsq\Subscriber;
use Nyholm\NSA;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
final class NsqTest extends TestCase final class NsqTest extends TestCase
{ {
public function test(): void /**
* @dataProvider configs
*/
public function test(ClientConfig $clientConfig): void
{ {
$producer = new Producer('tcp://localhost:4150'); self::markTestSkipped('');
$producer->pub(__FUNCTION__, __FUNCTION__); }
$consumer = new Consumer( /**
address: 'tcp://localhost:4150', * @return Generator<string, array<int, ClientConfig>>
clientConfig: new ClientConfig( */
heartbeatInterval: 1000, public function configs(): Generator
{
yield 'default' => [
new ClientConfig(
heartbeatInterval: 3000,
snappy: false,
readTimeout: 1, readTimeout: 1,
), ),
); ];
$subscriber = new Subscriber($consumer);
$generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__);
/** @var null|Message $message */ yield 'snappy' => [
$message = $generator->current(); new ClientConfig(
heartbeatInterval: 3000,
self::assertInstanceOf(Message::class, $message); snappy: true,
self::assertSame(__FUNCTION__, $message->body); readTimeout: 1,
$message->finish(); ),
];
$generator->next();
self::assertNull($generator->current());
$producer->mpub(__FUNCTION__, [
'First mpub message.',
'Second mpub message.',
]);
$generator->next();
/** @var null|Message $message */
$message = $generator->current();
self::assertInstanceOf(Message::class, $message);
self::assertSame('First mpub message.', $message->body);
$message->finish();
$generator->next();
/** @var null|Message $message */
$message = $generator->current();
self::assertInstanceOf(Message::class, $message);
self::assertSame('Second mpub message.', $message->body);
$message->requeue(0);
$generator->next();
/** @var null|Message $message */
$message = $generator->current();
self::assertInstanceOf(Message::class, $message);
self::assertSame('Second mpub message.', $message->body);
$message->finish();
$producer->dpub(__FUNCTION__, 'Deferred message.', 2000);
$generator->next();
/** @var null|Message $message */
$message = $generator->current();
self::assertNull($message);
NSA::setProperty(
NSA::getProperty($consumer, 'clientConfig'),
'readTimeout',
10,
);
$generator->next();
/** @var null|Message $message */
$message = $generator->current();
self::assertInstanceOf(Message::class, $message);
self::assertSame('Deferred message.', $message->body);
$message->touch();
$message->finish();
self::assertTrue($consumer->isReady());
$generator->send(Subscriber::STOP);
self::assertFalse($consumer->isReady());
} }
} }

View File

@@ -5,6 +5,7 @@ declare(strict_types=1);
use Nsq\Exception\NsqError; use Nsq\Exception\NsqError;
use Nsq\Producer; use Nsq\Producer;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use function Amp\Promise\wait;
final class ProducerTest extends TestCase final class ProducerTest extends TestCase
{ {
@@ -17,7 +18,9 @@ final class ProducerTest extends TestCase
$this->expectExceptionMessage($exceptionMessage); $this->expectExceptionMessage($exceptionMessage);
$producer = new Producer('tcp://localhost:4150'); $producer = new Producer('tcp://localhost:4150');
$producer->pub($topic, $body);
wait($producer->connect());
wait($producer->pub($topic, $body));
} }
/** /**

View File

@@ -0,0 +1,30 @@
<?php
declare(strict_types=1);
namespace Protocol;
use Nsq\Protocol\ErrorType;
use PHPUnit\Framework\TestCase;
final class ErrorTypeTest extends TestCase
{
/**
* @dataProvider data
*/
public function testConstructor(string $type, bool $isConnectionTerminated): void
{
$errorType = new ErrorType($type);
self::assertSame($isConnectionTerminated, $errorType->terminateConnection);
}
/**
* @return \Generator<string, array<int, bool|string>>
*/
public function data(): \Generator
{
foreach ((new \ReflectionClass(ErrorType::class))->getConstants() as $constant => $isTerminated) {
yield $constant => [$constant, $isTerminated];
}
}
}