Add ClientConfig and ConnectionConfig, Feature Negotiation and Authentication
This commit is contained in:
10
README.md
10
README.md
@ -30,13 +30,13 @@ Features
|
|||||||
|
|
||||||
- [x] PUB
|
- [x] PUB
|
||||||
- [x] SUB
|
- [x] SUB
|
||||||
- [ ] Feature Negotiation
|
- [X] Feature Negotiation
|
||||||
- [ ] Discovery
|
- [ ] Discovery
|
||||||
- [ ] Backoff
|
- [ ] Backoff
|
||||||
- [ ] TLS
|
- [ ] TLS
|
||||||
- [ ] Snappy
|
- [ ] Snappy
|
||||||
- [X] Sampling
|
- [X] Sampling
|
||||||
- [ ] AUTH
|
- [X] AUTH
|
||||||
|
|
||||||
Usage
|
Usage
|
||||||
-----
|
-----
|
||||||
@ -71,7 +71,7 @@ use Nsq\Subscriber;
|
|||||||
$consumer = new Consumer('tcp://nsqd:4150');
|
$consumer = new Consumer('tcp://nsqd:4150');
|
||||||
$subscriber = new Subscriber($consumer);
|
$subscriber = new Subscriber($consumer);
|
||||||
|
|
||||||
$generator = $subscriber->subscribe('topic', 'channel', timeout: 5);
|
$generator = $subscriber->subscribe('topic', 'channel');
|
||||||
foreach ($generator as $message) {
|
foreach ($generator as $message) {
|
||||||
if ($message instanceof Message) {
|
if ($message instanceof Message) {
|
||||||
$payload = $message->body;
|
$payload = $message->body;
|
||||||
@ -89,10 +89,6 @@ foreach ($generator as $message) {
|
|||||||
// We can also communicate with generator through send
|
// We can also communicate with generator through send
|
||||||
// for example:
|
// for example:
|
||||||
|
|
||||||
// Dynamically change timeout
|
|
||||||
$generator->send(Subscriber::CHANGE_TIMEOUT);
|
|
||||||
$generator->send(10.0); // float required
|
|
||||||
|
|
||||||
// Gracefully close connection (loop will be ended)
|
// Gracefully close connection (loop will be ended)
|
||||||
$generator->send(Subscriber::STOP);
|
$generator->send(Subscriber::STOP);
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
"ergebnis/composer-normalize": "9999999-dev",
|
"ergebnis/composer-normalize": "9999999-dev",
|
||||||
"friendsofphp/php-cs-fixer": "^2.18",
|
"friendsofphp/php-cs-fixer": "^2.18",
|
||||||
"infection/infection": "^0.20.2",
|
"infection/infection": "^0.20.2",
|
||||||
|
"nyholm/nsa": "^1.2",
|
||||||
"phpstan/phpstan": "^0.12.68",
|
"phpstan/phpstan": "^0.12.68",
|
||||||
"phpstan/phpstan-phpunit": "^0.12.17",
|
"phpstan/phpstan-phpunit": "^0.12.17",
|
||||||
"phpstan/phpstan-strict-rules": "^0.12.9",
|
"phpstan/phpstan-strict-rules": "^0.12.9",
|
||||||
|
115
src/Config/ClientConfig.php
Normal file
115
src/Config/ClientConfig.php
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Config;
|
||||||
|
|
||||||
|
use Composer\InstalledVersions;
|
||||||
|
use InvalidArgumentException;
|
||||||
|
use JsonSerializable;
|
||||||
|
use function gethostname;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is used for configuring the clients for nsq. Immutable properties must be set when creating the object and
|
||||||
|
* are sent to NSQ for feature specification or negotiation. Keep in mind that some features might require some
|
||||||
|
* configuration on the server-side and could be not available.
|
||||||
|
*
|
||||||
|
* @psalm-immutable
|
||||||
|
*/
|
||||||
|
final class ClientConfig implements JsonSerializable
|
||||||
|
{
|
||||||
|
/** @psalm-suppress ImpureFunctionCall */
|
||||||
|
public function __construct(
|
||||||
|
/*
|
||||||
|
* The secret used for authorization, if the server requires it. This value will be ignored if the server
|
||||||
|
* does not require authorization.
|
||||||
|
*/
|
||||||
|
public ?string $authSecret = null,
|
||||||
|
|
||||||
|
// The timeout for establishing a connection in seconds.
|
||||||
|
public int $connectTimeout = 10,
|
||||||
|
|
||||||
|
// An identifier used to disambiguate this client (i.e. something specific to the consumer)
|
||||||
|
public string $clientId = '',
|
||||||
|
|
||||||
|
// Enable deflate compression for this connection. A client cannot enable both [snappy] and [deflate].
|
||||||
|
public bool $deflate = false,
|
||||||
|
/*
|
||||||
|
* Configure the deflate compression level for this connection.
|
||||||
|
*
|
||||||
|
* Valid range: `1 <= deflate_level <= configured_max`
|
||||||
|
*
|
||||||
|
* Higher values mean better compression but more CPU usage for nsqd.
|
||||||
|
*/
|
||||||
|
public int $deflateLevel = 6,
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Milliseconds between heartbeats.
|
||||||
|
*
|
||||||
|
* Valid range: `1000 <= heartbeat_interval <= configured_max` (`-1` disables heartbeats)
|
||||||
|
*/
|
||||||
|
public int $heartbeatInterval = 30000,
|
||||||
|
|
||||||
|
// The hostname where the client is deployed
|
||||||
|
public string $hostname = '',
|
||||||
|
|
||||||
|
// Configure the server-side message timeout in milliseconds for messages delivered to this client.
|
||||||
|
public int $msgTimeout = 60000,
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The sample rate for incoming data to deliver a percentage of all messages received to this connection.
|
||||||
|
* This only applies to subscribing connections. The valid range is between 0 and 99, where 0 means that all
|
||||||
|
* data is sent (this is the default). 1 means that 1% of the data is sent.
|
||||||
|
*/
|
||||||
|
public int $sampleRate = 0,
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
|
||||||
|
* it will send back a JSON payload of supported features and metadata.
|
||||||
|
*/
|
||||||
|
public bool $featureNegotiation = true,
|
||||||
|
|
||||||
|
// Enable TLS for this connection
|
||||||
|
public bool $tls = false,
|
||||||
|
|
||||||
|
// Enable snappy compression for this connection. A client cannot enable both [snappy] and [deflate].
|
||||||
|
public bool $snappy = false,
|
||||||
|
|
||||||
|
// The read timeout for connection sockets and for awaiting responses from nsq.
|
||||||
|
public int $readTimeout = 5,
|
||||||
|
|
||||||
|
// A string identifying the agent for this client in the spirit of HTTP.
|
||||||
|
public string $userAgent = '',
|
||||||
|
) {
|
||||||
|
if ('' === $this->hostname) {
|
||||||
|
$this->hostname = (static fn (mixed $h): string => \is_string($h) ? $h : '')(gethostname());
|
||||||
|
}
|
||||||
|
|
||||||
|
if ('' === $this->userAgent) {
|
||||||
|
$this->userAgent = 'nsqphp/'.InstalledVersions::getPrettyVersion('nsq/nsq');
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($this->snappy && $this->deflate) {
|
||||||
|
throw new InvalidArgumentException('Client cannot enable both [snappy] and [deflate]');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @phpstan-ignore-next-line
|
||||||
|
*/
|
||||||
|
public function jsonSerialize(): array
|
||||||
|
{
|
||||||
|
return [
|
||||||
|
'client_id' => $this->clientId,
|
||||||
|
'deflate' => $this->deflate,
|
||||||
|
'deflate_level' => $this->deflateLevel,
|
||||||
|
'feature_negotiation' => $this->featureNegotiation,
|
||||||
|
'heartbeat_interval' => $this->heartbeatInterval,
|
||||||
|
'hostname' => $this->hostname,
|
||||||
|
'msg_timeout' => $this->msgTimeout,
|
||||||
|
'sample_rate' => $this->sampleRate,
|
||||||
|
'tls_v1' => $this->tls,
|
||||||
|
'user_agent' => $this->userAgent,
|
||||||
|
];
|
||||||
|
}
|
||||||
|
}
|
81
src/Config/ConnectionConfig.php
Normal file
81
src/Config/ConnectionConfig.php
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Config;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The configuration object that holds the config status for a single Connection.
|
||||||
|
*
|
||||||
|
* @psalm-immutable
|
||||||
|
*/
|
||||||
|
final class ConnectionConfig
|
||||||
|
{
|
||||||
|
public function __construct(
|
||||||
|
// Whether or not authorization is required by nsqd.
|
||||||
|
public bool $authRequired,
|
||||||
|
|
||||||
|
// Whether deflate compression is enabled for this connection or not.
|
||||||
|
public bool $deflate,
|
||||||
|
// The deflate level. This value can be ignored if [deflate] is `false`.
|
||||||
|
public int $deflateLevel,
|
||||||
|
|
||||||
|
// The maximum deflate level supported by the server.
|
||||||
|
public int $maxDeflateLevel,
|
||||||
|
|
||||||
|
// The maximum value for message timeout.
|
||||||
|
public int $maxMsgTimeout,
|
||||||
|
/*
|
||||||
|
* Each nsqd is configurable with a max-rdy-count. If the consumer sends a RDY count that is outside
|
||||||
|
* of the acceptable range its connection will be forcefully closed.
|
||||||
|
*/
|
||||||
|
public int $maxRdyCount,
|
||||||
|
|
||||||
|
// The effective message timeout.
|
||||||
|
public int $msgTimeout,
|
||||||
|
|
||||||
|
// The size in bytes of the buffer nsqd will use when writing to this client.
|
||||||
|
public int $outputBufferSize,
|
||||||
|
// The timeout after which any data that nsqd has buffered will be flushed to this client.
|
||||||
|
public int $outputBufferTimeout,
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The sample rate for incoming data to deliver a percentage of all messages received to this connection.
|
||||||
|
* This only applies to subscribing connections. The valid range is between 0 and 99, where 0 means that all
|
||||||
|
* data is sent (this is the default). 1 means that 1% of the data is sent.
|
||||||
|
*/
|
||||||
|
public int $sampleRate,
|
||||||
|
|
||||||
|
// Whether snappy compression is enabled for this connection or not.
|
||||||
|
public bool $snappy,
|
||||||
|
|
||||||
|
// Whether TLS is enabled for this connection or not.
|
||||||
|
public bool $tls,
|
||||||
|
|
||||||
|
// The nsqd version.
|
||||||
|
public string $version,
|
||||||
|
) {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @phpstan-ignore-next-line
|
||||||
|
*/
|
||||||
|
public static function fromArray(array $array): self
|
||||||
|
{
|
||||||
|
return new self(
|
||||||
|
authRequired: $array['auth_required'],
|
||||||
|
deflate: $array['deflate'],
|
||||||
|
deflateLevel: $array['deflate_level'],
|
||||||
|
maxDeflateLevel: $array['max_deflate_level'],
|
||||||
|
maxMsgTimeout: $array['max_msg_timeout'],
|
||||||
|
maxRdyCount: $array['max_rdy_count'],
|
||||||
|
msgTimeout: $array['msg_timeout'],
|
||||||
|
outputBufferSize: $array['output_buffer_size'],
|
||||||
|
outputBufferTimeout: $array['output_buffer_timeout'],
|
||||||
|
sampleRate: $array['sample_rate'],
|
||||||
|
snappy: $array['snappy'],
|
||||||
|
tls: $array['tls_v1'],
|
||||||
|
version: $array['version'],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
@ -4,7 +4,9 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Nsq;
|
namespace Nsq;
|
||||||
|
|
||||||
use Composer\InstalledVersions;
|
use Nsq\Config\ClientConfig;
|
||||||
|
use Nsq\Config\ConnectionConfig;
|
||||||
|
use Nsq\Exception\AuthenticationRequired;
|
||||||
use Nsq\Exception\ConnectionFail;
|
use Nsq\Exception\ConnectionFail;
|
||||||
use Nsq\Exception\UnexpectedResponse;
|
use Nsq\Exception\UnexpectedResponse;
|
||||||
use Nsq\Reconnect\ExponentialStrategy;
|
use Nsq\Reconnect\ExponentialStrategy;
|
||||||
@ -17,6 +19,7 @@ use Socket\Raw\Exception;
|
|||||||
use Socket\Raw\Factory;
|
use Socket\Raw\Factory;
|
||||||
use Socket\Raw\Socket;
|
use Socket\Raw\Socket;
|
||||||
use function addcslashes;
|
use function addcslashes;
|
||||||
|
use function http_build_query;
|
||||||
use function implode;
|
use function implode;
|
||||||
use function json_encode;
|
use function json_encode;
|
||||||
use function pack;
|
use function pack;
|
||||||
@ -37,38 +40,21 @@ abstract class Connection
|
|||||||
|
|
||||||
private ReconnectStrategy $reconnect;
|
private ReconnectStrategy $reconnect;
|
||||||
|
|
||||||
/**
|
private ClientConfig $clientConfig;
|
||||||
* @var array{
|
|
||||||
* client_id: string,
|
private ?ConnectionConfig $connectionConfig = null;
|
||||||
* hostname: string,
|
|
||||||
* user_agent: string,
|
|
||||||
* heartbeat_interval: int|null,
|
|
||||||
* }
|
|
||||||
*/
|
|
||||||
private array $features;
|
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
string $address,
|
string $address,
|
||||||
string $clientId = null,
|
ClientConfig $clientConfig = null,
|
||||||
string $hostname = null,
|
|
||||||
string $userAgent = null,
|
|
||||||
int $heartbeatInterval = null,
|
|
||||||
int $sampleRate = 0,
|
|
||||||
ReconnectStrategy $reconnectStrategy = null,
|
ReconnectStrategy $reconnectStrategy = null,
|
||||||
LoggerInterface $logger = null,
|
LoggerInterface $logger = null,
|
||||||
) {
|
) {
|
||||||
$this->address = $address;
|
$this->address = $address;
|
||||||
|
|
||||||
$this->features = [
|
|
||||||
'client_id' => $clientId ?? '',
|
|
||||||
'hostname' => $hostname ?? (static fn (mixed $h): string => \is_string($h) ? $h : '')(gethostname()),
|
|
||||||
'user_agent' => $userAgent ?? 'nsqphp/'.InstalledVersions::getPrettyVersion('nsq/nsq'),
|
|
||||||
'heartbeat_interval' => $heartbeatInterval,
|
|
||||||
'sample_rate' => $sampleRate,
|
|
||||||
];
|
|
||||||
|
|
||||||
$this->logger = $logger ?? new NullLogger();
|
$this->logger = $logger ?? new NullLogger();
|
||||||
$this->reconnect = $reconnectStrategy ?? new ExponentialStrategy(logger: $this->logger);
|
$this->reconnect = $reconnectStrategy ?? new ExponentialStrategy(logger: $this->logger);
|
||||||
|
$this->clientConfig = $clientConfig ?? new ClientConfig();
|
||||||
}
|
}
|
||||||
|
|
||||||
public function connect(): void
|
public function connect(): void
|
||||||
@ -87,9 +73,27 @@ abstract class Connection
|
|||||||
|
|
||||||
$this->socket->write(' V2');
|
$this->socket->write(' V2');
|
||||||
|
|
||||||
$body = json_encode($this->features, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
|
$body = json_encode($this->clientConfig, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
|
||||||
|
|
||||||
$this->command('IDENTIFY', data: $body)->response()->okOrFail();
|
$response = $this->command('IDENTIFY', data: $body)->response();
|
||||||
|
|
||||||
|
if ($this->clientConfig->featureNegotiation) {
|
||||||
|
$this->connectionConfig = ConnectionConfig::fromArray($response->toArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($this->connectionConfig->snappy || $this->connectionConfig->deflate) {
|
||||||
|
$this->response()->okOrFail();
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($this->connectionConfig->authRequired) {
|
||||||
|
if (null === $this->clientConfig->authSecret) {
|
||||||
|
throw new AuthenticationRequired('NSQ requires authorization, set ClientConfig::$authSecret before connecting');
|
||||||
|
}
|
||||||
|
|
||||||
|
$authResponse = $this->command('AUTH', data: $this->clientConfig->authSecret)->response()->toArray();
|
||||||
|
|
||||||
|
$this->logger->info('Authorization response: '.http_build_query($authResponse));
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -120,16 +124,6 @@ abstract class Connection
|
|||||||
return null !== $this->socket;
|
return null !== $this->socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @psalm-suppress PossiblyFalseOperand
|
|
||||||
*/
|
|
||||||
protected function auth(string $secret): string
|
|
||||||
{
|
|
||||||
$size = pack('N', \strlen($secret));
|
|
||||||
|
|
||||||
return 'AUTH'.PHP_EOL.$size.$secret;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param array<int, int|string>|string $params
|
* @param array<int, int|string>|string $params
|
||||||
*/
|
*/
|
||||||
@ -176,9 +170,11 @@ abstract class Connection
|
|||||||
// @codeCoverageIgnoreEnd
|
// @codeCoverageIgnoreEnd
|
||||||
}
|
}
|
||||||
|
|
||||||
public function receive(float $timeout = 0): ?Response
|
public function receive(float $timeout = null): ?Response
|
||||||
{
|
{
|
||||||
$socket = $this->socket();
|
$socket = $this->socket();
|
||||||
|
|
||||||
|
$timeout ??= $this->clientConfig->readTimeout;
|
||||||
$deadline = microtime(true) + $timeout;
|
$deadline = microtime(true) + $timeout;
|
||||||
|
|
||||||
if (!$this->hasMessage($timeout)) {
|
if (!$this->hasMessage($timeout)) {
|
||||||
@ -226,7 +222,7 @@ abstract class Connection
|
|||||||
|
|
||||||
protected function response(): Response
|
protected function response(): Response
|
||||||
{
|
{
|
||||||
return $this->receive(1) ?? throw UnexpectedResponse::null();
|
return $this->receive() ?? throw UnexpectedResponse::null();
|
||||||
}
|
}
|
||||||
|
|
||||||
private function socket(): Socket
|
private function socket(): Socket
|
||||||
|
@ -30,11 +30,6 @@ final class Consumer extends Connection
|
|||||||
$this->rdy = $count;
|
$this->rdy = $count;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function consume(float $timeout): ?Message
|
|
||||||
{
|
|
||||||
return $this->receive($timeout)?->toMessage($this);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Finish a message (indicate successful processing).
|
* Finish a message (indicate successful processing).
|
||||||
*/
|
*/
|
||||||
|
11
src/Exception/AuthenticationRequired.php
Normal file
11
src/Exception/AuthenticationRequired.php
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Exception;
|
||||||
|
|
||||||
|
use RuntimeException;
|
||||||
|
|
||||||
|
final class AuthenticationRequired extends RuntimeException implements NsqException
|
||||||
|
{
|
||||||
|
}
|
@ -7,6 +7,9 @@ namespace Nsq;
|
|||||||
use Nsq\Exception\NsqError;
|
use Nsq\Exception\NsqError;
|
||||||
use Nsq\Exception\UnexpectedResponse;
|
use Nsq\Exception\UnexpectedResponse;
|
||||||
use PHPinnacle\Buffer\ByteBuffer;
|
use PHPinnacle\Buffer\ByteBuffer;
|
||||||
|
use function json_decode;
|
||||||
|
use function sprintf;
|
||||||
|
use const JSON_THROW_ON_ERROR;
|
||||||
|
|
||||||
final class Response
|
final class Response
|
||||||
{
|
{
|
||||||
@ -50,6 +53,20 @@ final class Response
|
|||||||
return self::TYPE_RESPONSE === $this->type && self::HEARTBEAT === $this->buffer->bytes();
|
return self::TYPE_RESPONSE === $this->type && self::HEARTBEAT === $this->buffer->bytes();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @phpstan-ignore-next-line
|
||||||
|
*/
|
||||||
|
public function toArray(): array
|
||||||
|
{
|
||||||
|
if (self::TYPE_RESPONSE !== $this->type) {
|
||||||
|
// @codeCoverageIgnoreStart
|
||||||
|
throw new UnexpectedResponse(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type));
|
||||||
|
// @codeCoverageIgnoreEnd
|
||||||
|
}
|
||||||
|
|
||||||
|
return json_decode($this->buffer->bytes(), true, flags: JSON_THROW_ON_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
public function toMessage(Consumer $reader): Message
|
public function toMessage(Consumer $reader): Message
|
||||||
{
|
{
|
||||||
if (self::TYPE_MESSAGE !== $this->type) {
|
if (self::TYPE_MESSAGE !== $this->type) {
|
||||||
|
@ -5,15 +5,10 @@ declare(strict_types=1);
|
|||||||
namespace Nsq;
|
namespace Nsq;
|
||||||
|
|
||||||
use Generator;
|
use Generator;
|
||||||
use InvalidArgumentException;
|
|
||||||
use function get_debug_type;
|
|
||||||
use function sprintf;
|
|
||||||
|
|
||||||
final class Subscriber
|
final class Subscriber
|
||||||
{
|
{
|
||||||
public const STOP = 0;
|
public const STOP = 0;
|
||||||
public const CHANGE_TIMEOUT = 1;
|
|
||||||
public const TIMEOUT = 2;
|
|
||||||
|
|
||||||
private Consumer $reader;
|
private Consumer $reader;
|
||||||
|
|
||||||
@ -25,32 +20,18 @@ final class Subscriber
|
|||||||
/**
|
/**
|
||||||
* @psalm-return Generator<int, Message|float|null, int|float|null, void>
|
* @psalm-return Generator<int, Message|float|null, int|float|null, void>
|
||||||
*/
|
*/
|
||||||
public function subscribe(string $topic, string $channel, float $timeout = 0): Generator
|
public function subscribe(string $topic, string $channel): Generator
|
||||||
{
|
{
|
||||||
$this->reader->sub($topic, $channel);
|
$this->reader->sub($topic, $channel);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
$this->reader->rdy(1);
|
$this->reader->rdy(1);
|
||||||
|
|
||||||
$command = yield $this->reader->consume($timeout);
|
$command = yield $this->reader->receive()?->toMessage($this->reader);
|
||||||
|
|
||||||
if (self::STOP === $command) {
|
if (self::STOP === $command) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (self::CHANGE_TIMEOUT === $command) {
|
|
||||||
$newTimeout = yield null;
|
|
||||||
|
|
||||||
if (!\is_float($newTimeout)) {
|
|
||||||
throw new InvalidArgumentException(sprintf('Timeout must be float, "%s" given.', get_debug_type($newTimeout)));
|
|
||||||
}
|
|
||||||
|
|
||||||
$timeout = $newTimeout;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (self::TIMEOUT === $command) {
|
|
||||||
yield $timeout;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->reader->disconnect();
|
$this->reader->disconnect();
|
||||||
|
@ -2,10 +2,12 @@
|
|||||||
|
|
||||||
declare(strict_types=1);
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
use Nsq\Config\ClientConfig;
|
||||||
use Nsq\Consumer;
|
use Nsq\Consumer;
|
||||||
use Nsq\Message;
|
use Nsq\Message;
|
||||||
use Nsq\Producer;
|
use Nsq\Producer;
|
||||||
use Nsq\Subscriber;
|
use Nsq\Subscriber;
|
||||||
|
use Nyholm\NSA;
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
|
||||||
final class NsqTest extends TestCase
|
final class NsqTest extends TestCase
|
||||||
@ -17,10 +19,13 @@ final class NsqTest extends TestCase
|
|||||||
|
|
||||||
$consumer = new Consumer(
|
$consumer = new Consumer(
|
||||||
address: 'tcp://localhost:4150',
|
address: 'tcp://localhost:4150',
|
||||||
|
clientConfig: new ClientConfig(
|
||||||
heartbeatInterval: 1000,
|
heartbeatInterval: 1000,
|
||||||
|
readTimeout: 1,
|
||||||
|
),
|
||||||
);
|
);
|
||||||
$subscriber = new Subscriber($consumer);
|
$subscriber = new Subscriber($consumer);
|
||||||
$generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__, 1);
|
$generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__);
|
||||||
|
|
||||||
/** @var null|Message $message */
|
/** @var null|Message $message */
|
||||||
$message = $generator->current();
|
$message = $generator->current();
|
||||||
@ -65,8 +70,13 @@ final class NsqTest extends TestCase
|
|||||||
$message = $generator->current();
|
$message = $generator->current();
|
||||||
self::assertNull($message);
|
self::assertNull($message);
|
||||||
|
|
||||||
$generator->send(Subscriber::CHANGE_TIMEOUT);
|
NSA::setProperty(
|
||||||
$generator->send(10.0);
|
NSA::getProperty($consumer, 'clientConfig'),
|
||||||
|
'readTimeout',
|
||||||
|
10,
|
||||||
|
);
|
||||||
|
|
||||||
|
$generator->next();
|
||||||
|
|
||||||
/** @var null|Message $message */
|
/** @var null|Message $message */
|
||||||
$message = $generator->current();
|
$message = $generator->current();
|
||||||
|
@ -1,45 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
use Nsq\Consumer;
|
|
||||||
use Nsq\Subscriber;
|
|
||||||
use PHPUnit\Framework\TestCase;
|
|
||||||
|
|
||||||
final class SubscriberTest extends TestCase
|
|
||||||
{
|
|
||||||
private Subscriber $subscriber;
|
|
||||||
|
|
||||||
protected function setUp(): void
|
|
||||||
{
|
|
||||||
$consumer = new Consumer(
|
|
||||||
address: 'tcp://localhost:4150',
|
|
||||||
);
|
|
||||||
|
|
||||||
$this->subscriber = new Subscriber($consumer);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function testChangeInterval(): void
|
|
||||||
{
|
|
||||||
$generator = $this->subscriber->subscribe(__FUNCTION__, __FUNCTION__, 0.1);
|
|
||||||
|
|
||||||
self::assertSame(0.1, $generator->send(Subscriber::TIMEOUT));
|
|
||||||
$generator->next();
|
|
||||||
|
|
||||||
$generator->send(Subscriber::CHANGE_TIMEOUT);
|
|
||||||
$generator->send(0.2);
|
|
||||||
|
|
||||||
self::assertSame(0.2, $generator->send(Subscriber::TIMEOUT));
|
|
||||||
}
|
|
||||||
|
|
||||||
public function testInvalidChangeInterval(): void
|
|
||||||
{
|
|
||||||
$this->expectException(InvalidArgumentException::class);
|
|
||||||
$this->expectExceptionMessage('Timeout must be float, "string" given.');
|
|
||||||
|
|
||||||
$generator = $this->subscriber->subscribe(__FUNCTION__, __FUNCTION__);
|
|
||||||
$generator->send(Subscriber::CHANGE_TIMEOUT);
|
|
||||||
// @phpstan-ignore-next-line
|
|
||||||
$generator->send('bla');
|
|
||||||
}
|
|
||||||
}
|
|
Reference in New Issue
Block a user