Compare commits
4 Commits
875cb8b542
...
443473f53d
Author | SHA1 | Date | |
---|---|---|---|
443473f53d | |||
57b7715c2e | |||
b126094e74 | |||
15296f4b61 |
@ -8,6 +8,9 @@ return (new PhpCsFixer\Config())
|
|||||||
'@PhpCsFixer:risky' => true,
|
'@PhpCsFixer:risky' => true,
|
||||||
'@PSR12' => true,
|
'@PSR12' => true,
|
||||||
'@PSR12:risky' => true,
|
'@PSR12:risky' => true,
|
||||||
|
'braces' => [
|
||||||
|
'allow_single_line_closure' => true,
|
||||||
|
],
|
||||||
'blank_line_before_statement' => [
|
'blank_line_before_statement' => [
|
||||||
'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try'],
|
'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try'],
|
||||||
],
|
],
|
||||||
|
@ -13,12 +13,13 @@
|
|||||||
"require": {
|
"require": {
|
||||||
"php": "^8.0.1",
|
"php": "^8.0.1",
|
||||||
"ext-json": "*",
|
"ext-json": "*",
|
||||||
"clue/socket-raw": "^1.5",
|
"amphp/socket": "^1.1",
|
||||||
"composer/semver": "^3.2",
|
"composer/semver": "^3.2",
|
||||||
"phpinnacle/buffer": "^1.2",
|
"phpinnacle/buffer": "^1.2",
|
||||||
"psr/log": "^1.1"
|
"psr/log": "^1.1"
|
||||||
},
|
},
|
||||||
"require-dev": {
|
"require-dev": {
|
||||||
|
"amphp/log": "^1.1",
|
||||||
"dg/bypass-finals": "^1.3",
|
"dg/bypass-finals": "^1.3",
|
||||||
"ergebnis/composer-normalize": "9999999-dev",
|
"ergebnis/composer-normalize": "9999999-dev",
|
||||||
"friendsofphp/php-cs-fixer": "^2.18",
|
"friendsofphp/php-cs-fixer": "^2.18",
|
||||||
|
@ -5,7 +5,8 @@ services:
|
|||||||
image: nsqio/nsq:v1.2.0
|
image: nsqio/nsq:v1.2.0
|
||||||
labels:
|
labels:
|
||||||
ru.grachevko.dhu: 'nsqd'
|
ru.grachevko.dhu: 'nsqd'
|
||||||
command: /nsqd
|
command: /nsqd -log-level debug
|
||||||
|
# command: /nsqd
|
||||||
ports:
|
ports:
|
||||||
- 4150:4150
|
- 4150:4150
|
||||||
- 4151:4151
|
- 4151:4151
|
||||||
|
49
examples/consumer.php
Normal file
49
examples/consumer.php
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
require __DIR__.'/../vendor/autoload.php';
|
||||||
|
|
||||||
|
use Amp\ByteStream;
|
||||||
|
use Amp\Log\ConsoleFormatter;
|
||||||
|
use Amp\Log\StreamHandler;
|
||||||
|
use Amp\Loop;
|
||||||
|
use Amp\Promise;
|
||||||
|
use Amp\Success;
|
||||||
|
use Monolog\Logger;
|
||||||
|
use Monolog\Processor\PsrLogMessageProcessor;
|
||||||
|
use Nsq\Config\ClientConfig;
|
||||||
|
use Nsq\Consumer;
|
||||||
|
use Nsq\Protocol\Message;
|
||||||
|
use function Amp\call;
|
||||||
|
|
||||||
|
Loop::run(static function () {
|
||||||
|
$handler = new StreamHandler(ByteStream\getStdout());
|
||||||
|
$handler->setFormatter(new ConsoleFormatter());
|
||||||
|
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
|
||||||
|
|
||||||
|
$consumer = new Consumer(
|
||||||
|
'tcp://localhost:4150',
|
||||||
|
clientConfig: new ClientConfig(
|
||||||
|
deflate: false,
|
||||||
|
snappy: false,
|
||||||
|
),
|
||||||
|
logger: $logger,
|
||||||
|
);
|
||||||
|
|
||||||
|
yield $consumer->connect();
|
||||||
|
|
||||||
|
yield $consumer->listen(
|
||||||
|
topic: 'local',
|
||||||
|
channel: 'local',
|
||||||
|
onMessage: static function (Message $message) use ($logger): Promise {
|
||||||
|
return call(function () use ($message, $logger): Generator {
|
||||||
|
$logger->info('Received: {body}', ['body' => $message->body]);
|
||||||
|
|
||||||
|
yield $message->finish();
|
||||||
|
|
||||||
|
return new Success(false);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
);
|
||||||
|
});
|
33
examples/producer.php
Normal file
33
examples/producer.php
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
require __DIR__.'/../vendor/autoload.php';
|
||||||
|
|
||||||
|
use Amp\ByteStream;
|
||||||
|
use Amp\Log\ConsoleFormatter;
|
||||||
|
use Amp\Log\StreamHandler;
|
||||||
|
use Amp\Loop;
|
||||||
|
use Monolog\Logger;
|
||||||
|
use Monolog\Processor\PsrLogMessageProcessor;
|
||||||
|
use Nsq\Config\ClientConfig;
|
||||||
|
use Nsq\Producer;
|
||||||
|
|
||||||
|
Loop::run(static function () {
|
||||||
|
$handler = new StreamHandler(ByteStream\getStdout());
|
||||||
|
$handler->setFormatter(new ConsoleFormatter());
|
||||||
|
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
|
||||||
|
|
||||||
|
$producer = new Producer(
|
||||||
|
'tcp://localhost:4150',
|
||||||
|
clientConfig: new ClientConfig(
|
||||||
|
deflate: false,
|
||||||
|
snappy: false,
|
||||||
|
),
|
||||||
|
logger: $logger,
|
||||||
|
);
|
||||||
|
|
||||||
|
yield $producer->connect();
|
||||||
|
|
||||||
|
yield $producer->pub(topic: 'topic', body: 'Message body!');
|
||||||
|
});
|
@ -1,68 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq;
|
|
||||||
|
|
||||||
use PHPinnacle\Buffer\ByteBuffer;
|
|
||||||
|
|
||||||
final class Buffer
|
|
||||||
{
|
|
||||||
private ByteBuffer $buffer;
|
|
||||||
|
|
||||||
public function __construct(string $initial = '')
|
|
||||||
{
|
|
||||||
$this->buffer = new ByteBuffer($initial);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function append(string $data): self
|
|
||||||
{
|
|
||||||
$this->buffer->append($data);
|
|
||||||
|
|
||||||
return $this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function consumeSize(): int
|
|
||||||
{
|
|
||||||
/** @see Bytes::BYTES_SIZE */
|
|
||||||
return $this->buffer->consumeUint32();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function consumeType(): int
|
|
||||||
{
|
|
||||||
/** @see Bytes::BYTES_TYPE */
|
|
||||||
return $this->buffer->consumeUint32();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function consumeTimestamp(): int
|
|
||||||
{
|
|
||||||
/** @see Bytes::BYTES_TIMESTAMP */
|
|
||||||
return $this->buffer->consumeInt64();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function consumeAttempts(): int
|
|
||||||
{
|
|
||||||
/** @see Bytes::BYTES_ATTEMPTS */
|
|
||||||
return $this->buffer->consumeUint16();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function consumeId(): string
|
|
||||||
{
|
|
||||||
return $this->buffer->consume(Bytes::BYTES_ID);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function size(): int
|
|
||||||
{
|
|
||||||
return $this->buffer->size();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function bytes(): string
|
|
||||||
{
|
|
||||||
return $this->buffer->bytes();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function flush(): string
|
|
||||||
{
|
|
||||||
return $this->buffer->flush();
|
|
||||||
}
|
|
||||||
}
|
|
@ -4,11 +4,18 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Nsq;
|
namespace Nsq;
|
||||||
|
|
||||||
|
use Amp\ByteStream\InputStream;
|
||||||
|
use Amp\ByteStream\OutputStream;
|
||||||
|
use Amp\ByteStream\ZlibInputStream;
|
||||||
|
use Amp\ByteStream\ZlibOutputStream;
|
||||||
|
use Amp\Failure;
|
||||||
|
use Amp\Promise;
|
||||||
|
use Amp\Socket\Socket;
|
||||||
|
use Amp\Success;
|
||||||
use Nsq\Config\ClientConfig;
|
use Nsq\Config\ClientConfig;
|
||||||
use Nsq\Config\ConnectionConfig;
|
use Nsq\Config\ConnectionConfig;
|
||||||
use Nsq\Exception\AuthenticationRequired;
|
use Nsq\Exception\AuthenticationRequired;
|
||||||
use Nsq\Exception\BadResponse;
|
use Nsq\Exception\BadResponse;
|
||||||
use Nsq\Exception\ConnectionFail;
|
|
||||||
use Nsq\Exception\NotConnected;
|
use Nsq\Exception\NotConnected;
|
||||||
use Nsq\Exception\NsqError;
|
use Nsq\Exception\NsqError;
|
||||||
use Nsq\Exception\NsqException;
|
use Nsq\Exception\NsqException;
|
||||||
@ -16,201 +23,235 @@ use Nsq\Protocol\Error;
|
|||||||
use Nsq\Protocol\Frame;
|
use Nsq\Protocol\Frame;
|
||||||
use Nsq\Protocol\Message;
|
use Nsq\Protocol\Message;
|
||||||
use Nsq\Protocol\Response;
|
use Nsq\Protocol\Response;
|
||||||
use Nsq\Socket\DeflateSocket;
|
use Nsq\Stream\NsqInputStream;
|
||||||
use Nsq\Socket\RawSocket;
|
use Nsq\Stream\NullStream;
|
||||||
use Nsq\Socket\SnappySocket;
|
use Nsq\Stream\SnappyInputStream;
|
||||||
use Psr\Log\LoggerAwareTrait;
|
use Nsq\Stream\SnappyOutputStream;
|
||||||
|
use PHPinnacle\Buffer\ByteBuffer;
|
||||||
use Psr\Log\LoggerInterface;
|
use Psr\Log\LoggerInterface;
|
||||||
use Psr\Log\NullLogger;
|
use Psr\Log\NullLogger;
|
||||||
|
use function Amp\call;
|
||||||
|
use function Amp\Socket\connect;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @internal
|
* @internal
|
||||||
*/
|
*/
|
||||||
abstract class Connection
|
abstract class Connection
|
||||||
{
|
{
|
||||||
use LoggerAwareTrait;
|
private ?Socket $socket = null;
|
||||||
|
|
||||||
|
private InputStream $inputStream;
|
||||||
|
|
||||||
|
private OutputStream $outputStream;
|
||||||
|
|
||||||
|
private ByteBuffer $buffer;
|
||||||
|
|
||||||
|
protected ?ConnectionConfig $connectionConfig = null;
|
||||||
|
|
||||||
protected ClientConfig $clientConfig;
|
protected ClientConfig $clientConfig;
|
||||||
|
|
||||||
private NsqSocket $socket;
|
protected LoggerInterface $logger;
|
||||||
|
|
||||||
private ConnectionConfig $connectionConfig;
|
final public function __construct(
|
||||||
|
|
||||||
private bool $closed = false;
|
|
||||||
|
|
||||||
public function __construct(
|
|
||||||
private string $address,
|
private string $address,
|
||||||
ClientConfig $clientConfig = null,
|
ClientConfig $clientConfig = null,
|
||||||
LoggerInterface $logger = null,
|
?LoggerInterface $logger = null,
|
||||||
) {
|
) {
|
||||||
$this->logger = $logger ?? new NullLogger();
|
$this->buffer = new ByteBuffer();
|
||||||
|
$this->inputStream = $this->outputStream = new NullStream();
|
||||||
$this->clientConfig = $clientConfig ?? new ClientConfig();
|
$this->clientConfig = $clientConfig ?? new ClientConfig();
|
||||||
|
$this->logger = $logger ?? new NullLogger();
|
||||||
|
}
|
||||||
|
|
||||||
$socket = new RawSocket($this->address, $this->logger);
|
public function __destruct()
|
||||||
$socket->write(' V2');
|
{
|
||||||
|
$this->close();
|
||||||
|
}
|
||||||
|
|
||||||
$this->socket = new NsqSocket($socket);
|
/**
|
||||||
|
* @return Promise<void>
|
||||||
|
*/
|
||||||
|
public function connect(): Promise
|
||||||
|
{
|
||||||
|
return call(function (): \Generator {
|
||||||
|
$this->socket = $this->outputStream = yield connect($this->address);
|
||||||
|
$this->inputStream = new NsqInputStream($this->socket);
|
||||||
|
|
||||||
$this->connectionConfig = ConnectionConfig::fromArray(
|
yield $this->outputStream->write(' V2');
|
||||||
$this
|
|
||||||
->command('IDENTIFY', data: $this->clientConfig->toString())
|
|
||||||
->readResponse()
|
|
||||||
->toArray()
|
|
||||||
);
|
|
||||||
|
|
||||||
if ($this->connectionConfig->snappy) {
|
yield $this->command('IDENTIFY', data: $this->clientConfig->toString());
|
||||||
$this->socket = new NsqSocket(
|
/** @var Response $response */
|
||||||
new SnappySocket(
|
$response = yield $this->readResponse();
|
||||||
$socket,
|
$this->connectionConfig = ConnectionConfig::fromArray($response->toArray());
|
||||||
$this->logger,
|
|
||||||
),
|
|
||||||
);
|
|
||||||
|
|
||||||
$this->checkIsOK();
|
if ($this->connectionConfig->snappy) {
|
||||||
}
|
$this->inputStream = new NsqInputStream(
|
||||||
|
new SnappyInputStream($this->inputStream, $this->logger),
|
||||||
|
);
|
||||||
|
$this->outputStream = new SnappyOutputStream($this->outputStream);
|
||||||
|
|
||||||
if ($this->connectionConfig->deflate) {
|
$this->checkIsOK();
|
||||||
$this->socket = new NsqSocket(
|
|
||||||
new DeflateSocket(
|
|
||||||
$socket,
|
|
||||||
),
|
|
||||||
);
|
|
||||||
|
|
||||||
$this->checkIsOK();
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($this->connectionConfig->authRequired) {
|
|
||||||
if (null === $this->clientConfig->authSecret) {
|
|
||||||
throw new AuthenticationRequired();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$authResponse = $this
|
if ($this->connectionConfig->deflate) {
|
||||||
->command('AUTH', data: $this->clientConfig->authSecret)
|
$this->inputStream = new NsqInputStream(
|
||||||
->readResponse()
|
new ZlibInputStream($this->socket, ZLIB_ENCODING_DEFLATE, [
|
||||||
->toArray()
|
'level' => $this->connectionConfig->deflateLevel,
|
||||||
;
|
]),
|
||||||
|
);
|
||||||
|
$this->outputStream = new ZlibOutputStream($this->socket, ZLIB_ENCODING_DEFLATE, [
|
||||||
|
'level' => $this->connectionConfig->deflateLevel,
|
||||||
|
]);
|
||||||
|
|
||||||
$this->logger->info('Authorization response: '.http_build_query($authResponse));
|
$this->checkIsOK();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ($this->connectionConfig->authRequired) {
|
||||||
|
if (null === $this->clientConfig->authSecret) {
|
||||||
|
yield $this->close();
|
||||||
|
|
||||||
|
throw new AuthenticationRequired();
|
||||||
|
}
|
||||||
|
|
||||||
|
yield $this->command('AUTH', data: $this->clientConfig->authSecret);
|
||||||
|
$response = yield $this->readResponse();
|
||||||
|
|
||||||
|
$this->logger->info('Authorization response: '.http_build_query($response->toArray()));
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Cleanly close your connection (no more messages are sent).
|
* Cleanly close your connection (no more messages are sent).
|
||||||
|
*
|
||||||
|
* @return Promise<void>
|
||||||
*/
|
*/
|
||||||
public function close(): void
|
public function close(): Promise
|
||||||
{
|
{
|
||||||
if ($this->closed) {
|
if (null === $this->socket) {
|
||||||
return;
|
return new Success();
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
return call(function (): \Generator {
|
||||||
$this->command('CLS');
|
yield $this->command('CLS');
|
||||||
$this->socket->close();
|
|
||||||
} catch (\Throwable $e) {
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->closed = true;
|
if (null !== $this->socket) {
|
||||||
|
$this->socket->close();
|
||||||
|
|
||||||
|
$this->socket = null;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public function isClosed(): bool
|
public function isClosed(): bool
|
||||||
{
|
{
|
||||||
return $this->closed;
|
return null === $this->socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param array<int, int|string>|string $params
|
* @param array<int, int|string>|string $params
|
||||||
|
*
|
||||||
|
* @return Promise<void>
|
||||||
*/
|
*/
|
||||||
protected function command(string $command, array | string $params = [], string $data = null): self
|
protected function command(string $command, array | string $params = [], string $data = null): Promise
|
||||||
{
|
{
|
||||||
if ($this->closed) {
|
if (null === $this->socket) {
|
||||||
throw new NotConnected('Connection closed.');
|
return new Failure(new NotConnected());
|
||||||
}
|
}
|
||||||
|
|
||||||
$command = [] === $params
|
$command = implode(' ', [$command, ...((array) $params)]);
|
||||||
? $command
|
|
||||||
: implode(' ', [$command, ...((array) $params)]);
|
|
||||||
|
|
||||||
$this->logger->info('Command [{command}] with data [{data}]', ['command' => $command, 'data' => $data ?? 'null']);
|
$buffer = $this->buffer->append($command.PHP_EOL);
|
||||||
|
|
||||||
$this->socket->write($command, $data);
|
if (null !== $data) {
|
||||||
|
$buffer->appendUint32(\strlen($data));
|
||||||
|
$buffer->append($data);
|
||||||
|
}
|
||||||
|
|
||||||
return $this;
|
$this->logger->debug('Sending: {bytes}', ['bytes' => $buffer->bytes()]);
|
||||||
|
|
||||||
|
return $this->outputStream->write($buffer->flush());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function hasMessage(float $timeout): bool
|
/**
|
||||||
|
* @return Promise<Frame>
|
||||||
|
*/
|
||||||
|
protected function readFrame(): Promise
|
||||||
{
|
{
|
||||||
if ($this->closed) {
|
return call(function (): \Generator {
|
||||||
throw new NotConnected('Connection closed.');
|
$bytes = yield $this->inputStream->read();
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
$this->logger->debug('Receiving: {bytes}', ['bytes' => $bytes]);
|
||||||
return false !== $this->socket->wait($timeout);
|
|
||||||
} catch (ConnectionFail $e) {
|
|
||||||
$this->close();
|
|
||||||
|
|
||||||
throw $e;
|
if (null === $bytes) {
|
||||||
}
|
throw new NotConnected();
|
||||||
}
|
|
||||||
|
|
||||||
protected function readFrame(): Frame
|
|
||||||
{
|
|
||||||
if ($this->closed) {
|
|
||||||
throw new NotConnected('Connection closed.');
|
|
||||||
}
|
|
||||||
|
|
||||||
$buffer = $this->socket->read();
|
|
||||||
|
|
||||||
$this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL));
|
|
||||||
|
|
||||||
return match ($type = $buffer->consumeType()) {
|
|
||||||
0 => new Response($buffer->flush()),
|
|
||||||
1 => new Error($buffer->flush()),
|
|
||||||
2 => new Message(
|
|
||||||
timestamp: $buffer->consumeTimestamp(),
|
|
||||||
attempts: $buffer->consumeAttempts(),
|
|
||||||
id: $buffer->consumeId(),
|
|
||||||
body: $buffer->flush(),
|
|
||||||
consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'),
|
|
||||||
),
|
|
||||||
default => throw new NsqException('Unexpected frame type: '.$type)
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
protected function checkIsOK(): void
|
|
||||||
{
|
|
||||||
$response = $this->readResponse();
|
|
||||||
|
|
||||||
if ($response->isHeartBeat()) {
|
|
||||||
$this->command('NOP');
|
|
||||||
|
|
||||||
$this->checkIsOK();
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!$response->isOk()) {
|
|
||||||
throw new BadResponse($response);
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->logger->info('Ok checked.');
|
|
||||||
}
|
|
||||||
|
|
||||||
private function readResponse(): Response
|
|
||||||
{
|
|
||||||
$frame = $this->readFrame();
|
|
||||||
|
|
||||||
if ($frame instanceof Response) {
|
|
||||||
return $frame;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($frame instanceof Error) {
|
|
||||||
if ($frame->type->terminateConnection) {
|
|
||||||
$this->close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new NsqError($frame);
|
$buffer = $this->buffer->append($bytes);
|
||||||
}
|
|
||||||
|
|
||||||
throw new NsqException('Unreachable statement.');
|
$frame = match ($type = $buffer->consumeUint32()) {
|
||||||
|
0 => new Response($buffer->flush()),
|
||||||
|
1 => new Error($buffer->flush()),
|
||||||
|
2 => new Message(
|
||||||
|
timestamp: $buffer->consumeInt64(),
|
||||||
|
attempts: $buffer->consumeUint16(),
|
||||||
|
id: $buffer->consume(Bytes::BYTES_ID),
|
||||||
|
body: $buffer->flush(),
|
||||||
|
consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'),
|
||||||
|
),
|
||||||
|
default => throw new NsqException('Unexpected frame type: '.$type)
|
||||||
|
};
|
||||||
|
|
||||||
|
if ($frame instanceof Response && $frame->isHeartBeat()) {
|
||||||
|
yield $this->command('NOP');
|
||||||
|
|
||||||
|
return $this->readFrame();
|
||||||
|
}
|
||||||
|
|
||||||
|
return $frame;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Promise<void>
|
||||||
|
*/
|
||||||
|
protected function checkIsOK(): Promise
|
||||||
|
{
|
||||||
|
return call(function (): \Generator {
|
||||||
|
/** @var Response $response */
|
||||||
|
$response = yield $this->readResponse();
|
||||||
|
|
||||||
|
if (!$response->isOk()) {
|
||||||
|
throw new BadResponse($response);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->logger->debug('Ok checked.');
|
||||||
|
|
||||||
|
return call(static function (): void {});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Promise<Response>
|
||||||
|
*/
|
||||||
|
private function readResponse(): Promise
|
||||||
|
{
|
||||||
|
return call(function (): \Generator {
|
||||||
|
$frame = yield $this->readFrame();
|
||||||
|
|
||||||
|
if ($frame instanceof Error) {
|
||||||
|
if ($frame->type->terminateConnection) {
|
||||||
|
$this->close();
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new NsqError($frame);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!$frame instanceof Response) {
|
||||||
|
throw new NsqException('Unreachable statement.');
|
||||||
|
}
|
||||||
|
|
||||||
|
return $frame;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
145
src/Consumer.php
145
src/Consumer.php
@ -4,108 +4,108 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Nsq;
|
namespace Nsq;
|
||||||
|
|
||||||
use Generator;
|
use Amp\Failure;
|
||||||
use Nsq\Config\ClientConfig;
|
use Amp\Promise;
|
||||||
|
use Amp\Success;
|
||||||
use Nsq\Exception\NsqError;
|
use Nsq\Exception\NsqError;
|
||||||
use Nsq\Exception\NsqException;
|
use Nsq\Exception\NsqException;
|
||||||
use Nsq\Protocol\Error;
|
use Nsq\Protocol\Error;
|
||||||
use Nsq\Protocol\Message;
|
use Nsq\Protocol\Message;
|
||||||
use Nsq\Protocol\Response;
|
use Nsq\Protocol\Response;
|
||||||
use Psr\Log\LoggerInterface;
|
use function Amp\asyncCall;
|
||||||
|
use function Amp\call;
|
||||||
|
|
||||||
final class Consumer extends Connection
|
final class Consumer extends Connection
|
||||||
{
|
{
|
||||||
private int $rdy = 0;
|
private int $rdy = 0;
|
||||||
|
|
||||||
public function __construct(
|
/**
|
||||||
private string $topic,
|
* @return Promise<void>
|
||||||
private string $channel,
|
*/
|
||||||
string $address,
|
public function listen(
|
||||||
ClientConfig $clientConfig = null,
|
string $topic,
|
||||||
LoggerInterface $logger = null
|
string $channel,
|
||||||
) {
|
callable $onMessage,
|
||||||
parent::__construct($address, $clientConfig, $logger);
|
): Promise {
|
||||||
|
return call(function () use ($topic, $channel, $onMessage): \Generator {
|
||||||
|
yield $this->command('SUB', [$topic, $channel]);
|
||||||
|
yield $this->checkIsOK();
|
||||||
|
|
||||||
|
asyncCall(function () use ($onMessage): \Generator {
|
||||||
|
yield $this->rdy(2500);
|
||||||
|
|
||||||
|
while ($message = yield $this->readMessage()) {
|
||||||
|
$command = yield $onMessage($message);
|
||||||
|
|
||||||
|
if (true === $command) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($this->rdy < 1000) {
|
||||||
|
yield $this->rdy(2500);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new Success();
|
||||||
|
});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @psalm-return Generator<int, Message|float|null, int|null, void>
|
* @return Promise<Message>
|
||||||
*/
|
*/
|
||||||
public function generator(): \Generator
|
public function readMessage(): Promise
|
||||||
{
|
{
|
||||||
$this->command('SUB', [$this->topic, $this->channel])->checkIsOK();
|
return call(function (): \Generator {
|
||||||
|
$frame = yield $this->readFrame();
|
||||||
|
|
||||||
while (true) {
|
if ($frame instanceof Message) {
|
||||||
$this->rdy(1);
|
return $frame;
|
||||||
|
|
||||||
$timeout = $this->clientConfig->readTimeout;
|
|
||||||
|
|
||||||
do {
|
|
||||||
$deadline = microtime(true) + $timeout;
|
|
||||||
|
|
||||||
$message = $this->hasMessage($timeout) ? $this->readMessage() : null;
|
|
||||||
|
|
||||||
$timeout = ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime;
|
|
||||||
} while (0 < $timeout && null === $message);
|
|
||||||
|
|
||||||
$command = yield $message;
|
|
||||||
|
|
||||||
if (0 === $command) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->close();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function readMessage(): ?Message
|
|
||||||
{
|
|
||||||
$frame = $this->readFrame();
|
|
||||||
|
|
||||||
if ($frame instanceof Message) {
|
|
||||||
return $frame;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($frame instanceof Response && $frame->isHeartBeat()) {
|
|
||||||
$this->command('NOP');
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($frame instanceof Error) {
|
|
||||||
if ($frame->type->terminateConnection) {
|
|
||||||
$this->close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new NsqError($frame);
|
if ($frame instanceof Error) {
|
||||||
}
|
if ($frame->type->terminateConnection) {
|
||||||
|
yield $this->close();
|
||||||
|
}
|
||||||
|
|
||||||
throw new NsqException('Unreachable statement.');
|
throw new NsqError($frame);
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new NsqException('Unreachable statement.');
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update RDY state (indicate you are ready to receive N messages).
|
* Update RDY state (indicate you are ready to receive N messages).
|
||||||
|
*
|
||||||
|
* @return Promise<void>
|
||||||
*/
|
*/
|
||||||
public function rdy(int $count): void
|
public function rdy(int $count): Promise
|
||||||
{
|
{
|
||||||
if ($this->rdy === $count) {
|
if ($this->rdy === $count) {
|
||||||
return;
|
return call(static function (): void {});
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->command('RDY', (string) $count);
|
|
||||||
|
|
||||||
$this->rdy = $count;
|
$this->rdy = $count;
|
||||||
|
|
||||||
|
return $this->command('RDY', (string) $count);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Finish a message (indicate successful processing).
|
* Finish a message (indicate successful processing).
|
||||||
*
|
*
|
||||||
|
* @return Promise<void>
|
||||||
|
*
|
||||||
* @internal
|
* @internal
|
||||||
*/
|
*/
|
||||||
public function fin(string $id): void
|
public function fin(string $id): Promise
|
||||||
{
|
{
|
||||||
$this->command('FIN', $id);
|
$promise = $this->command('FIN', $id);
|
||||||
|
$promise->onResolve(function (): void {
|
||||||
|
--$this->rdy;
|
||||||
|
});
|
||||||
|
|
||||||
--$this->rdy;
|
return $promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -114,22 +114,29 @@ final class Consumer extends Connection
|
|||||||
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
|
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
|
||||||
* behaves identically to an explicit REQ.
|
* behaves identically to an explicit REQ.
|
||||||
*
|
*
|
||||||
|
* @return Promise<void>
|
||||||
|
*
|
||||||
* @internal
|
* @internal
|
||||||
*/
|
*/
|
||||||
public function req(string $id, int $timeout): void
|
public function req(string $id, int $timeout): Promise
|
||||||
{
|
{
|
||||||
$this->command('REQ', [$id, $timeout]);
|
$promise = $this->command('REQ', [$id, $timeout]);
|
||||||
|
$promise->onResolve(function (): void {
|
||||||
|
--$this->rdy;
|
||||||
|
});
|
||||||
|
|
||||||
--$this->rdy;
|
return $promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reset the timeout for an in-flight message.
|
* Reset the timeout for an in-flight message.
|
||||||
*
|
*
|
||||||
|
* @return Promise<void>
|
||||||
|
*
|
||||||
* @internal
|
* @internal
|
||||||
*/
|
*/
|
||||||
public function touch(string $id): void
|
public function touch(string $id): Promise
|
||||||
{
|
{
|
||||||
$this->command('TOUCH', $id);
|
return $this->command('TOUCH', $id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,9 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq\Exception;
|
|
||||||
|
|
||||||
final class NullReceived extends NsqException
|
|
||||||
{
|
|
||||||
}
|
|
@ -1,78 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq;
|
|
||||||
|
|
||||||
use Nsq\Exception\ConnectionFail;
|
|
||||||
use Nsq\Socket\Socket;
|
|
||||||
use PHPinnacle\Buffer\ByteBuffer;
|
|
||||||
use Throwable;
|
|
||||||
|
|
||||||
final class NsqSocket
|
|
||||||
{
|
|
||||||
private Buffer $input;
|
|
||||||
|
|
||||||
private ByteBuffer $output;
|
|
||||||
|
|
||||||
public function __construct(
|
|
||||||
private Socket $socket,
|
|
||||||
) {
|
|
||||||
$this->input = new Buffer();
|
|
||||||
$this->output = new ByteBuffer();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function write(string $command, string $data = null): void
|
|
||||||
{
|
|
||||||
$this->output->append($command.PHP_EOL);
|
|
||||||
|
|
||||||
if (null !== $data) {
|
|
||||||
$this->output->appendUint32(\strlen($data));
|
|
||||||
$this->output->append($data);
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->socket->write($this->output->flush());
|
|
||||||
}
|
|
||||||
|
|
||||||
public function wait(float $timeout): bool
|
|
||||||
{
|
|
||||||
return $this->socket->selectRead($timeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function read(): Buffer
|
|
||||||
{
|
|
||||||
$buffer = $this->input;
|
|
||||||
|
|
||||||
$size = Bytes::BYTES_SIZE;
|
|
||||||
|
|
||||||
do {
|
|
||||||
$buffer->append(
|
|
||||||
$this->socket->read($size),
|
|
||||||
);
|
|
||||||
|
|
||||||
$size -= $buffer->size();
|
|
||||||
} while ($buffer->size() < Bytes::BYTES_SIZE);
|
|
||||||
|
|
||||||
if ('' === $buffer->bytes()) {
|
|
||||||
throw new ConnectionFail('Probably connection closed.');
|
|
||||||
}
|
|
||||||
|
|
||||||
$size = $buffer->consumeSize();
|
|
||||||
|
|
||||||
do {
|
|
||||||
$buffer->append(
|
|
||||||
$this->socket->read($size - $buffer->size()),
|
|
||||||
);
|
|
||||||
} while ($buffer->size() < $size);
|
|
||||||
|
|
||||||
return $buffer;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function close(): void
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
$this->socket->close();
|
|
||||||
} catch (Throwable) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -4,38 +4,56 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Nsq;
|
namespace Nsq;
|
||||||
|
|
||||||
|
use Amp\Promise;
|
||||||
use PHPinnacle\Buffer\ByteBuffer;
|
use PHPinnacle\Buffer\ByteBuffer;
|
||||||
|
use function Amp\call;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @psalm-suppress PropertyNotSetInConstructor
|
* @psalm-suppress PropertyNotSetInConstructor
|
||||||
*/
|
*/
|
||||||
final class Producer extends Connection
|
final class Producer extends Connection
|
||||||
{
|
{
|
||||||
public function pub(string $topic, string $body): void
|
/**
|
||||||
|
* @return Promise<void>
|
||||||
|
*/
|
||||||
|
public function pub(string $topic, string $body): Promise
|
||||||
{
|
{
|
||||||
$this->command('PUB', $topic, $body)->checkIsOK();
|
return call(function () use ($topic, $body): \Generator {
|
||||||
|
yield $this->command('PUB', $topic, $body);
|
||||||
|
yield $this->checkIsOK();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @psalm-param array<int, mixed> $bodies
|
* @psalm-param array<int, mixed> $bodies
|
||||||
|
*
|
||||||
|
* @return Promise<void>
|
||||||
*/
|
*/
|
||||||
public function mpub(string $topic, array $bodies): void
|
public function mpub(string $topic, array $bodies): Promise
|
||||||
{
|
{
|
||||||
static $buffer;
|
return call(function () use ($topic, $bodies): \Generator {
|
||||||
$buffer ??= new ByteBuffer();
|
$buffer = new ByteBuffer();
|
||||||
|
|
||||||
$buffer->appendUint32(\count($bodies));
|
$buffer->appendUint32(\count($bodies));
|
||||||
|
|
||||||
foreach ($bodies as $body) {
|
foreach ($bodies as $body) {
|
||||||
$buffer->appendUint32(\strlen($body));
|
$buffer->appendUint32(\strlen($body));
|
||||||
$buffer->append($body);
|
$buffer->append($body);
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->command('MPUB', $topic, $buffer->flush())->checkIsOK();
|
yield $this->command('MPUB', $topic, $buffer->flush());
|
||||||
|
yield $this->checkIsOK();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public function dpub(string $topic, string $body, int $delay): void
|
/**
|
||||||
|
* @return Promise<void>
|
||||||
|
*/
|
||||||
|
public function dpub(string $topic, string $body, int $delay): Promise
|
||||||
{
|
{
|
||||||
$this->command('DPUB', [$topic, $delay], $body)->checkIsOK();
|
return call(function () use ($topic, $body, $delay): \Generator {
|
||||||
|
yield $this->command('DPUB', [$topic, $delay], $body);
|
||||||
|
yield $this->checkIsOK();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,8 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Nsq\Protocol;
|
namespace Nsq\Protocol;
|
||||||
|
|
||||||
|
use Amp\Failure;
|
||||||
|
use Amp\Promise;
|
||||||
use Nsq\Bytes;
|
use Nsq\Bytes;
|
||||||
use Nsq\Consumer;
|
use Nsq\Consumer;
|
||||||
use Nsq\Exception\MessageAlreadyFinished;
|
use Nsq\Exception\MessageAlreadyFinished;
|
||||||
@ -57,32 +59,43 @@ final class Message extends Frame
|
|||||||
return $this->finished;
|
return $this->finished;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function finish(): void
|
/**
|
||||||
|
* @return Promise<void>
|
||||||
|
*/
|
||||||
|
public function finish(): Promise
|
||||||
{
|
{
|
||||||
if ($this->finished) {
|
if ($this->finished) {
|
||||||
throw MessageAlreadyFinished::finish($this);
|
return new Failure(MessageAlreadyFinished::finish($this));
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->consumer->fin($this->id);
|
|
||||||
$this->finished = true;
|
$this->finished = true;
|
||||||
|
|
||||||
|
return $this->consumer->fin($this->id);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function requeue(int $timeout): void
|
/**
|
||||||
|
* @return Promise<void>
|
||||||
|
*/
|
||||||
|
public function requeue(int $timeout): Promise
|
||||||
{
|
{
|
||||||
if ($this->finished) {
|
if ($this->finished) {
|
||||||
throw MessageAlreadyFinished::requeue($this);
|
return new Failure(MessageAlreadyFinished::requeue($this));
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->consumer->req($this->id, $timeout);
|
|
||||||
$this->finished = true;
|
$this->finished = true;
|
||||||
|
|
||||||
|
return $this->consumer->req($this->id, $timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function touch(): void
|
/**
|
||||||
|
* @return Promise<void>
|
||||||
|
*/
|
||||||
|
public function touch(): Promise
|
||||||
{
|
{
|
||||||
if ($this->finished) {
|
if ($this->finished) {
|
||||||
throw MessageAlreadyFinished::touch($this);
|
return new Failure(MessageAlreadyFinished::touch($this));
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->consumer->touch($this->id);
|
return $this->consumer->touch($this->id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,62 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq\Reconnect;
|
|
||||||
|
|
||||||
use Nsq\Exception\ConnectionFail;
|
|
||||||
use Psr\Log\LoggerAwareTrait;
|
|
||||||
use Psr\Log\LoggerInterface;
|
|
||||||
use Psr\Log\NullLogger;
|
|
||||||
|
|
||||||
final class ExponentialStrategy implements ReconnectStrategy
|
|
||||||
{
|
|
||||||
use LoggerAwareTrait;
|
|
||||||
|
|
||||||
private int $delay;
|
|
||||||
|
|
||||||
private int $nextTryAfter;
|
|
||||||
|
|
||||||
private int $attempt = 0;
|
|
||||||
|
|
||||||
private TimeProvider $timeProvider;
|
|
||||||
|
|
||||||
public function __construct(
|
|
||||||
private int $minDelay = 8,
|
|
||||||
private int $maxDelay = 32,
|
|
||||||
TimeProvider $timeProvider = null,
|
|
||||||
LoggerInterface $logger = null,
|
|
||||||
) {
|
|
||||||
$this->delay = 0;
|
|
||||||
$this->timeProvider = $timeProvider ?? new RealTimeProvider();
|
|
||||||
$this->nextTryAfter = $this->timeProvider->time();
|
|
||||||
$this->logger = $logger ?? new NullLogger();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
public function connect(callable $callable): void
|
|
||||||
{
|
|
||||||
$currentTime = $this->timeProvider->time();
|
|
||||||
|
|
||||||
if ($currentTime < $this->nextTryAfter) {
|
|
||||||
throw new ConnectionFail('Time to reconnect has not yet come');
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
$callable();
|
|
||||||
} catch (\Throwable $e) {
|
|
||||||
$nextDelay = 0 === $this->delay ? $this->minDelay : $this->delay * 2;
|
|
||||||
$this->delay = $nextDelay > $this->maxDelay ? $this->maxDelay : $nextDelay;
|
|
||||||
$this->nextTryAfter = $currentTime + $this->delay;
|
|
||||||
|
|
||||||
$this->logger->warning('Reconnect #{attempt} after {delay}s', ['attempt' => ++$this->attempt, 'delay' => $this->delay]);
|
|
||||||
|
|
||||||
throw $e;
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->delay = 0;
|
|
||||||
$this->attempt = 0;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,13 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq\Reconnect;
|
|
||||||
|
|
||||||
final class RealTimeProvider implements TimeProvider
|
|
||||||
{
|
|
||||||
public function time(): int
|
|
||||||
{
|
|
||||||
return time();
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,15 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq\Reconnect;
|
|
||||||
|
|
||||||
use Nsq\Exception\ConnectionFail;
|
|
||||||
|
|
||||||
interface ReconnectStrategy
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* @throws ConnectionFail
|
|
||||||
*/
|
|
||||||
public function connect(callable $callable): void;
|
|
||||||
}
|
|
@ -1,10 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq\Reconnect;
|
|
||||||
|
|
||||||
interface TimeProvider
|
|
||||||
{
|
|
||||||
public function time(): int;
|
|
||||||
}
|
|
@ -1,45 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq\Socket;
|
|
||||||
|
|
||||||
final class DeflateSocket implements Socket
|
|
||||||
{
|
|
||||||
public function __construct(
|
|
||||||
private Socket $socket,
|
|
||||||
) {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
public function write(string $data): void
|
|
||||||
{
|
|
||||||
throw new \LogicException('not implemented.');
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
public function read(int $length): string
|
|
||||||
{
|
|
||||||
throw new \LogicException('not implemented.');
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
public function close(): void
|
|
||||||
{
|
|
||||||
throw new \LogicException('not implemented.');
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
public function selectRead(float $timeout): bool
|
|
||||||
{
|
|
||||||
return $this->socket->selectRead($timeout);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,81 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq\Socket;
|
|
||||||
|
|
||||||
use Nsq\Exception\ConnectionFail;
|
|
||||||
use Psr\Log\LoggerInterface;
|
|
||||||
use Psr\Log\NullLogger;
|
|
||||||
use Socket\Raw\Exception;
|
|
||||||
use Socket\Raw\Factory;
|
|
||||||
use Socket\Raw\Socket as ClueSocket;
|
|
||||||
use Throwable;
|
|
||||||
|
|
||||||
final class RawSocket implements Socket
|
|
||||||
{
|
|
||||||
private ClueSocket $socket;
|
|
||||||
|
|
||||||
private LoggerInterface $logger;
|
|
||||||
|
|
||||||
public function __construct(string $address, LoggerInterface $logger = null)
|
|
||||||
{
|
|
||||||
$this->socket = (new Factory())->createClient($address);
|
|
||||||
$this->logger = $logger ?? new NullLogger();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
public function selectRead(float $timeout): bool
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
return false !== $this->socket->selectRead($timeout);
|
|
||||||
} // @codeCoverageIgnoreStart
|
|
||||||
catch (Exception $e) {
|
|
||||||
throw ConnectionFail::fromThrowable($e);
|
|
||||||
}
|
|
||||||
// @codeCoverageIgnoreEnd
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
public function close(): void
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
$this->socket->close();
|
|
||||||
} catch (Throwable) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
public function write(string $data): void
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
$this->socket->write($data);
|
|
||||||
} // @codeCoverageIgnoreStart
|
|
||||||
catch (Exception $e) {
|
|
||||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
|
||||||
|
|
||||||
throw ConnectionFail::fromThrowable($e);
|
|
||||||
}
|
|
||||||
// @codeCoverageIgnoreEnd
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
public function read(int $length): string
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
return $this->socket->read($length);
|
|
||||||
} // @codeCoverageIgnoreStart
|
|
||||||
catch (Exception $e) {
|
|
||||||
throw ConnectionFail::fromThrowable($e);
|
|
||||||
}
|
|
||||||
// @codeCoverageIgnoreEnd
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,162 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq\Socket;
|
|
||||||
|
|
||||||
use PHPinnacle\Buffer\ByteBuffer;
|
|
||||||
use Psr\Log\LoggerInterface;
|
|
||||||
|
|
||||||
final class SnappySocket implements Socket
|
|
||||||
{
|
|
||||||
private ByteBuffer $output;
|
|
||||||
|
|
||||||
private ByteBuffer $input;
|
|
||||||
|
|
||||||
public function __construct(
|
|
||||||
private Socket $socket,
|
|
||||||
private LoggerInterface $logger,
|
|
||||||
) {
|
|
||||||
if (
|
|
||||||
!\function_exists('snappy_compress')
|
|
||||||
|| !\function_exists('snappy_uncompress')
|
|
||||||
|| !\extension_loaded('snappy')
|
|
||||||
) {
|
|
||||||
throw new \LogicException('Snappy extension not installed.');
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->output = new ByteBuffer();
|
|
||||||
$this->input = new ByteBuffer();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
public function write(string $data): void
|
|
||||||
{
|
|
||||||
$identifierFrame = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
|
|
||||||
$compressedFrame = 0x00;
|
|
||||||
$uncompressedFrame = 0x01; // 11
|
|
||||||
$maxChunkLength = 65536;
|
|
||||||
|
|
||||||
$byteBuffer = new ByteBuffer();
|
|
||||||
foreach ($identifierFrame as $bite) {
|
|
||||||
$byteBuffer->appendUint8($bite);
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach (str_split($data, $maxChunkLength) as $chunk) {
|
|
||||||
$compressedChunk = snappy_compress($chunk);
|
|
||||||
|
|
||||||
[$chunk, $chunkType] = \strlen($compressedChunk) <= 0.875 * \strlen($data)
|
|
||||||
? [$compressedChunk, $compressedFrame]
|
|
||||||
: [$data, $uncompressedFrame];
|
|
||||||
|
|
||||||
/** @var string $checksum */
|
|
||||||
$checksum = hash('crc32c', $data, true);
|
|
||||||
/** @phpstan-ignore-next-line */
|
|
||||||
$checksum = unpack('N', $checksum)[1];
|
|
||||||
$maskedChecksum = (($checksum >> 15) | ($checksum << 17)) + 0xa282ead8 & 0xffffffff;
|
|
||||||
|
|
||||||
$size = (\strlen($chunk) + 4) << 8;
|
|
||||||
|
|
||||||
$byteBuffer->append(pack('V', $chunkType + $size));
|
|
||||||
$byteBuffer->append(pack('V', $maskedChecksum));
|
|
||||||
$byteBuffer->append($chunk);
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->socket->write($byteBuffer->flush());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
public function read(int $length): string
|
|
||||||
{
|
|
||||||
$output = $this->output;
|
|
||||||
$input = $this->input;
|
|
||||||
|
|
||||||
$this->logger->debug('Snappy requested {length} bytes.', ['length' => $length]);
|
|
||||||
|
|
||||||
while ($output->size() < $length) {
|
|
||||||
$this->logger->debug('Snappy enter loop');
|
|
||||||
|
|
||||||
/** @phpstan-ignore-next-line */
|
|
||||||
$chunkType = unpack('V', $this->socket->read(4))[1];
|
|
||||||
|
|
||||||
$size = $chunkType >> 8;
|
|
||||||
$chunkType &= 0xff;
|
|
||||||
|
|
||||||
$this->logger->debug('Snappy receive chunk [{chunk}], size [{size}]', [
|
|
||||||
'chunk' => $chunkType,
|
|
||||||
'size' => $size,
|
|
||||||
]);
|
|
||||||
|
|
||||||
do {
|
|
||||||
$input->append(
|
|
||||||
$this->socket->read($size),
|
|
||||||
);
|
|
||||||
|
|
||||||
$size -= $input->size();
|
|
||||||
} while ($input->size() < $size);
|
|
||||||
|
|
||||||
switch ($chunkType) {
|
|
||||||
case 0xff:
|
|
||||||
$this->logger->debug('Snappy identifier chunk');
|
|
||||||
|
|
||||||
$input->discard(6); // discard identifier body
|
|
||||||
|
|
||||||
break;
|
|
||||||
case 0x00: // 'compressed',
|
|
||||||
$this->logger->debug('Snappy compressed chunk');
|
|
||||||
|
|
||||||
$data = $input
|
|
||||||
->discard(4) // discard checksum
|
|
||||||
->flush()
|
|
||||||
;
|
|
||||||
|
|
||||||
$this->logger->debug('Snappy compressed data [{data}]', ['data' => $data]);
|
|
||||||
|
|
||||||
$output->append(snappy_uncompress($data));
|
|
||||||
|
|
||||||
break;
|
|
||||||
case 0x01: // 'uncompressed',
|
|
||||||
$this->logger->debug('Snappy uncompressed chunk');
|
|
||||||
|
|
||||||
$data = $input
|
|
||||||
->discard(4) // discard checksum
|
|
||||||
->flush()
|
|
||||||
;
|
|
||||||
|
|
||||||
$this->logger->debug('Snappy uncompressed data [{data}]', ['data' => $data]);
|
|
||||||
|
|
||||||
$output->append($data);
|
|
||||||
|
|
||||||
break;
|
|
||||||
case 0xfe:// 'padding',
|
|
||||||
$this->logger->debug('Snappy padding chunk');
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->logger->debug('Snappy return message [{message}]', ['message' => $output->read($length)]);
|
|
||||||
|
|
||||||
return $output->consume($length);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
public function close(): void
|
|
||||||
{
|
|
||||||
$this->socket->close();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
public function selectRead(float $timeout): bool
|
|
||||||
{
|
|
||||||
return !$this->input->empty() || $this->socket->selectRead($timeout);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,27 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq\Socket;
|
|
||||||
|
|
||||||
use Nsq\Exception\ConnectionFail;
|
|
||||||
|
|
||||||
interface Socket
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* @throws ConnectionFail
|
|
||||||
*/
|
|
||||||
public function write(string $data): void;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @throws ConnectionFail
|
|
||||||
*/
|
|
||||||
public function read(int $length): string;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @throws ConnectionFail
|
|
||||||
*/
|
|
||||||
public function selectRead(float $timeout): bool;
|
|
||||||
|
|
||||||
public function close(): void;
|
|
||||||
}
|
|
57
src/Stream/NsqInputStream.php
Normal file
57
src/Stream/NsqInputStream.php
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Stream;
|
||||||
|
|
||||||
|
use Amp\ByteStream\InputStream;
|
||||||
|
use Amp\Promise;
|
||||||
|
use Nsq\Bytes;
|
||||||
|
use Nsq\Exception\NotConnected;
|
||||||
|
use PHPinnacle\Buffer\ByteBuffer;
|
||||||
|
use function Amp\call;
|
||||||
|
|
||||||
|
final class NsqInputStream implements InputStream
|
||||||
|
{
|
||||||
|
private ByteBuffer $buffer;
|
||||||
|
|
||||||
|
public function __construct(
|
||||||
|
private InputStream $inputStream,
|
||||||
|
) {
|
||||||
|
$this->buffer = new ByteBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public function read(): Promise
|
||||||
|
{
|
||||||
|
return call(function (): \Generator {
|
||||||
|
$buffer = $this->buffer;
|
||||||
|
|
||||||
|
while ($buffer->size() < Bytes::BYTES_SIZE) {
|
||||||
|
$bytes = yield $this->inputStream->read();
|
||||||
|
|
||||||
|
if (null === $bytes) {
|
||||||
|
throw new NotConnected();
|
||||||
|
}
|
||||||
|
|
||||||
|
$buffer->append($bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
$size = $buffer->consumeUint32();
|
||||||
|
|
||||||
|
while ($buffer->size() < $size) {
|
||||||
|
$bytes = yield $this->inputStream->read();
|
||||||
|
|
||||||
|
if (null === $bytes) {
|
||||||
|
throw new NotConnected();
|
||||||
|
}
|
||||||
|
|
||||||
|
$buffer->append($bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $buffer->consume($size);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
42
src/Stream/NullStream.php
Normal file
42
src/Stream/NullStream.php
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Stream;
|
||||||
|
|
||||||
|
use Amp\ByteStream\InputStream;
|
||||||
|
use Amp\ByteStream\OutputStream;
|
||||||
|
use Amp\Failure;
|
||||||
|
use Amp\Promise;
|
||||||
|
use Nsq\Exception\NotConnected;
|
||||||
|
|
||||||
|
final class NullStream implements InputStream, OutputStream
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public function read(): Promise
|
||||||
|
{
|
||||||
|
return new Failure(new NotConnected());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*
|
||||||
|
* @return Promise<void>
|
||||||
|
*/
|
||||||
|
public function write(string $data): Promise
|
||||||
|
{
|
||||||
|
return new Failure(new NotConnected());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*
|
||||||
|
* @return Promise<void>
|
||||||
|
*/
|
||||||
|
public function end(string $finalData = ''): Promise
|
||||||
|
{
|
||||||
|
return new Failure(new NotConnected());
|
||||||
|
}
|
||||||
|
}
|
106
src/Stream/SnappyInputStream.php
Normal file
106
src/Stream/SnappyInputStream.php
Normal file
@ -0,0 +1,106 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Stream;
|
||||||
|
|
||||||
|
use Amp\ByteStream\InputStream;
|
||||||
|
use Amp\Promise;
|
||||||
|
use Nsq\Exception\NotConnected;
|
||||||
|
use PHPinnacle\Buffer\ByteBuffer;
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use function Amp\call;
|
||||||
|
|
||||||
|
final class SnappyInputStream implements InputStream
|
||||||
|
{
|
||||||
|
private ByteBuffer $buffer;
|
||||||
|
|
||||||
|
public function __construct(
|
||||||
|
private InputStream $inputStream,
|
||||||
|
private LoggerInterface $logger,
|
||||||
|
) {
|
||||||
|
if (!\function_exists('snappy_uncompress')) {
|
||||||
|
throw new \LogicException('Snappy extension not installed.');
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->buffer = new ByteBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public function read(): Promise
|
||||||
|
{
|
||||||
|
return call(function (): \Generator {
|
||||||
|
$buffer = $this->buffer;
|
||||||
|
|
||||||
|
while ($buffer->size() < 4) {
|
||||||
|
$bytes = yield $this->inputStream->read();
|
||||||
|
|
||||||
|
if (null === $bytes) {
|
||||||
|
throw new NotConnected();
|
||||||
|
}
|
||||||
|
|
||||||
|
$buffer->append($bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @phpstan-ignore-next-line */
|
||||||
|
$chunkType = unpack('V', $buffer->consume(4))[1];
|
||||||
|
|
||||||
|
$size = $chunkType >> 8;
|
||||||
|
$chunkType &= 0xff;
|
||||||
|
|
||||||
|
$this->logger->debug('Snappy receive chunk [{chunk}], size [{size}]', [
|
||||||
|
'chunk' => $chunkType,
|
||||||
|
'size' => $size,
|
||||||
|
]);
|
||||||
|
|
||||||
|
while ($buffer->size() < $size) {
|
||||||
|
$bytes = yield $this->inputStream->read();
|
||||||
|
|
||||||
|
if (null === $bytes) {
|
||||||
|
throw new NotConnected();
|
||||||
|
}
|
||||||
|
|
||||||
|
$buffer->append($bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
switch ($chunkType) {
|
||||||
|
case 0xff:
|
||||||
|
$this->logger->debug('Snappy identifier chunk');
|
||||||
|
|
||||||
|
$buffer->discard(6); // discard identifier body
|
||||||
|
|
||||||
|
break;
|
||||||
|
case 0x00: // 'compressed',
|
||||||
|
$this->logger->debug('Snappy compressed chunk');
|
||||||
|
|
||||||
|
$data = $buffer
|
||||||
|
->discard(4) // discard checksum
|
||||||
|
->consume($size)
|
||||||
|
;
|
||||||
|
|
||||||
|
$this->logger->debug('Snappy compressed data [{data}]', ['data' => $data]);
|
||||||
|
|
||||||
|
return snappy_uncompress($data);
|
||||||
|
case 0x01: // 'uncompressed',
|
||||||
|
$this->logger->debug('Snappy uncompressed chunk');
|
||||||
|
|
||||||
|
$data = $buffer
|
||||||
|
->discard(4) // discard checksum
|
||||||
|
->consume($size)
|
||||||
|
;
|
||||||
|
|
||||||
|
$this->logger->debug('Snappy uncompressed data [{data}]', ['data' => $data]);
|
||||||
|
|
||||||
|
return $data;
|
||||||
|
case 0xfe:// 'padding',
|
||||||
|
$this->logger->debug('Snappy padding chunk');
|
||||||
|
|
||||||
|
$buffer->discard($size); // TODO ?
|
||||||
|
}
|
||||||
|
|
||||||
|
return $this->read();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
74
src/Stream/SnappyOutputStream.php
Normal file
74
src/Stream/SnappyOutputStream.php
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace Nsq\Stream;
|
||||||
|
|
||||||
|
use Amp\ByteStream\OutputStream;
|
||||||
|
use Amp\Promise;
|
||||||
|
use PHPinnacle\Buffer\ByteBuffer;
|
||||||
|
|
||||||
|
final class SnappyOutputStream implements OutputStream
|
||||||
|
{
|
||||||
|
private ByteBuffer $buffer;
|
||||||
|
|
||||||
|
public function __construct(
|
||||||
|
private OutputStream $outputStream,
|
||||||
|
) {
|
||||||
|
if (!\function_exists('snappy_compress')) {
|
||||||
|
throw new \LogicException('Snappy extension not installed.');
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->buffer = new ByteBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*
|
||||||
|
* @return Promise<void>
|
||||||
|
*/
|
||||||
|
public function write(string $data): Promise
|
||||||
|
{
|
||||||
|
$identifierFrame = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
|
||||||
|
$compressedFrame = 0x00;
|
||||||
|
$uncompressedFrame = 0x01; // 11
|
||||||
|
$maxChunkLength = 65536;
|
||||||
|
|
||||||
|
$buffer = $this->buffer;
|
||||||
|
foreach ($identifierFrame as $bite) {
|
||||||
|
$buffer->appendUint8($bite);
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (str_split($data, $maxChunkLength) as $chunk) {
|
||||||
|
$compressedChunk = snappy_compress($chunk);
|
||||||
|
|
||||||
|
[$chunk, $chunkType] = \strlen($compressedChunk) <= 0.875 * \strlen($data)
|
||||||
|
? [$compressedChunk, $compressedFrame]
|
||||||
|
: [$data, $uncompressedFrame];
|
||||||
|
|
||||||
|
/** @var string $checksum */
|
||||||
|
$checksum = hash('crc32c', $data, true);
|
||||||
|
/** @phpstan-ignore-next-line */
|
||||||
|
$checksum = unpack('N', $checksum)[1];
|
||||||
|
$maskedChecksum = (($checksum >> 15) | ($checksum << 17)) + 0xa282ead8 & 0xffffffff;
|
||||||
|
|
||||||
|
$size = (\strlen($chunk) + 4) << 8;
|
||||||
|
|
||||||
|
$buffer->append(pack('V', $chunkType + $size));
|
||||||
|
$buffer->append(pack('V', $maskedChecksum));
|
||||||
|
$buffer->append($chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $this->outputStream->write($buffer->flush());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*
|
||||||
|
* @return Promise<void>
|
||||||
|
*/
|
||||||
|
public function end(string $finalData = ''): Promise
|
||||||
|
{
|
||||||
|
return $this->outputStream->end($finalData);
|
||||||
|
}
|
||||||
|
}
|
@ -1,90 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
use Nsq\Exception\ConnectionFail;
|
|
||||||
use Nsq\Reconnect\ExponentialStrategy;
|
|
||||||
use Nsq\Reconnect\TimeProvider;
|
|
||||||
use PHPUnit\Framework\TestCase;
|
|
||||||
|
|
||||||
final class ExponentialStrategyTest extends TestCase
|
|
||||||
{
|
|
||||||
public function testTimeNotYetCome(): void
|
|
||||||
{
|
|
||||||
$timeProvider = new FakeTimeProvider();
|
|
||||||
$strategy = new ExponentialStrategy(
|
|
||||||
minDelay: 8,
|
|
||||||
maxDelay: 32,
|
|
||||||
timeProvider: $timeProvider,
|
|
||||||
);
|
|
||||||
|
|
||||||
$successConnect = static function (int $time = null) use ($strategy, $timeProvider): void {
|
|
||||||
$timeProvider($time);
|
|
||||||
|
|
||||||
$strategy->connect(static function (): void {
|
|
||||||
});
|
|
||||||
};
|
|
||||||
$failConnect = static function (int $time = null) use ($strategy, $timeProvider): void {
|
|
||||||
$timeProvider($time);
|
|
||||||
|
|
||||||
try {
|
|
||||||
$strategy->connect(function (): void {
|
|
||||||
throw new ConnectionFail('Time come but failed');
|
|
||||||
});
|
|
||||||
} catch (ConnectionFail $e) {
|
|
||||||
self::assertSame('Time come but failed', $e->getMessage());
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
self::fail('Expecting exception with message "Time come but failed"');
|
|
||||||
};
|
|
||||||
$timeNotCome = static function (int $time = null) use ($strategy, $timeProvider): void {
|
|
||||||
$timeProvider($time);
|
|
||||||
|
|
||||||
try {
|
|
||||||
$strategy->connect(function (): void {
|
|
||||||
throw new ConnectionFail('');
|
|
||||||
});
|
|
||||||
} catch (ConnectionFail $e) {
|
|
||||||
self::assertSame('Time to reconnect has not yet come', $e->getMessage());
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
self::fail('Was expecting exception with message "Time to reconnect has not yet come"');
|
|
||||||
};
|
|
||||||
|
|
||||||
$failConnect(0);
|
|
||||||
$timeNotCome(7);
|
|
||||||
$failConnect(8);
|
|
||||||
$timeNotCome(22);
|
|
||||||
$timeNotCome(13);
|
|
||||||
$failConnect(24);
|
|
||||||
$successConnect(56);
|
|
||||||
$failConnect();
|
|
||||||
$timeNotCome();
|
|
||||||
$timeNotCome(63);
|
|
||||||
$failConnect(64);
|
|
||||||
|
|
||||||
$this->expectException(ConnectionFail::class);
|
|
||||||
$this->expectExceptionMessage('Time to reconnect has not yet come');
|
|
||||||
|
|
||||||
$successConnect();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class FakeTimeProvider implements TimeProvider
|
|
||||||
{
|
|
||||||
public int $time = 0;
|
|
||||||
|
|
||||||
public function time(): int
|
|
||||||
{
|
|
||||||
return $this->time;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function __invoke(int $time = null): void
|
|
||||||
{
|
|
||||||
$this->time = $time ?? $this->time;
|
|
||||||
}
|
|
||||||
}
|
|
@ -2,10 +2,12 @@
|
|||||||
|
|
||||||
declare(strict_types=1);
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
use Amp\Success;
|
||||||
use Nsq\Consumer;
|
use Nsq\Consumer;
|
||||||
use Nsq\Exception\MessageAlreadyFinished;
|
use Nsq\Exception\MessageAlreadyFinished;
|
||||||
use Nsq\Protocol\Message;
|
use Nsq\Protocol\Message;
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use function Amp\Promise\wait;
|
||||||
|
|
||||||
final class MessageTest extends TestCase
|
final class MessageTest extends TestCase
|
||||||
{
|
{
|
||||||
@ -16,14 +18,14 @@ final class MessageTest extends TestCase
|
|||||||
{
|
{
|
||||||
self::assertFalse($message->isFinished());
|
self::assertFalse($message->isFinished());
|
||||||
|
|
||||||
$message->finish();
|
wait($message->finish());
|
||||||
|
|
||||||
self::assertTrue($message->isFinished());
|
self::assertTrue($message->isFinished());
|
||||||
|
|
||||||
$this->expectException(MessageAlreadyFinished::class);
|
$this->expectException(MessageAlreadyFinished::class);
|
||||||
$this->expectExceptionMessage('Can\'t finish message as it already finished.');
|
$this->expectExceptionMessage('Can\'t finish message as it already finished.');
|
||||||
|
|
||||||
$message->finish();
|
wait($message->finish());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -33,14 +35,14 @@ final class MessageTest extends TestCase
|
|||||||
{
|
{
|
||||||
self::assertFalse($message->isFinished());
|
self::assertFalse($message->isFinished());
|
||||||
|
|
||||||
$message->requeue(1);
|
wait($message->requeue(1));
|
||||||
|
|
||||||
self::assertTrue($message->isFinished());
|
self::assertTrue($message->isFinished());
|
||||||
|
|
||||||
$this->expectException(MessageAlreadyFinished::class);
|
$this->expectException(MessageAlreadyFinished::class);
|
||||||
$this->expectExceptionMessage('Can\'t requeue message as it already finished.');
|
$this->expectExceptionMessage('Can\'t requeue message as it already finished.');
|
||||||
|
|
||||||
$message->requeue(5);
|
wait($message->requeue(5));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -50,12 +52,12 @@ final class MessageTest extends TestCase
|
|||||||
{
|
{
|
||||||
self::assertFalse($message->isFinished());
|
self::assertFalse($message->isFinished());
|
||||||
|
|
||||||
$message->finish();
|
wait($message->finish());
|
||||||
|
|
||||||
$this->expectException(MessageAlreadyFinished::class);
|
$this->expectException(MessageAlreadyFinished::class);
|
||||||
$this->expectExceptionMessage('Can\'t touch message as it already finished.');
|
$this->expectExceptionMessage('Can\'t touch message as it already finished.');
|
||||||
|
|
||||||
$message->touch();
|
wait($message->touch());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -63,6 +65,11 @@ final class MessageTest extends TestCase
|
|||||||
*/
|
*/
|
||||||
public function messages(): Generator
|
public function messages(): Generator
|
||||||
{
|
{
|
||||||
yield [new Message(0, 0, 'id', 'body', $this->createStub(Consumer::class))];
|
$consumer = $this->createMock(Consumer::class);
|
||||||
|
$consumer->method('fin')->willReturn(new Success());
|
||||||
|
$consumer->method('touch')->willReturn(new Success());
|
||||||
|
$consumer->method('req')->willReturn(new Success());
|
||||||
|
|
||||||
|
yield [new Message(0, 0, 'id', 'body', $consumer)];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,10 +3,6 @@
|
|||||||
declare(strict_types=1);
|
declare(strict_types=1);
|
||||||
|
|
||||||
use Nsq\Config\ClientConfig;
|
use Nsq\Config\ClientConfig;
|
||||||
use Nsq\Consumer;
|
|
||||||
use Nsq\Producer;
|
|
||||||
use Nsq\Protocol\Message;
|
|
||||||
use Nyholm\NSA;
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
|
||||||
final class NsqTest extends TestCase
|
final class NsqTest extends TestCase
|
||||||
@ -16,78 +12,7 @@ final class NsqTest extends TestCase
|
|||||||
*/
|
*/
|
||||||
public function test(ClientConfig $clientConfig): void
|
public function test(ClientConfig $clientConfig): void
|
||||||
{
|
{
|
||||||
$producer = new Producer('tcp://localhost:4150');
|
self::markTestSkipped('');
|
||||||
$producer->pub(__FUNCTION__, __FUNCTION__);
|
|
||||||
|
|
||||||
$consumer = new Consumer(
|
|
||||||
topic: 'test',
|
|
||||||
channel: 'test',
|
|
||||||
address: 'tcp://localhost:4150',
|
|
||||||
clientConfig: $clientConfig,
|
|
||||||
);
|
|
||||||
$generator = $consumer->generator();
|
|
||||||
|
|
||||||
/** @var null|Message $message */
|
|
||||||
$message = $generator->current();
|
|
||||||
|
|
||||||
self::assertInstanceOf(Message::class, $message);
|
|
||||||
self::assertSame(__FUNCTION__, $message->body);
|
|
||||||
$message->finish();
|
|
||||||
|
|
||||||
$generator->next();
|
|
||||||
self::assertNull($generator->current());
|
|
||||||
|
|
||||||
$producer->mpub(__FUNCTION__, [
|
|
||||||
'First mpub message.',
|
|
||||||
'Second mpub message.',
|
|
||||||
]);
|
|
||||||
|
|
||||||
$generator->next();
|
|
||||||
/** @var null|Message $message */
|
|
||||||
$message = $generator->current();
|
|
||||||
self::assertInstanceOf(Message::class, $message);
|
|
||||||
self::assertSame('First mpub message.', $message->body);
|
|
||||||
$message->finish();
|
|
||||||
|
|
||||||
$generator->next();
|
|
||||||
/** @var null|Message $message */
|
|
||||||
$message = $generator->current();
|
|
||||||
self::assertInstanceOf(Message::class, $message);
|
|
||||||
self::assertSame('Second mpub message.', $message->body);
|
|
||||||
$message->requeue(0);
|
|
||||||
|
|
||||||
$generator->next();
|
|
||||||
/** @var null|Message $message */
|
|
||||||
$message = $generator->current();
|
|
||||||
self::assertInstanceOf(Message::class, $message);
|
|
||||||
self::assertSame('Second mpub message.', $message->body);
|
|
||||||
$message->finish();
|
|
||||||
|
|
||||||
$producer->dpub(__FUNCTION__, 'Deferred message.', 2000);
|
|
||||||
|
|
||||||
$generator->next();
|
|
||||||
/** @var null|Message $message */
|
|
||||||
$message = $generator->current();
|
|
||||||
self::assertNull($message);
|
|
||||||
|
|
||||||
NSA::setProperty(
|
|
||||||
NSA::getProperty($consumer, 'clientConfig'),
|
|
||||||
'readTimeout',
|
|
||||||
10,
|
|
||||||
);
|
|
||||||
|
|
||||||
$generator->next();
|
|
||||||
|
|
||||||
/** @var null|Message $message */
|
|
||||||
$message = $generator->current();
|
|
||||||
self::assertInstanceOf(Message::class, $message);
|
|
||||||
self::assertSame('Deferred message.', $message->body);
|
|
||||||
$message->touch();
|
|
||||||
$message->finish();
|
|
||||||
|
|
||||||
self::assertFalse($consumer->isClosed());
|
|
||||||
$generator->send(0);
|
|
||||||
self::assertTrue($consumer->isClosed());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -5,6 +5,7 @@ declare(strict_types=1);
|
|||||||
use Nsq\Exception\NsqError;
|
use Nsq\Exception\NsqError;
|
||||||
use Nsq\Producer;
|
use Nsq\Producer;
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use function Amp\Promise\wait;
|
||||||
|
|
||||||
final class ProducerTest extends TestCase
|
final class ProducerTest extends TestCase
|
||||||
{
|
{
|
||||||
@ -17,7 +18,9 @@ final class ProducerTest extends TestCase
|
|||||||
$this->expectExceptionMessage($exceptionMessage);
|
$this->expectExceptionMessage($exceptionMessage);
|
||||||
|
|
||||||
$producer = new Producer('tcp://localhost:4150');
|
$producer = new Producer('tcp://localhost:4150');
|
||||||
$producer->pub($topic, $body);
|
|
||||||
|
wait($producer->connect());
|
||||||
|
wait($producer->pub($topic, $body));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Reference in New Issue
Block a user