Compare commits
17 Commits
snappy
...
875cb8b542
Author | SHA1 | Date | |
---|---|---|---|
875cb8b542 | |||
b2b444d1ef | |||
f6ef057e40 | |||
db988a0914 | |||
3c0ec5574d | |||
b395e1e3ee | |||
801fdfe547 | |||
3d8f5be2d0 | |||
070b980003 | |||
a66f622cf6 | |||
72dca5c73b | |||
e3485416a5 | |||
930314f1ac | |||
e1cca2d3eb | |||
1a24efacfb | |||
a7b847146a | |||
f74b82a400 |
14
.php_cs.dist
14
.php_cs.dist
@@ -8,14 +8,16 @@ return (new PhpCsFixer\Config())
|
|||||||
'@PhpCsFixer:risky' => true,
|
'@PhpCsFixer:risky' => true,
|
||||||
'@PSR12' => true,
|
'@PSR12' => true,
|
||||||
'@PSR12:risky' => true,
|
'@PSR12:risky' => true,
|
||||||
'declare_strict_types' => true,
|
|
||||||
'php_unit_internal_class' => false,
|
|
||||||
'php_unit_test_class_requires_covers' => false,
|
|
||||||
'yoda_style' => true,
|
|
||||||
'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'],
|
|
||||||
'blank_line_before_statement' => [
|
'blank_line_before_statement' => [
|
||||||
'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try']
|
'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try'],
|
||||||
],
|
],
|
||||||
|
'declare_strict_types' => true,
|
||||||
|
'global_namespace_import' => ['import_classes' => false, 'import_constants' => false, 'import_functions' => false],
|
||||||
|
'php_unit_internal_class' => false,
|
||||||
|
'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'],
|
||||||
|
'php_unit_test_class_requires_covers' => false,
|
||||||
|
'phpdoc_to_comment' => false,
|
||||||
|
'yoda_style' => true,
|
||||||
])
|
])
|
||||||
->setFinder(
|
->setFinder(
|
||||||
PhpCsFixer\Finder::create()
|
PhpCsFixer\Finder::create()
|
||||||
|
26
README.md
26
README.md
@@ -34,14 +34,15 @@ Features
|
|||||||
- [ ] Discovery
|
- [ ] Discovery
|
||||||
- [ ] Backoff
|
- [ ] Backoff
|
||||||
- [ ] TLS
|
- [ ] TLS
|
||||||
- [ ] Snappy
|
- [ ] Deflate
|
||||||
|
- [X] Snappy
|
||||||
- [X] Sampling
|
- [X] Sampling
|
||||||
- [X] AUTH
|
- [X] AUTH
|
||||||
|
|
||||||
Usage
|
Usage
|
||||||
-----
|
-----
|
||||||
|
|
||||||
### Publish
|
### Producer
|
||||||
|
|
||||||
```php
|
```php
|
||||||
use Nsq\Producer;
|
use Nsq\Producer;
|
||||||
@@ -61,17 +62,21 @@ $producer->mpub('topic', [
|
|||||||
$producer->dpub('topic', 'Deferred message', delay: 5000);
|
$producer->dpub('topic', 'Deferred message', delay: 5000);
|
||||||
```
|
```
|
||||||
|
|
||||||
### Subscription
|
### Consumer
|
||||||
|
|
||||||
```php
|
```php
|
||||||
use Nsq\Consumer;
|
use Nsq\Consumer;
|
||||||
use Nsq\Message;
|
use Nsq\Protocol\Message;
|
||||||
use Nsq\Subscriber;
|
|
||||||
|
|
||||||
$consumer = new Consumer('tcp://nsqd:4150');
|
$consumer = new Consumer(
|
||||||
$subscriber = new Subscriber($consumer);
|
topic: 'topic',
|
||||||
|
channel: 'channel',
|
||||||
|
address: 'tcp://nsqd:4150',
|
||||||
|
);
|
||||||
|
|
||||||
|
// Simple blocking loop based on generator
|
||||||
|
$generator = $consumer->generator();
|
||||||
|
|
||||||
$generator = $subscriber->subscribe('topic', 'channel');
|
|
||||||
foreach ($generator as $message) {
|
foreach ($generator as $message) {
|
||||||
if ($message instanceof Message) {
|
if ($message instanceof Message) {
|
||||||
$payload = $message->body;
|
$payload = $message->body;
|
||||||
@@ -86,11 +91,8 @@ foreach ($generator as $message) {
|
|||||||
// In case of nothing received during timeout generator will return NULL
|
// In case of nothing received during timeout generator will return NULL
|
||||||
// Here we can do something between messages, like pcntl_signal_dispatch()
|
// Here we can do something between messages, like pcntl_signal_dispatch()
|
||||||
|
|
||||||
// We can also communicate with generator through send
|
|
||||||
// for example:
|
|
||||||
|
|
||||||
// Gracefully close connection (loop will be ended)
|
// Gracefully close connection (loop will be ended)
|
||||||
$generator->send(Subscriber::STOP);
|
$generator->send(0);
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@@ -3,6 +3,8 @@ version: '3.7'
|
|||||||
services:
|
services:
|
||||||
nsqd:
|
nsqd:
|
||||||
image: nsqio/nsq:v1.2.0
|
image: nsqio/nsq:v1.2.0
|
||||||
|
labels:
|
||||||
|
ru.grachevko.dhu: 'nsqd'
|
||||||
command: /nsqd
|
command: /nsqd
|
||||||
ports:
|
ports:
|
||||||
- 4150:4150
|
- 4150:4150
|
||||||
@@ -10,6 +12,8 @@ services:
|
|||||||
|
|
||||||
nsqadmin:
|
nsqadmin:
|
||||||
image: nsqio/nsq:v1.2.0
|
image: nsqio/nsq:v1.2.0
|
||||||
|
labels:
|
||||||
|
ru.grachevko.dhu: 'nsqadmin'
|
||||||
command: /nsqadmin --nsqd-http-address=nsqd:4151 --http-address=0.0.0.0:4171
|
command: /nsqadmin --nsqd-http-address=nsqd:4151 --http-address=0.0.0.0:4171
|
||||||
ports:
|
ports:
|
||||||
- 4171:4171
|
- 4171:4171
|
||||||
|
68
src/Buffer.php
Normal file
68
src/Buffer.php
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq;
|
||||||
|
|
||||||
|
use PHPinnacle\Buffer\ByteBuffer;
|
||||||
|
|
||||||
|
final class Buffer
|
||||||
|
{
|
||||||
|
private ByteBuffer $buffer;
|
||||||
|
|
||||||
|
public function __construct(string $initial = '')
|
||||||
|
{
|
||||||
|
$this->buffer = new ByteBuffer($initial);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function append(string $data): self
|
||||||
|
{
|
||||||
|
$this->buffer->append($data);
|
||||||
|
|
||||||
|
return $this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function consumeSize(): int
|
||||||
|
{
|
||||||
|
/** @see Bytes::BYTES_SIZE */
|
||||||
|
return $this->buffer->consumeUint32();
|
||||||
|
}
|
||||||
|
|
||||||
|
public function consumeType(): int
|
||||||
|
{
|
||||||
|
/** @see Bytes::BYTES_TYPE */
|
||||||
|
return $this->buffer->consumeUint32();
|
||||||
|
}
|
||||||
|
|
||||||
|
public function consumeTimestamp(): int
|
||||||
|
{
|
||||||
|
/** @see Bytes::BYTES_TIMESTAMP */
|
||||||
|
return $this->buffer->consumeInt64();
|
||||||
|
}
|
||||||
|
|
||||||
|
public function consumeAttempts(): int
|
||||||
|
{
|
||||||
|
/** @see Bytes::BYTES_ATTEMPTS */
|
||||||
|
return $this->buffer->consumeUint16();
|
||||||
|
}
|
||||||
|
|
||||||
|
public function consumeId(): string
|
||||||
|
{
|
||||||
|
return $this->buffer->consume(Bytes::BYTES_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function size(): int
|
||||||
|
{
|
||||||
|
return $this->buffer->size();
|
||||||
|
}
|
||||||
|
|
||||||
|
public function bytes(): string
|
||||||
|
{
|
||||||
|
return $this->buffer->bytes();
|
||||||
|
}
|
||||||
|
|
||||||
|
public function flush(): string
|
||||||
|
{
|
||||||
|
return $this->buffer->flush();
|
||||||
|
}
|
||||||
|
}
|
@@ -5,9 +5,6 @@ declare(strict_types=1);
|
|||||||
namespace Nsq\Config;
|
namespace Nsq\Config;
|
||||||
|
|
||||||
use Composer\InstalledVersions;
|
use Composer\InstalledVersions;
|
||||||
use InvalidArgumentException;
|
|
||||||
use JsonSerializable;
|
|
||||||
use function gethostname;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class is used for configuring the clients for nsq. Immutable properties must be set when creating the object and
|
* This class is used for configuring the clients for nsq. Immutable properties must be set when creating the object and
|
||||||
@@ -16,25 +13,34 @@ use function gethostname;
|
|||||||
*
|
*
|
||||||
* @psalm-immutable
|
* @psalm-immutable
|
||||||
*/
|
*/
|
||||||
final class ClientConfig implements JsonSerializable
|
final class ClientConfig implements \JsonSerializable
|
||||||
{
|
{
|
||||||
/** @psalm-suppress ImpureFunctionCall */
|
/**
|
||||||
|
* @psalm-suppress ImpureFunctionCall
|
||||||
|
*/
|
||||||
public function __construct(
|
public function __construct(
|
||||||
/*
|
/**
|
||||||
* The secret used for authorization, if the server requires it. This value will be ignored if the server
|
* The secret used for authorization, if the server requires it. This value will be ignored if the server
|
||||||
* does not require authorization.
|
* does not require authorization.
|
||||||
*/
|
*/
|
||||||
public ?string $authSecret = null,
|
public ?string $authSecret = null,
|
||||||
|
|
||||||
// The timeout for establishing a connection in seconds.
|
/**
|
||||||
|
* The timeout for establishing a connection in seconds.
|
||||||
|
*/
|
||||||
public int $connectTimeout = 10,
|
public int $connectTimeout = 10,
|
||||||
|
|
||||||
// An identifier used to disambiguate this client (i.e. something specific to the consumer)
|
/**
|
||||||
|
* An identifier used to disambiguate this client (i.e. something specific to the consumer).
|
||||||
|
*/
|
||||||
public string $clientId = '',
|
public string $clientId = '',
|
||||||
|
|
||||||
// Enable deflate compression for this connection. A client cannot enable both [snappy] and [deflate].
|
/**
|
||||||
|
* Enable deflate compression for this connection. A client cannot enable both [snappy] and [deflate].
|
||||||
|
*/
|
||||||
public bool $deflate = false,
|
public bool $deflate = false,
|
||||||
/*
|
|
||||||
|
/**
|
||||||
* Configure the deflate compression level for this connection.
|
* Configure the deflate compression level for this connection.
|
||||||
*
|
*
|
||||||
* Valid range: `1 <= deflate_level <= configured_max`
|
* Valid range: `1 <= deflate_level <= configured_max`
|
||||||
@@ -43,42 +49,54 @@ final class ClientConfig implements JsonSerializable
|
|||||||
*/
|
*/
|
||||||
public int $deflateLevel = 6,
|
public int $deflateLevel = 6,
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* Milliseconds between heartbeats.
|
* Milliseconds between heartbeats.
|
||||||
*
|
*
|
||||||
* Valid range: `1000 <= heartbeat_interval <= configured_max` (`-1` disables heartbeats)
|
* Valid range: `1000 <= heartbeat_interval <= configured_max` (`-1` disables heartbeats)
|
||||||
*/
|
*/
|
||||||
public int $heartbeatInterval = 30000,
|
public int $heartbeatInterval = 30000,
|
||||||
|
|
||||||
// The hostname where the client is deployed
|
/**
|
||||||
|
* The hostname where the client is deployed.
|
||||||
|
*/
|
||||||
public string $hostname = '',
|
public string $hostname = '',
|
||||||
|
|
||||||
// Configure the server-side message timeout in milliseconds for messages delivered to this client.
|
/**
|
||||||
|
* Configure the server-side message timeout in milliseconds for messages delivered to this client.
|
||||||
|
*/
|
||||||
public int $msgTimeout = 60000,
|
public int $msgTimeout = 60000,
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* The sample rate for incoming data to deliver a percentage of all messages received to this connection.
|
* The sample rate for incoming data to deliver a percentage of all messages received to this connection.
|
||||||
* This only applies to subscribing connections. The valid range is between 0 and 99, where 0 means that all
|
* This only applies to subscribing connections. The valid range is between 0 and 99, where 0 means that all
|
||||||
* data is sent (this is the default). 1 means that 1% of the data is sent.
|
* data is sent (this is the default). 1 means that 1% of the data is sent.
|
||||||
*/
|
*/
|
||||||
public int $sampleRate = 0,
|
public int $sampleRate = 0,
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
|
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
|
||||||
* it will send back a JSON payload of supported features and metadata.
|
* it will send back a JSON payload of supported features and metadata.
|
||||||
*/
|
*/
|
||||||
public bool $featureNegotiation = true,
|
public bool $featureNegotiation = true,
|
||||||
|
|
||||||
// Enable TLS for this connection
|
/**
|
||||||
|
* Enable TLS for this connection.
|
||||||
|
*/
|
||||||
public bool $tls = false,
|
public bool $tls = false,
|
||||||
|
|
||||||
// Enable snappy compression for this connection. A client cannot enable both [snappy] and [deflate].
|
/**
|
||||||
|
* Enable snappy compression for this connection. A client cannot enable both [snappy] and [deflate].
|
||||||
|
*/
|
||||||
public bool $snappy = false,
|
public bool $snappy = false,
|
||||||
|
|
||||||
// The read timeout for connection sockets and for awaiting responses from nsq.
|
/**
|
||||||
|
* The read timeout for connection sockets and for awaiting responses from nsq.
|
||||||
|
*/
|
||||||
public int $readTimeout = 5,
|
public int $readTimeout = 5,
|
||||||
|
|
||||||
// A string identifying the agent for this client in the spirit of HTTP.
|
/**
|
||||||
|
* A string identifying the agent for this client in the spirit of HTTP.
|
||||||
|
*/
|
||||||
public string $userAgent = '',
|
public string $userAgent = '',
|
||||||
) {
|
) {
|
||||||
$this->featureNegotiation = true; // Always enabled
|
$this->featureNegotiation = true; // Always enabled
|
||||||
@@ -92,7 +110,7 @@ final class ClientConfig implements JsonSerializable
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ($this->snappy && $this->deflate) {
|
if ($this->snappy && $this->deflate) {
|
||||||
throw new InvalidArgumentException('Client cannot enable both [snappy] and [deflate]');
|
throw new \InvalidArgumentException('Client cannot enable both [snappy] and [deflate]');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,4 +133,9 @@ final class ClientConfig implements JsonSerializable
|
|||||||
'user_agent' => $this->userAgent,
|
'user_agent' => $this->userAgent,
|
||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function toString(): string
|
||||||
|
{
|
||||||
|
return json_encode($this, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -12,47 +12,72 @@ namespace Nsq\Config;
|
|||||||
final class ConnectionConfig
|
final class ConnectionConfig
|
||||||
{
|
{
|
||||||
public function __construct(
|
public function __construct(
|
||||||
// Whether or not authorization is required by nsqd.
|
/**
|
||||||
|
* Whether or not authorization is required by nsqd.
|
||||||
|
*/
|
||||||
public bool $authRequired,
|
public bool $authRequired,
|
||||||
|
|
||||||
// Whether deflate compression is enabled for this connection or not.
|
/**
|
||||||
|
* Whether deflate compression is enabled for this connection or not.
|
||||||
|
*/
|
||||||
public bool $deflate,
|
public bool $deflate,
|
||||||
// The deflate level. This value can be ignored if [deflate] is `false`.
|
|
||||||
|
/**
|
||||||
|
* The deflate level. This value can be ignored if [deflate] is `false`.
|
||||||
|
*/
|
||||||
public int $deflateLevel,
|
public int $deflateLevel,
|
||||||
|
|
||||||
// The maximum deflate level supported by the server.
|
/**
|
||||||
|
* The maximum deflate level supported by the server.
|
||||||
|
*/
|
||||||
public int $maxDeflateLevel,
|
public int $maxDeflateLevel,
|
||||||
|
|
||||||
// The maximum value for message timeout.
|
/**
|
||||||
|
* The maximum value for message timeout.
|
||||||
|
*/
|
||||||
public int $maxMsgTimeout,
|
public int $maxMsgTimeout,
|
||||||
/*
|
|
||||||
|
/**
|
||||||
* Each nsqd is configurable with a max-rdy-count. If the consumer sends a RDY count that is outside
|
* Each nsqd is configurable with a max-rdy-count. If the consumer sends a RDY count that is outside
|
||||||
* of the acceptable range its connection will be forcefully closed.
|
* of the acceptable range its connection will be forcefully closed.
|
||||||
*/
|
*/
|
||||||
public int $maxRdyCount,
|
public int $maxRdyCount,
|
||||||
|
|
||||||
// The effective message timeout.
|
/**
|
||||||
|
* The effective message timeout.
|
||||||
|
*/
|
||||||
public int $msgTimeout,
|
public int $msgTimeout,
|
||||||
|
|
||||||
// The size in bytes of the buffer nsqd will use when writing to this client.
|
/**
|
||||||
|
* The size in bytes of the buffer nsqd will use when writing to this client.
|
||||||
|
*/
|
||||||
public int $outputBufferSize,
|
public int $outputBufferSize,
|
||||||
// The timeout after which any data that nsqd has buffered will be flushed to this client.
|
|
||||||
|
/**
|
||||||
|
* The timeout after which any data that nsqd has buffered will be flushed to this client.
|
||||||
|
*/
|
||||||
public int $outputBufferTimeout,
|
public int $outputBufferTimeout,
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* The sample rate for incoming data to deliver a percentage of all messages received to this connection.
|
* The sample rate for incoming data to deliver a percentage of all messages received to this connection.
|
||||||
* This only applies to subscribing connections. The valid range is between 0 and 99, where 0 means that all
|
* This only applies to subscribing connections. The valid range is between 0 and 99, where 0 means that all
|
||||||
* data is sent (this is the default). 1 means that 1% of the data is sent.
|
* data is sent (this is the default). 1 means that 1% of the data is sent.
|
||||||
*/
|
*/
|
||||||
public int $sampleRate,
|
public int $sampleRate,
|
||||||
|
|
||||||
// Whether snappy compression is enabled for this connection or not.
|
/**
|
||||||
|
* Whether snappy compression is enabled for this connection or not.
|
||||||
|
*/
|
||||||
public bool $snappy,
|
public bool $snappy,
|
||||||
|
|
||||||
// Whether TLS is enabled for this connection or not.
|
/**
|
||||||
|
* Whether TLS is enabled for this connection or not.
|
||||||
|
*/
|
||||||
public bool $tls,
|
public bool $tls,
|
||||||
|
|
||||||
// The nsqd version.
|
/**
|
||||||
|
* The nsqd version.
|
||||||
|
*/
|
||||||
public string $version,
|
public string $version,
|
||||||
) {
|
) {
|
||||||
}
|
}
|
||||||
|
@@ -7,122 +7,114 @@ namespace Nsq;
|
|||||||
use Nsq\Config\ClientConfig;
|
use Nsq\Config\ClientConfig;
|
||||||
use Nsq\Config\ConnectionConfig;
|
use Nsq\Config\ConnectionConfig;
|
||||||
use Nsq\Exception\AuthenticationRequired;
|
use Nsq\Exception\AuthenticationRequired;
|
||||||
|
use Nsq\Exception\BadResponse;
|
||||||
use Nsq\Exception\ConnectionFail;
|
use Nsq\Exception\ConnectionFail;
|
||||||
use Nsq\Exception\UnexpectedResponse;
|
use Nsq\Exception\NotConnected;
|
||||||
use Nsq\Reconnect\ExponentialStrategy;
|
use Nsq\Exception\NsqError;
|
||||||
use Nsq\Reconnect\ReconnectStrategy;
|
use Nsq\Exception\NsqException;
|
||||||
use PHPinnacle\Buffer\ByteBuffer;
|
use Nsq\Protocol\Error;
|
||||||
|
use Nsq\Protocol\Frame;
|
||||||
|
use Nsq\Protocol\Message;
|
||||||
|
use Nsq\Protocol\Response;
|
||||||
|
use Nsq\Socket\DeflateSocket;
|
||||||
|
use Nsq\Socket\RawSocket;
|
||||||
|
use Nsq\Socket\SnappySocket;
|
||||||
use Psr\Log\LoggerAwareTrait;
|
use Psr\Log\LoggerAwareTrait;
|
||||||
use Psr\Log\LoggerInterface;
|
use Psr\Log\LoggerInterface;
|
||||||
use Psr\Log\NullLogger;
|
use Psr\Log\NullLogger;
|
||||||
use Socket\Raw\Exception;
|
|
||||||
use Socket\Raw\Factory;
|
|
||||||
use Socket\Raw\Socket;
|
|
||||||
use function addcslashes;
|
|
||||||
use function http_build_query;
|
|
||||||
use function implode;
|
|
||||||
use function json_encode;
|
|
||||||
use function pack;
|
|
||||||
use const JSON_FORCE_OBJECT;
|
|
||||||
use const JSON_THROW_ON_ERROR;
|
|
||||||
use const PHP_EOL;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @internal
|
* @internal
|
||||||
*
|
|
||||||
* @property ConnectionConfig $connectionConfig
|
|
||||||
*/
|
*/
|
||||||
abstract class Connection
|
abstract class Connection
|
||||||
{
|
{
|
||||||
use LoggerAwareTrait;
|
use LoggerAwareTrait;
|
||||||
|
|
||||||
private string $address;
|
protected ClientConfig $clientConfig;
|
||||||
|
|
||||||
private ?Socket $socket = null;
|
private NsqSocket $socket;
|
||||||
|
|
||||||
private ReconnectStrategy $reconnect;
|
private ConnectionConfig $connectionConfig;
|
||||||
|
|
||||||
private ClientConfig $clientConfig;
|
private bool $closed = false;
|
||||||
|
|
||||||
private ?ConnectionConfig $connectionConfig = null;
|
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
string $address,
|
private string $address,
|
||||||
ClientConfig $clientConfig = null,
|
ClientConfig $clientConfig = null,
|
||||||
ReconnectStrategy $reconnectStrategy = null,
|
|
||||||
LoggerInterface $logger = null,
|
LoggerInterface $logger = null,
|
||||||
) {
|
) {
|
||||||
$this->address = $address;
|
|
||||||
|
|
||||||
$this->logger = $logger ?? new NullLogger();
|
$this->logger = $logger ?? new NullLogger();
|
||||||
$this->reconnect = $reconnectStrategy ?? new ExponentialStrategy(logger: $this->logger);
|
|
||||||
$this->clientConfig = $clientConfig ?? new ClientConfig();
|
$this->clientConfig = $clientConfig ?? new ClientConfig();
|
||||||
}
|
|
||||||
|
|
||||||
public function connect(): void
|
$socket = new RawSocket($this->address, $this->logger);
|
||||||
{
|
$socket->write(' V2');
|
||||||
$this->reconnect->connect(function (): void {
|
|
||||||
try {
|
|
||||||
$this->socket = (new Factory())->createClient($this->address);
|
|
||||||
}
|
|
||||||
// @codeCoverageIgnoreStart
|
|
||||||
catch (Exception $e) {
|
|
||||||
$this->logger->error('Connecting to {address} failed.', ['address' => $this->address]);
|
|
||||||
|
|
||||||
throw ConnectionFail::fromThrowable($e);
|
$this->socket = new NsqSocket($socket);
|
||||||
}
|
|
||||||
// @codeCoverageIgnoreEnd
|
|
||||||
|
|
||||||
$this->socket->write(' V2');
|
$this->connectionConfig = ConnectionConfig::fromArray(
|
||||||
|
$this
|
||||||
|
->command('IDENTIFY', data: $this->clientConfig->toString())
|
||||||
|
->readResponse()
|
||||||
|
->toArray()
|
||||||
|
);
|
||||||
|
|
||||||
$body = json_encode($this->clientConfig, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
|
if ($this->connectionConfig->snappy) {
|
||||||
|
$this->socket = new NsqSocket(
|
||||||
|
new SnappySocket(
|
||||||
|
$socket,
|
||||||
|
$this->logger,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
$response = $this->command('IDENTIFY', data: $body)->response();
|
$this->checkIsOK();
|
||||||
|
}
|
||||||
|
|
||||||
$this->connectionConfig = ConnectionConfig::fromArray($response->toArray());
|
if ($this->connectionConfig->deflate) {
|
||||||
|
$this->socket = new NsqSocket(
|
||||||
|
new DeflateSocket(
|
||||||
|
$socket,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
if ($this->connectionConfig->snappy || $this->connectionConfig->deflate) {
|
$this->checkIsOK();
|
||||||
$this->response()->okOrFail();
|
}
|
||||||
|
|
||||||
|
if ($this->connectionConfig->authRequired) {
|
||||||
|
if (null === $this->clientConfig->authSecret) {
|
||||||
|
throw new AuthenticationRequired();
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($this->connectionConfig->authRequired) {
|
$authResponse = $this
|
||||||
if (null === $this->clientConfig->authSecret) {
|
->command('AUTH', data: $this->clientConfig->authSecret)
|
||||||
throw new AuthenticationRequired('NSQ requires authorization, set ClientConfig::$authSecret before connecting');
|
->readResponse()
|
||||||
}
|
->toArray()
|
||||||
|
;
|
||||||
|
|
||||||
$authResponse = $this->command('AUTH', data: $this->clientConfig->authSecret)->response()->toArray();
|
$this->logger->info('Authorization response: '.http_build_query($authResponse));
|
||||||
|
}
|
||||||
$this->logger->info('Authorization response: '.http_build_query($authResponse));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Cleanly close your connection (no more messages are sent).
|
* Cleanly close your connection (no more messages are sent).
|
||||||
*/
|
*/
|
||||||
public function disconnect(): void
|
public function close(): void
|
||||||
{
|
{
|
||||||
if (null === $this->socket) {
|
if ($this->closed) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$this->socket->write('CLS'.PHP_EOL);
|
$this->command('CLS');
|
||||||
$this->socket->close();
|
$this->socket->close();
|
||||||
|
} catch (\Throwable $e) {
|
||||||
}
|
}
|
||||||
// @codeCoverageIgnoreStart
|
|
||||||
catch (Exception $e) {
|
|
||||||
$this->logger->debug($e->getMessage(), ['exception' => $e]);
|
|
||||||
}
|
|
||||||
// @codeCoverageIgnoreEnd
|
|
||||||
|
|
||||||
$this->socket = null;
|
$this->closed = true;
|
||||||
$this->connectionConfig = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public function isReady(): bool
|
public function isClosed(): bool
|
||||||
{
|
{
|
||||||
return null !== $this->socket;
|
return $this->closed;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -130,114 +122,95 @@ abstract class Connection
|
|||||||
*/
|
*/
|
||||||
protected function command(string $command, array | string $params = [], string $data = null): self
|
protected function command(string $command, array | string $params = [], string $data = null): self
|
||||||
{
|
{
|
||||||
$socket = $this->socket();
|
if ($this->closed) {
|
||||||
|
throw new NotConnected('Connection closed.');
|
||||||
$buffer = [] === $params ? $command : implode(' ', [$command, ...((array) $params)]);
|
|
||||||
$buffer .= PHP_EOL;
|
|
||||||
|
|
||||||
if (null !== $data) {
|
|
||||||
$buffer .= pack('N', \strlen($data));
|
|
||||||
$buffer .= $data;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->logger->debug('Send buffer: '.addcslashes($buffer, PHP_EOL));
|
$command = [] === $params
|
||||||
|
? $command
|
||||||
|
: implode(' ', [$command, ...((array) $params)]);
|
||||||
|
|
||||||
try {
|
$this->logger->info('Command [{command}] with data [{data}]', ['command' => $command, 'data' => $data ?? 'null']);
|
||||||
$socket->write($buffer);
|
|
||||||
}
|
|
||||||
// @codeCoverageIgnoreStart
|
|
||||||
catch (Exception $e) {
|
|
||||||
$this->disconnect();
|
|
||||||
|
|
||||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
$this->socket->write($command, $data);
|
||||||
|
|
||||||
throw ConnectionFail::fromThrowable($e);
|
|
||||||
}
|
|
||||||
// @codeCoverageIgnoreEnd
|
|
||||||
return $this;
|
return $this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function hasMessage(float $timeout = 0): bool
|
public function hasMessage(float $timeout): bool
|
||||||
{
|
{
|
||||||
try {
|
if ($this->closed) {
|
||||||
return false !== $this->socket()->selectRead($timeout);
|
throw new NotConnected('Connection closed.');
|
||||||
}
|
|
||||||
// @codeCoverageIgnoreStart
|
|
||||||
catch (Exception $e) {
|
|
||||||
$this->disconnect();
|
|
||||||
|
|
||||||
throw ConnectionFail::fromThrowable($e);
|
|
||||||
}
|
|
||||||
// @codeCoverageIgnoreEnd
|
|
||||||
}
|
|
||||||
|
|
||||||
public function receive(float $timeout = null): ?Response
|
|
||||||
{
|
|
||||||
$socket = $this->socket();
|
|
||||||
|
|
||||||
$timeout ??= $this->clientConfig->readTimeout;
|
|
||||||
$deadline = microtime(true) + $timeout;
|
|
||||||
|
|
||||||
if (!$this->hasMessage($timeout)) {
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$size = $socket->read(Bytes::BYTES_SIZE);
|
return false !== $this->socket->wait($timeout);
|
||||||
|
} catch (ConnectionFail $e) {
|
||||||
|
$this->close();
|
||||||
|
|
||||||
if ('' === $size) {
|
throw $e;
|
||||||
$this->disconnect();
|
|
||||||
|
|
||||||
throw new ConnectionFail('Probably connection lost');
|
|
||||||
}
|
|
||||||
|
|
||||||
$buffer = new ByteBuffer();
|
|
||||||
|
|
||||||
/** @phpstan-ignore-next-line */
|
|
||||||
$size = unpack('N', $size)[1];
|
|
||||||
|
|
||||||
do {
|
|
||||||
$chunk = $socket->read($size);
|
|
||||||
|
|
||||||
$buffer->append($chunk);
|
|
||||||
|
|
||||||
$size -= \strlen($chunk);
|
|
||||||
} while (0 < $size);
|
|
||||||
|
|
||||||
$this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL));
|
|
||||||
|
|
||||||
$response = new Response($buffer);
|
|
||||||
|
|
||||||
if ($response->isHeartBeat()) {
|
|
||||||
$this->command('NOP');
|
|
||||||
|
|
||||||
return $this->receive(
|
|
||||||
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// @codeCoverageIgnoreStart
|
|
||||||
catch (Exception $e) {
|
|
||||||
$this->disconnect();
|
|
||||||
|
|
||||||
throw ConnectionFail::fromThrowable($e);
|
|
||||||
}
|
|
||||||
// @codeCoverageIgnoreEnd
|
|
||||||
|
|
||||||
return $response;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function response(): Response
|
protected function readFrame(): Frame
|
||||||
{
|
{
|
||||||
return $this->receive() ?? throw UnexpectedResponse::null();
|
if ($this->closed) {
|
||||||
}
|
throw new NotConnected('Connection closed.');
|
||||||
|
|
||||||
private function socket(): Socket
|
|
||||||
{
|
|
||||||
if (null === $this->socket) {
|
|
||||||
$this->connect();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return $this->socket ?? throw new ConnectionFail('This connection is closed, create new one.');
|
$buffer = $this->socket->read();
|
||||||
|
|
||||||
|
$this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL));
|
||||||
|
|
||||||
|
return match ($type = $buffer->consumeType()) {
|
||||||
|
0 => new Response($buffer->flush()),
|
||||||
|
1 => new Error($buffer->flush()),
|
||||||
|
2 => new Message(
|
||||||
|
timestamp: $buffer->consumeTimestamp(),
|
||||||
|
attempts: $buffer->consumeAttempts(),
|
||||||
|
id: $buffer->consumeId(),
|
||||||
|
body: $buffer->flush(),
|
||||||
|
consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'),
|
||||||
|
),
|
||||||
|
default => throw new NsqException('Unexpected frame type: '.$type)
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
protected function checkIsOK(): void
|
||||||
|
{
|
||||||
|
$response = $this->readResponse();
|
||||||
|
|
||||||
|
if ($response->isHeartBeat()) {
|
||||||
|
$this->command('NOP');
|
||||||
|
|
||||||
|
$this->checkIsOK();
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!$response->isOk()) {
|
||||||
|
throw new BadResponse($response);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->logger->info('Ok checked.');
|
||||||
|
}
|
||||||
|
|
||||||
|
private function readResponse(): Response
|
||||||
|
{
|
||||||
|
$frame = $this->readFrame();
|
||||||
|
|
||||||
|
if ($frame instanceof Response) {
|
||||||
|
return $frame;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($frame instanceof Error) {
|
||||||
|
if ($frame->type->terminateConnection) {
|
||||||
|
$this->close();
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new NsqError($frame);
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new NsqException('Unreachable statement.');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -4,16 +4,82 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Nsq;
|
namespace Nsq;
|
||||||
|
|
||||||
|
use Generator;
|
||||||
|
use Nsq\Config\ClientConfig;
|
||||||
|
use Nsq\Exception\NsqError;
|
||||||
|
use Nsq\Exception\NsqException;
|
||||||
|
use Nsq\Protocol\Error;
|
||||||
|
use Nsq\Protocol\Message;
|
||||||
|
use Nsq\Protocol\Response;
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
|
||||||
final class Consumer extends Connection
|
final class Consumer extends Connection
|
||||||
{
|
{
|
||||||
private int $rdy = 0;
|
private int $rdy = 0;
|
||||||
|
|
||||||
|
public function __construct(
|
||||||
|
private string $topic,
|
||||||
|
private string $channel,
|
||||||
|
string $address,
|
||||||
|
ClientConfig $clientConfig = null,
|
||||||
|
LoggerInterface $logger = null
|
||||||
|
) {
|
||||||
|
parent::__construct($address, $clientConfig, $logger);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Subscribe to a topic/channel.
|
* @psalm-return Generator<int, Message|float|null, int|null, void>
|
||||||
*/
|
*/
|
||||||
public function sub(string $topic, string $channel): void
|
public function generator(): \Generator
|
||||||
{
|
{
|
||||||
$this->command('SUB', [$topic, $channel])->response()->okOrFail();
|
$this->command('SUB', [$this->topic, $this->channel])->checkIsOK();
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
$this->rdy(1);
|
||||||
|
|
||||||
|
$timeout = $this->clientConfig->readTimeout;
|
||||||
|
|
||||||
|
do {
|
||||||
|
$deadline = microtime(true) + $timeout;
|
||||||
|
|
||||||
|
$message = $this->hasMessage($timeout) ? $this->readMessage() : null;
|
||||||
|
|
||||||
|
$timeout = ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime;
|
||||||
|
} while (0 < $timeout && null === $message);
|
||||||
|
|
||||||
|
$command = yield $message;
|
||||||
|
|
||||||
|
if (0 === $command) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->close();
|
||||||
|
}
|
||||||
|
|
||||||
|
public function readMessage(): ?Message
|
||||||
|
{
|
||||||
|
$frame = $this->readFrame();
|
||||||
|
|
||||||
|
if ($frame instanceof Message) {
|
||||||
|
return $frame;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($frame instanceof Response && $frame->isHeartBeat()) {
|
||||||
|
$this->command('NOP');
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($frame instanceof Error) {
|
||||||
|
if ($frame->type->terminateConnection) {
|
||||||
|
$this->close();
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new NsqError($frame);
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new NsqException('Unreachable statement.');
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -32,6 +98,8 @@ final class Consumer extends Connection
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Finish a message (indicate successful processing).
|
* Finish a message (indicate successful processing).
|
||||||
|
*
|
||||||
|
* @internal
|
||||||
*/
|
*/
|
||||||
public function fin(string $id): void
|
public function fin(string $id): void
|
||||||
{
|
{
|
||||||
@@ -41,10 +109,12 @@ final class Consumer extends Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Re-queue a message (indicate failure to process)
|
* Re-queue a message (indicate failure to process) The re-queued message is placed at the tail of the queue,
|
||||||
* The re-queued message is placed at the tail of the queue, equivalent to having just published it,
|
* equivalent to having just published it, but for various implementation specific reasons that behavior should not
|
||||||
* but for various implementation specific reasons that behavior should not be explicitly relied upon and may change in the future.
|
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
|
||||||
* Similarly, a message that is in-flight and times out behaves identically to an explicit REQ.
|
* behaves identically to an explicit REQ.
|
||||||
|
*
|
||||||
|
* @internal
|
||||||
*/
|
*/
|
||||||
public function req(string $id, int $timeout): void
|
public function req(string $id, int $timeout): void
|
||||||
{
|
{
|
||||||
@@ -55,6 +125,8 @@ final class Consumer extends Connection
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Reset the timeout for an in-flight message.
|
* Reset the timeout for an in-flight message.
|
||||||
|
*
|
||||||
|
* @internal
|
||||||
*/
|
*/
|
||||||
public function touch(string $id): void
|
public function touch(string $id): void
|
||||||
{
|
{
|
||||||
|
@@ -4,8 +4,10 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Nsq\Exception;
|
namespace Nsq\Exception;
|
||||||
|
|
||||||
use RuntimeException;
|
final class AuthenticationRequired extends NsqException
|
||||||
|
|
||||||
final class AuthenticationRequired extends RuntimeException implements NsqException
|
|
||||||
{
|
{
|
||||||
|
public function __construct()
|
||||||
|
{
|
||||||
|
parent::__construct('NSQ requires authorization, set ClientConfig::$authSecret before connecting');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
15
src/Exception/BadResponse.php
Normal file
15
src/Exception/BadResponse.php
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Exception;
|
||||||
|
|
||||||
|
use Nsq\Protocol\Response;
|
||||||
|
|
||||||
|
final class BadResponse extends NsqException
|
||||||
|
{
|
||||||
|
public function __construct(Response $response)
|
||||||
|
{
|
||||||
|
parent::__construct($response->msg);
|
||||||
|
}
|
||||||
|
}
|
@@ -4,15 +4,12 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Nsq\Exception;
|
namespace Nsq\Exception;
|
||||||
|
|
||||||
use RuntimeException;
|
final class ConnectionFail extends NsqException
|
||||||
use Throwable;
|
|
||||||
|
|
||||||
final class ConnectionFail extends RuntimeException implements NsqException
|
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* @codeCoverageIgnore
|
* @codeCoverageIgnore
|
||||||
*/
|
*/
|
||||||
public static function fromThrowable(Throwable $throwable): self
|
public static function fromThrowable(\Throwable $throwable): self
|
||||||
{
|
{
|
||||||
return new self($throwable->getMessage(), (int) $throwable->getCode(), $throwable);
|
return new self($throwable->getMessage(), (int) $throwable->getCode(), $throwable);
|
||||||
}
|
}
|
||||||
|
@@ -4,10 +4,9 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Nsq\Exception;
|
namespace Nsq\Exception;
|
||||||
|
|
||||||
use Nsq\Message;
|
use Nsq\Protocol\Message;
|
||||||
use RuntimeException;
|
|
||||||
|
|
||||||
final class MessageAlreadyFinished extends RuntimeException implements NsqException
|
final class MessageAlreadyFinished extends NsqException
|
||||||
{
|
{
|
||||||
public static function finish(Message $message): self
|
public static function finish(Message $message): self
|
||||||
{
|
{
|
||||||
|
9
src/Exception/NotConnected.php
Normal file
9
src/Exception/NotConnected.php
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Exception;
|
||||||
|
|
||||||
|
final class NotConnected extends NsqException
|
||||||
|
{
|
||||||
|
}
|
@@ -4,8 +4,12 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Nsq\Exception;
|
namespace Nsq\Exception;
|
||||||
|
|
||||||
use RuntimeException;
|
use Nsq\Protocol\Error;
|
||||||
|
|
||||||
final class NsqError extends RuntimeException implements NsqException
|
final class NsqError extends NsqException
|
||||||
{
|
{
|
||||||
|
public function __construct(Error $error)
|
||||||
|
{
|
||||||
|
parent::__construct($error->rawData);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -4,8 +4,6 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Nsq\Exception;
|
namespace Nsq\Exception;
|
||||||
|
|
||||||
use Throwable;
|
class NsqException extends \RuntimeException
|
||||||
|
|
||||||
interface NsqException extends Throwable
|
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
9
src/Exception/NullReceived.php
Normal file
9
src/Exception/NullReceived.php
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Exception;
|
||||||
|
|
||||||
|
final class NullReceived extends NsqException
|
||||||
|
{
|
||||||
|
}
|
@@ -1,18 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq\Exception;
|
|
||||||
|
|
||||||
use RuntimeException;
|
|
||||||
|
|
||||||
final class UnexpectedResponse extends RuntimeException implements NsqException
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* @codeCoverageIgnore
|
|
||||||
*/
|
|
||||||
public static function null(): self
|
|
||||||
{
|
|
||||||
return new self('Response was expected, but null received.');
|
|
||||||
}
|
|
||||||
}
|
|
78
src/NsqSocket.php
Normal file
78
src/NsqSocket.php
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq;
|
||||||
|
|
||||||
|
use Nsq\Exception\ConnectionFail;
|
||||||
|
use Nsq\Socket\Socket;
|
||||||
|
use PHPinnacle\Buffer\ByteBuffer;
|
||||||
|
use Throwable;
|
||||||
|
|
||||||
|
final class NsqSocket
|
||||||
|
{
|
||||||
|
private Buffer $input;
|
||||||
|
|
||||||
|
private ByteBuffer $output;
|
||||||
|
|
||||||
|
public function __construct(
|
||||||
|
private Socket $socket,
|
||||||
|
) {
|
||||||
|
$this->input = new Buffer();
|
||||||
|
$this->output = new ByteBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
public function write(string $command, string $data = null): void
|
||||||
|
{
|
||||||
|
$this->output->append($command.PHP_EOL);
|
||||||
|
|
||||||
|
if (null !== $data) {
|
||||||
|
$this->output->appendUint32(\strlen($data));
|
||||||
|
$this->output->append($data);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->socket->write($this->output->flush());
|
||||||
|
}
|
||||||
|
|
||||||
|
public function wait(float $timeout): bool
|
||||||
|
{
|
||||||
|
return $this->socket->selectRead($timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function read(): Buffer
|
||||||
|
{
|
||||||
|
$buffer = $this->input;
|
||||||
|
|
||||||
|
$size = Bytes::BYTES_SIZE;
|
||||||
|
|
||||||
|
do {
|
||||||
|
$buffer->append(
|
||||||
|
$this->socket->read($size),
|
||||||
|
);
|
||||||
|
|
||||||
|
$size -= $buffer->size();
|
||||||
|
} while ($buffer->size() < Bytes::BYTES_SIZE);
|
||||||
|
|
||||||
|
if ('' === $buffer->bytes()) {
|
||||||
|
throw new ConnectionFail('Probably connection closed.');
|
||||||
|
}
|
||||||
|
|
||||||
|
$size = $buffer->consumeSize();
|
||||||
|
|
||||||
|
do {
|
||||||
|
$buffer->append(
|
||||||
|
$this->socket->read($size - $buffer->size()),
|
||||||
|
);
|
||||||
|
} while ($buffer->size() < $size);
|
||||||
|
|
||||||
|
return $buffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function close(): void
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
$this->socket->close();
|
||||||
|
} catch (Throwable) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@@ -4,41 +4,38 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Nsq;
|
namespace Nsq;
|
||||||
|
|
||||||
use function array_map;
|
use PHPinnacle\Buffer\ByteBuffer;
|
||||||
use function implode;
|
|
||||||
use function pack;
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @psalm-suppress PropertyNotSetInConstructor
|
||||||
|
*/
|
||||||
final class Producer extends Connection
|
final class Producer extends Connection
|
||||||
{
|
{
|
||||||
/**
|
|
||||||
* @psalm-suppress PossiblyFalseOperand
|
|
||||||
*/
|
|
||||||
public function pub(string $topic, string $body): void
|
public function pub(string $topic, string $body): void
|
||||||
{
|
{
|
||||||
$this->command('PUB', $topic, $body)->response()->okOrFail();
|
$this->command('PUB', $topic, $body)->checkIsOK();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @psalm-param array<mixed, mixed> $bodies
|
* @psalm-param array<int, mixed> $bodies
|
||||||
*
|
|
||||||
* @psalm-suppress PossiblyFalseOperand
|
|
||||||
*/
|
*/
|
||||||
public function mpub(string $topic, array $bodies): void
|
public function mpub(string $topic, array $bodies): void
|
||||||
{
|
{
|
||||||
$num = pack('N', \count($bodies));
|
static $buffer;
|
||||||
|
$buffer ??= new ByteBuffer();
|
||||||
|
|
||||||
$mb = implode('', array_map(static function ($body): string {
|
$buffer->appendUint32(\count($bodies));
|
||||||
return pack('N', \strlen($body)).$body;
|
|
||||||
}, $bodies));
|
|
||||||
|
|
||||||
$this->command('MPUB', $topic, $num.$mb)->response()->okOrFail();
|
foreach ($bodies as $body) {
|
||||||
|
$buffer->appendUint32(\strlen($body));
|
||||||
|
$buffer->append($body);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->command('MPUB', $topic, $buffer->flush())->checkIsOK();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @psalm-suppress PossiblyFalseOperand
|
|
||||||
*/
|
|
||||||
public function dpub(string $topic, string $body, int $delay): void
|
public function dpub(string $topic, string $body, int $delay): void
|
||||||
{
|
{
|
||||||
$this->command('DPUB', [$topic, $delay], $body)->response()->okOrFail();
|
$this->command('DPUB', [$topic, $delay], $body)->checkIsOK();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
22
src/Protocol/Error.php
Normal file
22
src/Protocol/Error.php
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Protocol;
|
||||||
|
|
||||||
|
use Nsq\Bytes;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @psalm-immutable
|
||||||
|
*/
|
||||||
|
final class Error extends Frame
|
||||||
|
{
|
||||||
|
public ErrorType $type;
|
||||||
|
|
||||||
|
public function __construct(public string $rawData)
|
||||||
|
{
|
||||||
|
parent::__construct(\strlen($this->rawData) + Bytes::BYTES_TYPE);
|
||||||
|
|
||||||
|
$this->type = new ErrorType(explode(' ', $this->rawData)[0]);
|
||||||
|
}
|
||||||
|
}
|
100
src/Protocol/ErrorType.php
Normal file
100
src/Protocol/ErrorType.php
Normal file
@@ -0,0 +1,100 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Protocol;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @psalm-immutable
|
||||||
|
*/
|
||||||
|
final class ErrorType
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* A generic error type without any more hints.
|
||||||
|
*/
|
||||||
|
public const E_INVALID = true;
|
||||||
|
/**
|
||||||
|
* This error might be returned during multiple occasions. It can be returned for IDENTIFY, AUTH or MPUB messages.
|
||||||
|
* It is caused for payloads that do not meet certain requirements. For IDENTIFY and AUTH, this is usually a bug in
|
||||||
|
* the library and should be reported. For MPUB, this error can occur if the payload is larger than the maximum
|
||||||
|
* payload size specified in the nsqd config.
|
||||||
|
*/
|
||||||
|
public const E_BAD_BODY = true;
|
||||||
|
/**
|
||||||
|
* This error indicates that the topic sent to nsqd is not valid.
|
||||||
|
*/
|
||||||
|
public const E_BAD_TOPIC = true;
|
||||||
|
/**
|
||||||
|
* This error indicates that the channel sent to nsqd is not valid.
|
||||||
|
*/
|
||||||
|
public const E_BAD_CHANNEL = true;
|
||||||
|
/**
|
||||||
|
* This error is returned by nsqd if the message in the payload of a publishing operation does not meet the
|
||||||
|
* requirements of the server. This might be caused by too big payloads being sent to nsqd. You should consider
|
||||||
|
* adding a limit to the payload size or increasing it in the nsqd config.
|
||||||
|
*/
|
||||||
|
public const E_BAD_MESSAGE = true;
|
||||||
|
/**
|
||||||
|
* This error may happen if a error condition is met after validating the input on the nsqd side. This is usually a
|
||||||
|
* temporary error and can be caused by topics being added, deleted or cleared.
|
||||||
|
*/
|
||||||
|
public const E_PUB_FAILED = true;
|
||||||
|
/**
|
||||||
|
* This error may happen if a error condition is met after validating the input on the nsqd side. This is usually a
|
||||||
|
* temporary error and can be caused by topics being added, deleted or cleared.
|
||||||
|
*/
|
||||||
|
public const E_MPUB_FAILED = true;
|
||||||
|
/**
|
||||||
|
* This error may happen if a error condition is met after validating the input on the nsqd side. This is usually a
|
||||||
|
* temporary error and can be caused by topics being added, deleted or cleared.
|
||||||
|
*/
|
||||||
|
public const E_DPUB_FAILED = true;
|
||||||
|
/**
|
||||||
|
* This error may happen if a error condition is met after validating the input on the nsqd side. This can
|
||||||
|
* happen in particular for messages that are no longer queued on the server side.
|
||||||
|
*/
|
||||||
|
public const E_FIN_FAILED = false;
|
||||||
|
/**
|
||||||
|
* This error may happen if a error condition is met after validating the input on the nsqd side. This can
|
||||||
|
* happen in particular for messages that are no longer queued on the server side.
|
||||||
|
*/
|
||||||
|
public const E_REQ_FAILED = false;
|
||||||
|
/**
|
||||||
|
* This error may happen if a error condition is met after validating the input on the nsqd side. This can
|
||||||
|
* happen in particular for messages that are no longer queued on the server side.
|
||||||
|
*/
|
||||||
|
public const E_TOUCH_FAILED = false;
|
||||||
|
/**
|
||||||
|
* This error indicates that the authorization of the client failed on the server side. This might be related
|
||||||
|
* to connection issues to the authorization server. Depending on the authorization server implementation, this
|
||||||
|
* might also indicate that the given auth secret in the [ClientConfig] is not known on the server or the server
|
||||||
|
* denied authentication with the current connection properties (i.e. TLS status and IP).
|
||||||
|
*/
|
||||||
|
public const E_AUTH_FAILED = true;
|
||||||
|
/**
|
||||||
|
* This error happens if something breaks on the nsqd side while performing the authorization. This might be
|
||||||
|
* caused by bugs in nsqd, the authorization server or network issues.
|
||||||
|
*/
|
||||||
|
public const E_AUTH_ERROR = true;
|
||||||
|
/**
|
||||||
|
* This error is sent by nsqd if the client attempts an authentication, but the server does not support it. This
|
||||||
|
* should never happen using this library as authorization requests are only sent if the server supports it.
|
||||||
|
* It is safe to expect that this error is never thrown.
|
||||||
|
*/
|
||||||
|
public const E_AUTH_DISABLED = true;
|
||||||
|
/**
|
||||||
|
* This error indicates that the client related to the authorization secret set in the [ClientConfig] is not
|
||||||
|
* allowed to do the operation it tried to do.
|
||||||
|
*/
|
||||||
|
public const E_UNAUTHORIZED = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A boolean indicating whether or not an [Error] with this type terminates the connection or not.
|
||||||
|
*/
|
||||||
|
public bool $terminateConnection;
|
||||||
|
|
||||||
|
public function __construct(public string $type)
|
||||||
|
{
|
||||||
|
$this->terminateConnection = \constant('self::'.$this->type) ?? self::E_INVALID;
|
||||||
|
}
|
||||||
|
}
|
16
src/Protocol/Frame.php
Normal file
16
src/Protocol/Frame.php
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Protocol;
|
||||||
|
|
||||||
|
abstract class Frame
|
||||||
|
{
|
||||||
|
public function __construct(
|
||||||
|
/**
|
||||||
|
* @psalm-readonly
|
||||||
|
*/
|
||||||
|
public int $length,
|
||||||
|
) {
|
||||||
|
}
|
||||||
|
}
|
@@ -2,11 +2,13 @@
|
|||||||
|
|
||||||
declare(strict_types=1);
|
declare(strict_types=1);
|
||||||
|
|
||||||
namespace Nsq;
|
namespace Nsq\Protocol;
|
||||||
|
|
||||||
|
use Nsq\Bytes;
|
||||||
|
use Nsq\Consumer;
|
||||||
use Nsq\Exception\MessageAlreadyFinished;
|
use Nsq\Exception\MessageAlreadyFinished;
|
||||||
|
|
||||||
final class Message
|
final class Message extends Frame
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* @psalm-readonly
|
* @psalm-readonly
|
||||||
@@ -34,6 +36,14 @@ final class Message
|
|||||||
|
|
||||||
public function __construct(int $timestamp, int $attempts, string $id, string $body, Consumer $consumer)
|
public function __construct(int $timestamp, int $attempts, string $id, string $body, Consumer $consumer)
|
||||||
{
|
{
|
||||||
|
parent::__construct(
|
||||||
|
Bytes::BYTES_TYPE
|
||||||
|
+ Bytes::BYTES_TIMESTAMP
|
||||||
|
+ Bytes::BYTES_ATTEMPTS
|
||||||
|
+ Bytes::BYTES_ID
|
||||||
|
+ \strlen($body)
|
||||||
|
);
|
||||||
|
|
||||||
$this->timestamp = $timestamp;
|
$this->timestamp = $timestamp;
|
||||||
$this->attempts = $attempts;
|
$this->attempts = $attempts;
|
||||||
$this->id = $id;
|
$this->id = $id;
|
39
src/Protocol/Response.php
Normal file
39
src/Protocol/Response.php
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Protocol;
|
||||||
|
|
||||||
|
use Nsq\Bytes;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @psalm-immutable
|
||||||
|
*/
|
||||||
|
final class Response extends Frame
|
||||||
|
{
|
||||||
|
public const OK = 'OK';
|
||||||
|
public const HEARTBEAT = '_heartbeat_';
|
||||||
|
|
||||||
|
public function __construct(public string $msg)
|
||||||
|
{
|
||||||
|
parent::__construct(\strlen($this->msg) + Bytes::BYTES_TYPE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function isOk(): bool
|
||||||
|
{
|
||||||
|
return self::OK === $this->msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function isHeartBeat(): bool
|
||||||
|
{
|
||||||
|
return self::HEARTBEAT === $this->msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return array<mixed, mixed>
|
||||||
|
*/
|
||||||
|
public function toArray(): array
|
||||||
|
{
|
||||||
|
return json_decode($this->msg, true, flags: JSON_THROW_ON_ERROR);
|
||||||
|
}
|
||||||
|
}
|
@@ -8,7 +8,6 @@ use Nsq\Exception\ConnectionFail;
|
|||||||
use Psr\Log\LoggerAwareTrait;
|
use Psr\Log\LoggerAwareTrait;
|
||||||
use Psr\Log\LoggerInterface;
|
use Psr\Log\LoggerInterface;
|
||||||
use Psr\Log\NullLogger;
|
use Psr\Log\NullLogger;
|
||||||
use Throwable;
|
|
||||||
|
|
||||||
final class ExponentialStrategy implements ReconnectStrategy
|
final class ExponentialStrategy implements ReconnectStrategy
|
||||||
{
|
{
|
||||||
@@ -47,7 +46,7 @@ final class ExponentialStrategy implements ReconnectStrategy
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
$callable();
|
$callable();
|
||||||
} catch (Throwable $e) {
|
} catch (\Throwable $e) {
|
||||||
$nextDelay = 0 === $this->delay ? $this->minDelay : $this->delay * 2;
|
$nextDelay = 0 === $this->delay ? $this->minDelay : $this->delay * 2;
|
||||||
$this->delay = $nextDelay > $this->maxDelay ? $this->maxDelay : $nextDelay;
|
$this->delay = $nextDelay > $this->maxDelay ? $this->maxDelay : $nextDelay;
|
||||||
$this->nextTryAfter = $currentTime + $this->delay;
|
$this->nextTryAfter = $currentTime + $this->delay;
|
||||||
|
@@ -1,87 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq;
|
|
||||||
|
|
||||||
use Nsq\Exception\NsqError;
|
|
||||||
use Nsq\Exception\UnexpectedResponse;
|
|
||||||
use PHPinnacle\Buffer\ByteBuffer;
|
|
||||||
use function json_decode;
|
|
||||||
use function sprintf;
|
|
||||||
use const JSON_THROW_ON_ERROR;
|
|
||||||
|
|
||||||
final class Response
|
|
||||||
{
|
|
||||||
private const OK = 'OK';
|
|
||||||
private const HEARTBEAT = '_heartbeat_';
|
|
||||||
private const TYPE_RESPONSE = 0;
|
|
||||||
private const TYPE_ERROR = 1;
|
|
||||||
private const TYPE_MESSAGE = 2;
|
|
||||||
|
|
||||||
private int $type;
|
|
||||||
|
|
||||||
private ByteBuffer $buffer;
|
|
||||||
|
|
||||||
public function __construct(ByteBuffer $buffer)
|
|
||||||
{
|
|
||||||
$this->type = $buffer->consumeUint32();
|
|
||||||
$this->buffer = $buffer;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function okOrFail(): void
|
|
||||||
{
|
|
||||||
if (self::TYPE_ERROR === $this->type) {
|
|
||||||
throw new NsqError($this->buffer->bytes());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (self::TYPE_RESPONSE !== $this->type) {
|
|
||||||
// @codeCoverageIgnoreStart
|
|
||||||
throw new UnexpectedResponse(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type));
|
|
||||||
// @codeCoverageIgnoreEnd
|
|
||||||
}
|
|
||||||
|
|
||||||
if (self::OK !== $this->buffer->bytes()) {
|
|
||||||
// @codeCoverageIgnoreStart
|
|
||||||
throw new UnexpectedResponse(sprintf('OK response expected, but "%s" received.', $this->buffer->bytes()));
|
|
||||||
// @codeCoverageIgnoreEnd
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public function isHeartBeat(): bool
|
|
||||||
{
|
|
||||||
return self::TYPE_RESPONSE === $this->type && self::HEARTBEAT === $this->buffer->bytes();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @phpstan-ignore-next-line
|
|
||||||
*/
|
|
||||||
public function toArray(): array
|
|
||||||
{
|
|
||||||
if (self::TYPE_RESPONSE !== $this->type) {
|
|
||||||
// @codeCoverageIgnoreStart
|
|
||||||
throw new UnexpectedResponse(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type));
|
|
||||||
// @codeCoverageIgnoreEnd
|
|
||||||
}
|
|
||||||
|
|
||||||
return json_decode($this->buffer->bytes(), true, flags: JSON_THROW_ON_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function toMessage(Consumer $reader): Message
|
|
||||||
{
|
|
||||||
if (self::TYPE_MESSAGE !== $this->type) {
|
|
||||||
// @codeCoverageIgnoreStart
|
|
||||||
throw new UnexpectedResponse(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type));
|
|
||||||
// @codeCoverageIgnoreEnd
|
|
||||||
}
|
|
||||||
|
|
||||||
$buffer = new ByteBuffer($this->buffer->bytes());
|
|
||||||
|
|
||||||
$timestamp = $buffer->consumeInt64();
|
|
||||||
$attempts = $buffer->consumeUint16();
|
|
||||||
$id = $buffer->consume(Bytes::BYTES_ID);
|
|
||||||
$body = $buffer->flush();
|
|
||||||
|
|
||||||
return new Message($timestamp, $attempts, $id, $body, $reader);
|
|
||||||
}
|
|
||||||
}
|
|
45
src/Socket/DeflateSocket.php
Normal file
45
src/Socket/DeflateSocket.php
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Socket;
|
||||||
|
|
||||||
|
final class DeflateSocket implements Socket
|
||||||
|
{
|
||||||
|
public function __construct(
|
||||||
|
private Socket $socket,
|
||||||
|
) {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public function write(string $data): void
|
||||||
|
{
|
||||||
|
throw new \LogicException('not implemented.');
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public function read(int $length): string
|
||||||
|
{
|
||||||
|
throw new \LogicException('not implemented.');
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public function close(): void
|
||||||
|
{
|
||||||
|
throw new \LogicException('not implemented.');
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public function selectRead(float $timeout): bool
|
||||||
|
{
|
||||||
|
return $this->socket->selectRead($timeout);
|
||||||
|
}
|
||||||
|
}
|
81
src/Socket/RawSocket.php
Normal file
81
src/Socket/RawSocket.php
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Socket;
|
||||||
|
|
||||||
|
use Nsq\Exception\ConnectionFail;
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Psr\Log\NullLogger;
|
||||||
|
use Socket\Raw\Exception;
|
||||||
|
use Socket\Raw\Factory;
|
||||||
|
use Socket\Raw\Socket as ClueSocket;
|
||||||
|
use Throwable;
|
||||||
|
|
||||||
|
final class RawSocket implements Socket
|
||||||
|
{
|
||||||
|
private ClueSocket $socket;
|
||||||
|
|
||||||
|
private LoggerInterface $logger;
|
||||||
|
|
||||||
|
public function __construct(string $address, LoggerInterface $logger = null)
|
||||||
|
{
|
||||||
|
$this->socket = (new Factory())->createClient($address);
|
||||||
|
$this->logger = $logger ?? new NullLogger();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public function selectRead(float $timeout): bool
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
return false !== $this->socket->selectRead($timeout);
|
||||||
|
} // @codeCoverageIgnoreStart
|
||||||
|
catch (Exception $e) {
|
||||||
|
throw ConnectionFail::fromThrowable($e);
|
||||||
|
}
|
||||||
|
// @codeCoverageIgnoreEnd
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public function close(): void
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
$this->socket->close();
|
||||||
|
} catch (Throwable) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public function write(string $data): void
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
$this->socket->write($data);
|
||||||
|
} // @codeCoverageIgnoreStart
|
||||||
|
catch (Exception $e) {
|
||||||
|
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||||
|
|
||||||
|
throw ConnectionFail::fromThrowable($e);
|
||||||
|
}
|
||||||
|
// @codeCoverageIgnoreEnd
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public function read(int $length): string
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
return $this->socket->read($length);
|
||||||
|
} // @codeCoverageIgnoreStart
|
||||||
|
catch (Exception $e) {
|
||||||
|
throw ConnectionFail::fromThrowable($e);
|
||||||
|
}
|
||||||
|
// @codeCoverageIgnoreEnd
|
||||||
|
}
|
||||||
|
}
|
162
src/Socket/SnappySocket.php
Normal file
162
src/Socket/SnappySocket.php
Normal file
@@ -0,0 +1,162 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Socket;
|
||||||
|
|
||||||
|
use PHPinnacle\Buffer\ByteBuffer;
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
|
||||||
|
final class SnappySocket implements Socket
|
||||||
|
{
|
||||||
|
private ByteBuffer $output;
|
||||||
|
|
||||||
|
private ByteBuffer $input;
|
||||||
|
|
||||||
|
public function __construct(
|
||||||
|
private Socket $socket,
|
||||||
|
private LoggerInterface $logger,
|
||||||
|
) {
|
||||||
|
if (
|
||||||
|
!\function_exists('snappy_compress')
|
||||||
|
|| !\function_exists('snappy_uncompress')
|
||||||
|
|| !\extension_loaded('snappy')
|
||||||
|
) {
|
||||||
|
throw new \LogicException('Snappy extension not installed.');
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->output = new ByteBuffer();
|
||||||
|
$this->input = new ByteBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public function write(string $data): void
|
||||||
|
{
|
||||||
|
$identifierFrame = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
|
||||||
|
$compressedFrame = 0x00;
|
||||||
|
$uncompressedFrame = 0x01; // 11
|
||||||
|
$maxChunkLength = 65536;
|
||||||
|
|
||||||
|
$byteBuffer = new ByteBuffer();
|
||||||
|
foreach ($identifierFrame as $bite) {
|
||||||
|
$byteBuffer->appendUint8($bite);
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (str_split($data, $maxChunkLength) as $chunk) {
|
||||||
|
$compressedChunk = snappy_compress($chunk);
|
||||||
|
|
||||||
|
[$chunk, $chunkType] = \strlen($compressedChunk) <= 0.875 * \strlen($data)
|
||||||
|
? [$compressedChunk, $compressedFrame]
|
||||||
|
: [$data, $uncompressedFrame];
|
||||||
|
|
||||||
|
/** @var string $checksum */
|
||||||
|
$checksum = hash('crc32c', $data, true);
|
||||||
|
/** @phpstan-ignore-next-line */
|
||||||
|
$checksum = unpack('N', $checksum)[1];
|
||||||
|
$maskedChecksum = (($checksum >> 15) | ($checksum << 17)) + 0xa282ead8 & 0xffffffff;
|
||||||
|
|
||||||
|
$size = (\strlen($chunk) + 4) << 8;
|
||||||
|
|
||||||
|
$byteBuffer->append(pack('V', $chunkType + $size));
|
||||||
|
$byteBuffer->append(pack('V', $maskedChecksum));
|
||||||
|
$byteBuffer->append($chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->socket->write($byteBuffer->flush());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public function read(int $length): string
|
||||||
|
{
|
||||||
|
$output = $this->output;
|
||||||
|
$input = $this->input;
|
||||||
|
|
||||||
|
$this->logger->debug('Snappy requested {length} bytes.', ['length' => $length]);
|
||||||
|
|
||||||
|
while ($output->size() < $length) {
|
||||||
|
$this->logger->debug('Snappy enter loop');
|
||||||
|
|
||||||
|
/** @phpstan-ignore-next-line */
|
||||||
|
$chunkType = unpack('V', $this->socket->read(4))[1];
|
||||||
|
|
||||||
|
$size = $chunkType >> 8;
|
||||||
|
$chunkType &= 0xff;
|
||||||
|
|
||||||
|
$this->logger->debug('Snappy receive chunk [{chunk}], size [{size}]', [
|
||||||
|
'chunk' => $chunkType,
|
||||||
|
'size' => $size,
|
||||||
|
]);
|
||||||
|
|
||||||
|
do {
|
||||||
|
$input->append(
|
||||||
|
$this->socket->read($size),
|
||||||
|
);
|
||||||
|
|
||||||
|
$size -= $input->size();
|
||||||
|
} while ($input->size() < $size);
|
||||||
|
|
||||||
|
switch ($chunkType) {
|
||||||
|
case 0xff:
|
||||||
|
$this->logger->debug('Snappy identifier chunk');
|
||||||
|
|
||||||
|
$input->discard(6); // discard identifier body
|
||||||
|
|
||||||
|
break;
|
||||||
|
case 0x00: // 'compressed',
|
||||||
|
$this->logger->debug('Snappy compressed chunk');
|
||||||
|
|
||||||
|
$data = $input
|
||||||
|
->discard(4) // discard checksum
|
||||||
|
->flush()
|
||||||
|
;
|
||||||
|
|
||||||
|
$this->logger->debug('Snappy compressed data [{data}]', ['data' => $data]);
|
||||||
|
|
||||||
|
$output->append(snappy_uncompress($data));
|
||||||
|
|
||||||
|
break;
|
||||||
|
case 0x01: // 'uncompressed',
|
||||||
|
$this->logger->debug('Snappy uncompressed chunk');
|
||||||
|
|
||||||
|
$data = $input
|
||||||
|
->discard(4) // discard checksum
|
||||||
|
->flush()
|
||||||
|
;
|
||||||
|
|
||||||
|
$this->logger->debug('Snappy uncompressed data [{data}]', ['data' => $data]);
|
||||||
|
|
||||||
|
$output->append($data);
|
||||||
|
|
||||||
|
break;
|
||||||
|
case 0xfe:// 'padding',
|
||||||
|
$this->logger->debug('Snappy padding chunk');
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->logger->debug('Snappy return message [{message}]', ['message' => $output->read($length)]);
|
||||||
|
|
||||||
|
return $output->consume($length);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public function close(): void
|
||||||
|
{
|
||||||
|
$this->socket->close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public function selectRead(float $timeout): bool
|
||||||
|
{
|
||||||
|
return !$this->input->empty() || $this->socket->selectRead($timeout);
|
||||||
|
}
|
||||||
|
}
|
27
src/Socket/Socket.php
Normal file
27
src/Socket/Socket.php
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Socket;
|
||||||
|
|
||||||
|
use Nsq\Exception\ConnectionFail;
|
||||||
|
|
||||||
|
interface Socket
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @throws ConnectionFail
|
||||||
|
*/
|
||||||
|
public function write(string $data): void;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws ConnectionFail
|
||||||
|
*/
|
||||||
|
public function read(int $length): string;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws ConnectionFail
|
||||||
|
*/
|
||||||
|
public function selectRead(float $timeout): bool;
|
||||||
|
|
||||||
|
public function close(): void;
|
||||||
|
}
|
@@ -1,39 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq;
|
|
||||||
|
|
||||||
use Generator;
|
|
||||||
|
|
||||||
final class Subscriber
|
|
||||||
{
|
|
||||||
public const STOP = 0;
|
|
||||||
|
|
||||||
private Consumer $reader;
|
|
||||||
|
|
||||||
public function __construct(Consumer $reader)
|
|
||||||
{
|
|
||||||
$this->reader = $reader;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @psalm-return Generator<int, Message|float|null, int|float|null, void>
|
|
||||||
*/
|
|
||||||
public function subscribe(string $topic, string $channel): Generator
|
|
||||||
{
|
|
||||||
$this->reader->sub($topic, $channel);
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
$this->reader->rdy(1);
|
|
||||||
|
|
||||||
$command = yield $this->reader->receive()?->toMessage($this->reader);
|
|
||||||
|
|
||||||
if (self::STOP === $command) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->reader->disconnect();
|
|
||||||
}
|
|
||||||
}
|
|
@@ -4,7 +4,7 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
use Nsq\Consumer;
|
use Nsq\Consumer;
|
||||||
use Nsq\Exception\MessageAlreadyFinished;
|
use Nsq\Exception\MessageAlreadyFinished;
|
||||||
use Nsq\Message;
|
use Nsq\Protocol\Message;
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
|
||||||
final class MessageTest extends TestCase
|
final class MessageTest extends TestCase
|
||||||
|
@@ -4,28 +4,28 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
use Nsq\Config\ClientConfig;
|
use Nsq\Config\ClientConfig;
|
||||||
use Nsq\Consumer;
|
use Nsq\Consumer;
|
||||||
use Nsq\Message;
|
|
||||||
use Nsq\Producer;
|
use Nsq\Producer;
|
||||||
use Nsq\Subscriber;
|
use Nsq\Protocol\Message;
|
||||||
use Nyholm\NSA;
|
use Nyholm\NSA;
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
|
||||||
final class NsqTest extends TestCase
|
final class NsqTest extends TestCase
|
||||||
{
|
{
|
||||||
public function test(): void
|
/**
|
||||||
|
* @dataProvider configs
|
||||||
|
*/
|
||||||
|
public function test(ClientConfig $clientConfig): void
|
||||||
{
|
{
|
||||||
$producer = new Producer('tcp://localhost:4150');
|
$producer = new Producer('tcp://localhost:4150');
|
||||||
$producer->pub(__FUNCTION__, __FUNCTION__);
|
$producer->pub(__FUNCTION__, __FUNCTION__);
|
||||||
|
|
||||||
$consumer = new Consumer(
|
$consumer = new Consumer(
|
||||||
|
topic: 'test',
|
||||||
|
channel: 'test',
|
||||||
address: 'tcp://localhost:4150',
|
address: 'tcp://localhost:4150',
|
||||||
clientConfig: new ClientConfig(
|
clientConfig: $clientConfig,
|
||||||
heartbeatInterval: 1000,
|
|
||||||
readTimeout: 1,
|
|
||||||
),
|
|
||||||
);
|
);
|
||||||
$subscriber = new Subscriber($consumer);
|
$generator = $consumer->generator();
|
||||||
$generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__);
|
|
||||||
|
|
||||||
/** @var null|Message $message */
|
/** @var null|Message $message */
|
||||||
$message = $generator->current();
|
$message = $generator->current();
|
||||||
@@ -85,8 +85,30 @@ final class NsqTest extends TestCase
|
|||||||
$message->touch();
|
$message->touch();
|
||||||
$message->finish();
|
$message->finish();
|
||||||
|
|
||||||
self::assertTrue($consumer->isReady());
|
self::assertFalse($consumer->isClosed());
|
||||||
$generator->send(Subscriber::STOP);
|
$generator->send(0);
|
||||||
self::assertFalse($consumer->isReady());
|
self::assertTrue($consumer->isClosed());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Generator<string, array<int, ClientConfig>>
|
||||||
|
*/
|
||||||
|
public function configs(): Generator
|
||||||
|
{
|
||||||
|
yield 'default' => [
|
||||||
|
new ClientConfig(
|
||||||
|
heartbeatInterval: 3000,
|
||||||
|
snappy: false,
|
||||||
|
readTimeout: 1,
|
||||||
|
),
|
||||||
|
];
|
||||||
|
|
||||||
|
yield 'snappy' => [
|
||||||
|
new ClientConfig(
|
||||||
|
heartbeatInterval: 3000,
|
||||||
|
snappy: true,
|
||||||
|
readTimeout: 1,
|
||||||
|
),
|
||||||
|
];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
30
tests/Protocol/ErrorTypeTest.php
Normal file
30
tests/Protocol/ErrorTypeTest.php
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Protocol;
|
||||||
|
|
||||||
|
use Nsq\Protocol\ErrorType;
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
|
||||||
|
final class ErrorTypeTest extends TestCase
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @dataProvider data
|
||||||
|
*/
|
||||||
|
public function testConstructor(string $type, bool $isConnectionTerminated): void
|
||||||
|
{
|
||||||
|
$errorType = new ErrorType($type);
|
||||||
|
self::assertSame($isConnectionTerminated, $errorType->terminateConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return \Generator<string, array<int, bool|string>>
|
||||||
|
*/
|
||||||
|
public function data(): \Generator
|
||||||
|
{
|
||||||
|
foreach ((new \ReflectionClass(ErrorType::class))->getConstants() as $constant => $isTerminated) {
|
||||||
|
yield $constant => [$constant, $isTerminated];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user