3 Commits

Author SHA1 Message Date
f5ab37c579 rename test data 2021-03-10 23:33:15 +03:00
809f967fb1 remove NsqTest.php 2021-03-10 23:33:09 +03:00
1b5e1ffb95 tests 2021-03-10 23:24:13 +03:00
40 changed files with 366 additions and 1320 deletions

1
.env
View File

@ -1 +0,0 @@
NSQ_VERSION=1.2.1

View File

@ -14,7 +14,7 @@ jobs:
os: os:
- ubuntu-latest - ubuntu-latest
php: php:
- '8.1' - '8.0'
dependencies: dependencies:
- lowest - lowest
- highest - highest
@ -59,6 +59,9 @@ jobs:
run: composer update --no-progress --no-interaction --prefer-dist --prefer-lowest run: composer update --no-progress --no-interaction --prefer-dist --prefer-lowest
if: ${{ matrix.dependencies == 'lowest' }} if: ${{ matrix.dependencies == 'lowest' }}
- name: Install nsq bin
run: curl -L https://github.com/nsqio/nsq/releases/download/v1.2.0/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz | tar xz --strip 1
- name: Run tests - name: Run tests
run: vendor/bin/phpunit --coverage-clover=build/coverage-report.xml run: vendor/bin/phpunit --coverage-clover=build/coverage-report.xml
@ -77,7 +80,7 @@ jobs:
- name: Setup PHP - name: Setup PHP
uses: shivammathur/setup-php@v2 uses: shivammathur/setup-php@v2
with: with:
php-version: '8.1' php-version: '8.0'
env: env:
update: true update: true
@ -108,7 +111,7 @@ jobs:
- name: Setup PHP - name: Setup PHP
uses: shivammathur/setup-php@v2 uses: shivammathur/setup-php@v2
with: with:
php-version: '8.1' php-version: '8.0'
extensions: snappy-kjdev/php-ext-snappy@0.2.1 extensions: snappy-kjdev/php-ext-snappy@0.2.1
env: env:
update: true update: true
@ -140,7 +143,7 @@ jobs:
- name: Setup PHP - name: Setup PHP
uses: shivammathur/setup-php@v2 uses: shivammathur/setup-php@v2
with: with:
php-version: '8.1' php-version: '8.0'
extensions: snappy-kjdev/php-ext-snappy@0.2.1 extensions: snappy-kjdev/php-ext-snappy@0.2.1
env: env:
update: true update: true
@ -161,3 +164,48 @@ jobs:
- name: Run script - name: Run script
run: vendor/bin/psalm --output-format=github run: vendor/bin/psalm --output-format=github
infection:
name: Infection
runs-on: ubuntu-latest
services:
nsqd:
image: nsqio/nsq:v1.2.0
options: --entrypoint /nsqd
ports:
- 4150:4150
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.0'
coverage: pcov
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Install nsq bin
run: curl -L https://github.com/nsqio/nsq/releases/download/v1.2.0/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz | tar xz --strip 1
- name: Run script
env:
STRYKER_DASHBOARD_API_KEY: ${{ secrets.STRYKER_DASHBOARD_API_KEY }}
run: |
git fetch --depth=1 origin $GITHUB_BASE_REF
php vendor/bin/infection -j2 --git-diff-filter=A --git-diff-base=origin/$GITHUB_BASE_REF --logger-github --ignore-msi-with-no-mutations --only-covered

13
.gitignore vendored
View File

@ -1,6 +1,17 @@
/vendor/ /vendor/
/composer.lock /composer.lock
/.php-cs-fixer.cache /.php_cs.cache
/.phpunit.result.cache /.phpunit.result.cache
/infection.log /infection.log
# Nsq
bin/nsq_stat
bin/nsq_tail
bin/nsq_to_file
bin/nsq_to_http
bin/nsq_to_nsq
bin/nsqadmin
bin/nsqd
bin/nsqlookupd
bin/to_nsq

View File

@ -1,45 +0,0 @@
<?php
$finder = PhpCsFixer\Finder::create()
->in(__DIR__)
->exclude('vendor');
return (new PhpCsFixer\Config())
->setRiskyAllowed(true)
->setRules([
'@PhpCsFixer' => true,
'@PhpCsFixer:risky' => true,
'@PSR12' => true,
'@PSR12:risky' => true,
'blank_line_before_statement' => [
'statements' => [
'continue',
'do',
'exit',
'goto',
'if',
'return',
'switch',
'throw',
'try',
],
],
'declare_strict_types' => true,
'global_namespace_import' => [
'import_classes' => false,
'import_constants' => false,
'import_functions' => false,
],
'php_unit_internal_class' => false,
'php_unit_test_case_static_method_calls' => ['call_type' => 'self'],
'php_unit_test_class_requires_covers' => false,
'phpdoc_to_comment' => false,
'yoda_style' => true,
'trailing_comma_in_multiline' => [
'after_heredoc' => true,
'elements' => ['arrays', 'arguments', 'parameters'],
],
'types_spaces' => ['space' => 'single'],
])
->setFinder($finder)
->setCacheFile(__DIR__.'/.php-cs-fixer.cache');

28
.php_cs.dist Normal file
View File

@ -0,0 +1,28 @@
#!/usr/bin/env php
<?php
return (new PhpCsFixer\Config())
->setRiskyAllowed(true)
->setRules([
'@PhpCsFixer' => true,
'@PhpCsFixer:risky' => true,
'@PSR12' => true,
'@PSR12:risky' => true,
'braces' => [
'allow_single_line_closure' => true,
],
'blank_line_before_statement' => [
'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try'],
],
'declare_strict_types' => true,
'global_namespace_import' => ['import_classes' => false, 'import_constants' => false, 'import_functions' => false],
'php_unit_internal_class' => false,
'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'],
'php_unit_test_class_requires_covers' => false,
'phpdoc_to_comment' => false,
'yoda_style' => true,
])
->setFinder(
PhpCsFixer\Finder::create()
->in(__DIR__)
);

View File

@ -1,20 +0,0 @@
all: install composer-validate php-cs-fixer psalm phpstan phpunit
install:
composer install
psalm:
php vendor/bin/psalm
phpstan:
php vendor/bin/phpstan analyse
phpunit:
php vendor/bin/phpunit
php-cs-fixer:
php vendor/bin/php-cs-fixer fix
composer-validate:
composer validate

View File

@ -31,10 +31,10 @@ Features
- [x] PUB - [x] PUB
- [x] SUB - [x] SUB
- [X] Feature Negotiation - [X] Feature Negotiation
- [X] Discovery - [ ] Discovery
- [ ] Backoff - [ ] Backoff
- [X] TLS - [ ] TLS
- [X] Deflate - [ ] Deflate
- [X] Snappy - [X] Snappy
- [X] Sampling - [X] Sampling
- [X] AUTH - [X] AUTH
@ -80,28 +80,6 @@ $consumer = Consumer::create(
); );
``` ```
### Lookup
```php
use Nsq\Lookup;
use Nsq\Message;
$lookup = new Lookup('http://nsqlookupd0:4161');
$lookup = new Lookup(['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161']);
$callable = static function (Message $message): Generator {
yield $message->touch(); // Reset the timeout for an in-flight message
yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
yield $message->finish(); // Finish a message (indicate successful processing)
};
$lookup->subscribe(topic: 'topic', channel: 'channel', onMessage: $callable);
$lookup->subscribe(topic: 'anotherTopic', channel: 'channel', onMessage: $callable);
$lookup->unsubscribe(topic: 'local', channel: 'channel');
$lookup->stop(); // unsubscribe all
```
### Integrations ### Integrations
- [Symfony](https://github.com/nsqphp/NsqBundle) - [Symfony](https://github.com/nsqphp/NsqBundle)

0
bin/.gitkeep Normal file
View File

View File

@ -11,44 +11,34 @@
} }
], ],
"require": { "require": {
"php": "^8.1", "php": "^8.0.1",
"ext-json": "*", "ext-json": "*",
"amphp/http-client": "^4.6",
"amphp/socket": "^1.1", "amphp/socket": "^1.1",
"composer/semver": "^3.2", "composer/semver": "^3.2",
"phpinnacle/buffer": "^1.2", "phpinnacle/buffer": "^1.2",
"psr/log": "^3.0" "psr/log": "^1.1"
}, },
"require-dev": { "require-dev": {
"amphp/log": "^1.1", "amphp/log": "^1.1",
"dg/bypass-finals": "^1.3", "dg/bypass-finals": "^1.3",
"ergebnis/composer-normalize": "^2.15", "ergebnis/composer-normalize": "9999999-dev",
"friendsofphp/php-cs-fixer": "^3.4", "friendsofphp/php-cs-fixer": "^2.18",
"infection/infection": "^0.20.2",
"nyholm/nsa": "^1.2", "nyholm/nsa": "^1.2",
"phpstan/phpstan": "^1.8", "phpstan/phpstan": "^0.12.68",
"phpstan/phpstan-phpunit": "^1.1", "phpstan/phpstan-phpunit": "^0.12.17",
"phpstan/phpstan-strict-rules": "^1.3", "phpstan/phpstan-strict-rules": "^0.12.9",
"phpunit/phpunit": "^9.5", "phpunit/phpunit": "^9.5",
"symfony/var-dumper": "^6.1",
"vimeo/psalm": "^4.4" "vimeo/psalm": "^4.4"
}, },
"config": { "config": {
"sort-packages": true, "sort-packages": true
"allow-plugins": {
"ergebnis/composer-normalize": true,
"infection/extension-installer": true
}
}, },
"autoload": { "autoload": {
"psr-4": { "psr-4": {
"Nsq\\": "src/" "Nsq\\": "src/"
} }
}, },
"autoload-dev": {
"files": [
"vendor/symfony/var-dumper/Resources/functions/dump.php"
]
},
"minimum-stability": "dev", "minimum-stability": "dev",
"prefer-stable": true, "prefer-stable": true,
"scripts": { "scripts": {

View File

@ -2,56 +2,23 @@ version: '3.7'
services: services:
nsqd: nsqd:
image: nsqio/nsq:v${NSQ_VERSION} image: nsqio/nsq:v1.2.0
labels: labels:
ru.grachevko.dhu: 'nsqd' ru.grachevko.dhu: 'nsqd'
command: >- command: /nsqd -log-level debug
nsqd # command: /nsqd
--log-level debug ports:
--lookupd-tcp-address nsqlookupd0:4160 - 4150:4150
--lookupd-tcp-address nsqlookupd1:4160 - 4151:4151
--lookupd-tcp-address nsqlookupd2:4160
nsqlookupd0:
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqlookupd0'
command: /nsqlookupd -log-level debug
nsqlookupd1:
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqlookupd1'
command: /nsqlookupd -log-level debug
nsqlookupd2:
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqlookupd2'
command: /nsqlookupd -log-level debug
nsqadmin: nsqadmin:
image: nsqio/nsq:v${NSQ_VERSION} image: nsqio/nsq:v1.2.0
labels: labels:
ru.grachevko.dhu: 'nsqadmin' ru.grachevko.dhu: 'nsqadmin'
command: command: /nsqadmin --nsqd-http-address=nsqd:4151 --http-address=0.0.0.0:4171
- nsqadmin ports:
- --http-address=0.0.0.0:4171 - 4171:4171
- --lookupd-http-address=nsqlookupd0:4161
- --lookupd-http-address=nsqlookupd1:4161
- --lookupd-http-address=nsqlookupd2:4161
depends_on:
- nsqlookupd0
- nsqlookupd1
- nsqlookupd2
tail: tail:
image: nsqio/nsq:v${NSQ_VERSION} image: nsqio/nsq:v1.2.0
command: >- command: nsq_tail -channel nsq_tail -topic local -nsqd-tcp-address nsqd:4150
nsq_tail
--channel nsq_tail
--topic local
--lookupd-http-address nsqlookupd1:4161
depends_on:
- nsqd
- nsqlookupd1

View File

@ -14,7 +14,6 @@ use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Consumer; use Nsq\Consumer;
use Nsq\Message; use Nsq\Message;
use function Amp\call; use function Amp\call;
Loop::run(static function () { Loop::run(static function () {

View File

@ -1,47 +0,0 @@
<?php
declare(strict_types=1);
require dirname(__DIR__).'/../vendor/autoload.php';
use Amp\ByteStream;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Loop;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig;
use Nsq\Lookup;
use Nsq\Message;
Loop::run(static function () {
$handler = new StreamHandler(ByteStream\getStdout());
$handler->setFormatter(new ConsoleFormatter());
$logger = new Logger('consumer', [$handler], [new PsrLogMessageProcessor()]);
$callable = static function (Message $message) {
yield $message->finish();
};
$clientConfig = new ClientConfig();
$lookupConfig = new LookupConfig();
$watcherId = Loop::repeat(5000, function () {
yield Amp\Dns\resolver()->reloadConfig();
});
$lookup = Lookup::create(
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
$lookupConfig,
$logger,
);
$lookup->subscribe('local', 'local', $callable, $clientConfig);
Loop::delay(10000, function () use ($lookup, $watcherId) {
$lookup->stop();
Loop::cancel($watcherId);
});
});

View File

@ -1,102 +0,0 @@
<?php
declare(strict_types=1);
require dirname(__DIR__).'/../vendor/autoload.php';
use Amp\ByteStream;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Loop;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig;
use Nsq\Lookup;
use Nsq\Producer;
use function Amp\asyncCall;
use function Amp\delay;
Loop::run(static function () {
$handler = new StreamHandler(ByteStream\getStdout());
$handler->setFormatter(new ConsoleFormatter());
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
$clientConfig = new ClientConfig();
/** @var Producer[] $producers */
$producers = [];
$lookupConfig = new LookupConfig();
$lookup = Lookup::create(
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
$lookupConfig,
$logger,
);
$isRunning = true;
asyncCall(static function () use ($lookup, $clientConfig, $logger, &$producers, &$isRunning) {
while ($isRunning) {
/** @var Lookup\Producer[] $nodes */
$nodes = yield $lookup->nodes();
foreach ($nodes as $node) {
$address = $node->toTcpUri();
if (array_key_exists($address, $producers)) {
continue;
}
asyncCall(function () use ($address, $clientConfig, $logger, &$producers) {
yield ($producers[$address] = Producer::create($address, $clientConfig, $logger))
->onClose(function () use (&$producers, $address) {
unset($producers[$address]);
})
->connect()
;
});
}
yield delay(5000);
yield Amp\Dns\resolver()->reloadConfig(); // for reload /etc/hosts
}
});
Loop::delay(5000, function () use (&$isRunning, $logger) {
$logger->info('Stopping producer.');
$isRunning = false;
});
$counter = 0;
while (true) {
if (!$isRunning) {
foreach ($producers as $producer) {
$producer->close();
}
break;
}
if ([] === $producers) {
yield delay(200);
continue;
}
$index = array_rand($producers);
$producer = $producers[$index];
if (!$producer->isConnected()) {
yield delay(100);
continue;
}
yield $producer->publish('local', 'This is message of count '.$counter++);
}
});

View File

@ -8,9 +8,3 @@ parameters:
paths: paths:
- src - src
- tests - tests
ignoreErrors:
-
message: '#no value type specified in iterable type array#'
paths:
- %currentWorkingDirectory%/src
- %currentWorkingDirectory%/tests

View File

@ -1,5 +1,6 @@
<?xml version="1.0"?> <?xml version="1.0"?>
<psalm <psalm
allowPhpStormGenerics="true"
ignoreInternalFunctionFalseReturn="false" ignoreInternalFunctionFalseReturn="false"
ignoreInternalFunctionNullReturn="false" ignoreInternalFunctionNullReturn="false"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

View File

@ -13,11 +13,8 @@ final class Buffer extends ByteBuffer
{ {
public function readUInt32LE(): int public function readUInt32LE(): int
{ {
$unpacked = unpack('V', $this->consume(4)); /** @phpstan-ignore-next-line */
return unpack('V', $this->consume(4))[1];
\assert(\is_array($unpacked) && \array_key_exists(1, $unpacked));
return $unpacked[1];
} }
public function consumeTimestamp(): int public function consumeTimestamp(): int

View File

@ -13,7 +13,7 @@ use Composer\InstalledVersions;
* *
* @psalm-immutable * @psalm-immutable
*/ */
final class ClientConfig final class ClientConfig implements \JsonSerializable
{ {
/** /**
* @psalm-suppress ImpureFunctionCall * @psalm-suppress ImpureFunctionCall
@ -26,26 +26,9 @@ final class ClientConfig
public ?string $authSecret = null, public ?string $authSecret = null,
/** /**
* The timeout for establishing a connection in milliseconds. * The timeout for establishing a connection in seconds.
*/ */
public int $connectTimeout = 10000, public int $connectTimeout = 10,
/**
* The max attempts for establishing a connection.
*/
public int $maxAttempts = 0,
/**
* Use tcp_nodelay for establishing a connection.
*/
public bool $tcpNoDelay = false,
public int $rdyCount = 100,
/**
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
* it will send back a JSON payload of supported features and metadata.
*/
public bool $featureNegotiation = true,
/** /**
* An identifier used to disambiguate this client (i.e. something specific to the consumer). * An identifier used to disambiguate this client (i.e. something specific to the consumer).
@ -90,6 +73,12 @@ final class ClientConfig
*/ */
public int $sampleRate = 0, public int $sampleRate = 0,
/**
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
* it will send back a JSON payload of supported features and metadata.
*/
public bool $featureNegotiation = true,
/** /**
* Enable TLS for this connection. * Enable TLS for this connection.
*/ */
@ -100,6 +89,11 @@ final class ClientConfig
*/ */
public bool $snappy = false, public bool $snappy = false,
/**
* The read timeout for connection sockets and for awaiting responses from nsq.
*/
public int $readTimeout = 5,
/** /**
* A string identifying the agent for this client in the spirit of HTTP. * A string identifying the agent for this client in the spirit of HTTP.
*/ */
@ -120,14 +114,12 @@ final class ClientConfig
} }
} }
public static function fromArray(array $array): self /**
* @phpstan-ignore-next-line
*/
public function jsonSerialize(): array
{ {
return new self(...array_intersect_key($array, get_class_vars(self::class))); return [
}
public function asNegotiationPayload(): string
{
$data = [
'client_id' => $this->clientId, 'client_id' => $this->clientId,
'deflate' => $this->deflate, 'deflate' => $this->deflate,
'deflate_level' => $this->deflateLevel, 'deflate_level' => $this->deflateLevel,
@ -140,7 +132,10 @@ final class ClientConfig
'tls_v1' => $this->tls, 'tls_v1' => $this->tls,
'user_agent' => $this->userAgent, 'user_agent' => $this->userAgent,
]; ];
}
return json_encode($data, JSON_THROW_ON_ERROR); public function toString(): string
{
return json_encode($this, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
} }
} }

View File

@ -8,10 +8,8 @@ namespace Nsq\Config;
* The configuration object that holds the config status for a single Connection. * The configuration object that holds the config status for a single Connection.
* *
* @psalm-immutable * @psalm-immutable
*
* @internal
*/ */
final class ServerConfig final class ConnectionConfig
{ {
public function __construct( public function __construct(
/** /**
@ -84,6 +82,9 @@ final class ServerConfig
) { ) {
} }
/**
* @phpstan-ignore-next-line
*/
public static function fromArray(array $array): self public static function fromArray(array $array): self
{ {
return new self( return new self(

View File

@ -1,13 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Config;
final class LookupConfig
{
public function __construct(
public int $pollingInterval = 10000,
) {
}
}

View File

@ -4,11 +4,9 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Amp\ByteStream\ClosedException;
use Amp\Failure;
use Amp\Promise; use Amp\Promise;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Config\ServerConfig; use Nsq\Config\ConnectionConfig;
use Nsq\Exception\AuthenticationRequired; use Nsq\Exception\AuthenticationRequired;
use Nsq\Exception\NsqException; use Nsq\Exception\NsqException;
use Nsq\Frame\Response; use Nsq\Frame\Response;
@ -17,8 +15,6 @@ use Nsq\Stream\NullStream;
use Nsq\Stream\SnappyStream; use Nsq\Stream\SnappyStream;
use Nsq\Stream\SocketStream; use Nsq\Stream\SocketStream;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use function Amp\asyncCall;
use function Amp\call; use function Amp\call;
/** /**
@ -26,45 +22,23 @@ use function Amp\call;
*/ */
abstract class Connection abstract class Connection
{ {
private Stream $stream; protected Stream $stream;
/**
* @var callable
*/
private $onConnectCallback;
/**
* @var callable
*/
private $onCloseCallback;
public function __construct( public function __construct(
/** private string $address,
* @readonly private ClientConfig $clientConfig,
*/ private LoggerInterface $logger,
public string $address,
protected ClientConfig $clientConfig,
protected LoggerInterface $logger,
) { ) {
$this->stream = new NullStream(); $this->stream = new NullStream();
$this->onConnectCallback = static function (): void {
};
$this->onCloseCallback = static function (): void {
};
} }
public function __destruct() public function __destruct()
{ {
$this->close(false); $this->close();
}
public function isConnected(): bool
{
return !$this->stream instanceof NullStream;
} }
/** /**
* @psalm-return Promise<void> * @return Promise<void>
*/ */
public function connect(): Promise public function connect(): Promise
{ {
@ -72,32 +46,16 @@ abstract class Connection
$buffer = new Buffer(); $buffer = new Buffer();
/** @var SocketStream $stream */ /** @var SocketStream $stream */
$stream = yield SocketStream::connect( $stream = yield SocketStream::connect($this->address);
$this->address,
$this->clientConfig->connectTimeout,
$this->clientConfig->maxAttempts,
$this->clientConfig->tcpNoDelay,
);
yield $stream->write(Command::magic()); yield $stream->write(Command::magic());
yield $stream->write(Command::identify($this->clientConfig->asNegotiationPayload())); yield $stream->write(Command::identify($this->clientConfig->toString()));
/** @var Response $response */ /** @var Response $response */
$response = yield $this->response($stream, $buffer); $response = yield $this->response($stream, $buffer);
$serverConfig = ServerConfig::fromArray($response->toArray()); $connectionConfig = ConnectionConfig::fromArray($response->toArray());
if ($serverConfig->tls) { if ($connectionConfig->snappy) {
yield $stream->setupTls();
/** @var Response $response */
$response = yield $this->response($stream, $buffer);
if (!$response->isOk()) {
throw new NsqException();
}
}
if ($serverConfig->snappy) {
$stream = new SnappyStream($stream, $buffer->flush()); $stream = new SnappyStream($stream, $buffer->flush());
/** @var Response $response */ /** @var Response $response */
@ -108,8 +66,8 @@ abstract class Connection
} }
} }
if ($serverConfig->deflate) { if ($connectionConfig->deflate) {
$stream = new GzipStream($stream, $serverConfig->deflateLevel, $buffer->flush()); $stream = new GzipStream($stream);
/** @var Response $response */ /** @var Response $response */
$response = yield $this->response($stream, $buffer); $response = yield $this->response($stream, $buffer);
@ -119,7 +77,7 @@ abstract class Connection
} }
} }
if ($serverConfig->authRequired) { if ($connectionConfig->authRequired) {
if (null === $this->clientConfig->authSecret) { if (null === $this->clientConfig->authSecret) {
throw new AuthenticationRequired(); throw new AuthenticationRequired();
} }
@ -133,103 +91,15 @@ abstract class Connection
} }
$this->stream = $stream; $this->stream = $stream;
($this->onConnectCallback)();
}); });
} }
public function close(bool $graceful = true): void public function close(): void
{ {
if (!$this->isConnected()) { // $this->stream->write(Command::cls());
return;
}
$logger = $this->logger; $this->stream->close();
[$stream, $this->stream] = [$this->stream, new NullStream()]; $this->stream = new NullStream();
if ($graceful) {
$this->logger->debug('Graceful disconnect.', [
'class' => static::class,
'address' => $this->address,
]);
asyncCall(static function () use ($stream, $logger): \Generator {
try {
yield $stream->write(Command::cls());
} catch (\Throwable $e) {
$logger->warning($e->getMessage(), ['exception' => $e]);
}
$stream->close();
});
return;
}
try {
$stream->close();
} catch (ClosedException) {
}
($this->onCloseCallback)();
}
public function onConnect(callable $callback): static
{
$previous = $this->onConnectCallback;
$this->onConnectCallback = static function () use ($previous, $callback): void {
$previous();
$callback();
};
return $this;
}
public function onClose(callable $callback): static
{
$previous = $this->onCloseCallback;
$this->onCloseCallback = static function () use ($previous, $callback): void {
$previous();
$callback();
};
return $this;
}
/**
* @psalm-return Promise<null|string>
*/
protected function read(): Promise
{
return call(function (): \Generator {
try {
return yield $this->stream->read();
} catch (\Throwable $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
$this->close(false);
return new Failure($e);
}
});
}
/**
* @psalm-return Promise<void>
*/
protected function write(string $data): Promise
{
return call(function () use ($data): \Generator {
try {
return yield $this->stream->write($data);
} catch (\Throwable $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
$this->close(false);
return new Failure($e);
}
});
} }
protected function handleError(Frame\Error $error): void protected function handleError(Frame\Error $error): void
@ -244,7 +114,7 @@ abstract class Connection
} }
/** /**
* @psalm-return Promise<Frame\Response> * @return Promise<Frame\Response>
*/ */
private function response(Stream $stream, Buffer $buffer): Promise private function response(Stream $stream, Buffer $buffer): Promise
{ {

View File

@ -6,19 +6,15 @@ namespace Nsq;
use Amp\Failure; use Amp\Failure;
use Amp\Promise; use Amp\Promise;
use Amp\Success;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Exception\ConsumerException; use Nsq\Exception\ConsumerException;
use Nsq\Frame\Response; use Nsq\Frame\Response;
use Nsq\Stream\NullStream;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger; use Psr\Log\NullLogger;
use function Amp\asyncCall; use function Amp\asyncCall;
use function Amp\call; use function Amp\call;
/**
* @internal
*/
final class Consumer extends Connection final class Consumer extends Connection
{ {
private int $rdy = 0; private int $rdy = 0;
@ -29,35 +25,20 @@ final class Consumer extends Connection
private $onMessage; private $onMessage;
public function __construct( public function __construct(
string $address, private string $address,
/** private string $topic,
* @readonly private string $channel,
*/
public string $topic,
/**
* @readonly
*/
public string $channel,
callable $onMessage, callable $onMessage,
ClientConfig $clientConfig, ClientConfig $clientConfig,
LoggerInterface $logger, private LoggerInterface $logger,
) { ) {
parent::__construct( parent::__construct(
$address, $this->address,
$clientConfig, $clientConfig,
$logger, $this->logger,
); );
$this->onMessage = $onMessage; $this->onMessage = $onMessage;
$context = compact('address', 'topic', 'channel');
$this->onConnect(function () use ($context): void {
$this->logger->debug('Consumer connected.', $context);
});
$this->onClose(function () use ($context): void {
$this->logger->debug('Consumer disconnected.', $context);
});
$this->logger->debug('Consumer created.', $context);
} }
public static function create( public static function create(
@ -80,7 +61,7 @@ final class Consumer extends Connection
public function connect(): Promise public function connect(): Promise
{ {
if ($this->isConnected()) { if (!$this->stream instanceof NullStream) {
return call(static function (): void { return call(static function (): void {
}); });
} }
@ -88,57 +69,58 @@ final class Consumer extends Connection
return call(function (): \Generator { return call(function (): \Generator {
yield parent::connect(); yield parent::connect();
$buffer = new Buffer(); $this->run();
});
}
private function run(): void
{
$buffer = new Buffer();
asyncCall(function () use ($buffer): \Generator {
yield $this->stream->write(Command::sub($this->topic, $this->channel));
if (null !== ($chunk = yield $this->stream->read())) {
$buffer->append($chunk);
}
/** @var Response $response */
$response = Parser::parse($buffer);
if (!$response->isOk()) {
return new Failure(new ConsumerException('Fail subscription.'));
}
yield $this->rdy(2500);
/** @phpstan-ignore-next-line */
asyncCall(function () use ($buffer): \Generator { asyncCall(function () use ($buffer): \Generator {
yield $this->write(Command::sub($this->topic, $this->channel)); while (null !== $chunk = yield $this->stream->read()) {
if (null !== ($chunk = yield $this->read())) {
$buffer->append($chunk); $buffer->append($chunk);
}
/** @var Response $response */ while ($frame = Parser::parse($buffer)) {
$response = Parser::parse($buffer); switch (true) {
case $frame instanceof Frame\Response:
if (!$response->isOk()) { if ($frame->isHeartBeat()) {
return new Failure(new ConsumerException('Fail subscription.')); yield $this->stream->write(Command::nop());
}
yield $this->rdy(1);
/** @phpstan-ignore-next-line */
asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->read()) {
$buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->write(Command::nop());
break;
}
throw ConsumerException::response($frame);
case $frame instanceof Frame\Error:
$this->handleError($frame);
break; break;
case $frame instanceof Frame\Message: }
asyncCall($this->onMessage, Message::compose($frame, $this));
break; throw ConsumerException::response($frame);
} case $frame instanceof Frame\Error:
$this->handleError($frame);
if ($this->rdy !== $this->clientConfig->rdyCount) { break;
yield $this->rdy($this->clientConfig->rdyCount); case $frame instanceof Frame\Message:
} asyncCall($this->onMessage, Message::compose($frame, $this));
break;
} }
} }
}
$this->close(false); $this->stream = new NullStream();
});
}); });
}); });
} }
@ -146,53 +128,32 @@ final class Consumer extends Connection
/** /**
* Update RDY state (indicate you are ready to receive N messages). * Update RDY state (indicate you are ready to receive N messages).
* *
* @psalm-return Promise<bool> * @return Promise<void>
*/ */
public function rdy(int $count): Promise public function rdy(int $count): Promise
{ {
if (!$this->isConnected()) {
return new Success(false);
}
if ($this->rdy === $count) { if ($this->rdy === $count) {
return new Success(true); return call(static function (): void {
});
} }
$this->rdy = $count; $this->rdy = $count;
return call(function () use ($count): \Generator { return $this->stream->write(Command::rdy($count));
try {
yield $this->write(Command::rdy($count));
return true;
} catch (\Throwable) {
return false;
}
});
} }
/** /**
* Finish a message (indicate successful processing). * Finish a message (indicate successful processing).
* *
* @psalm-return Promise<bool> * @return Promise<void>
* *
* @internal * @internal
*/ */
public function fin(string $id): Promise public function fin(string $id): Promise
{ {
if (!$this->isConnected()) { --$this->rdy;
return new Success(false);
}
return call(function () use ($id): \Generator { return $this->stream->write(Command::fin($id));
try {
yield $this->write(Command::fin($id));
return true;
} catch (\Throwable) {
return false;
}
});
} }
/** /**
@ -201,48 +162,26 @@ final class Consumer extends Connection
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out * be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
* behaves identically to an explicit REQ. * behaves identically to an explicit REQ.
* *
* @psalm-return Promise<bool> * @return Promise<void>
* *
* @internal * @internal
*/ */
public function req(string $id, int $timeout): Promise public function req(string $id, int $timeout): Promise
{ {
if (!$this->isConnected()) { --$this->rdy;
return new Success(false);
}
return call(function () use ($id, $timeout): \Generator { return $this->stream->write(Command::req($id, $timeout));
try {
yield $this->write(Command::req($id, $timeout));
return true;
} catch (\Throwable) {
return false;
}
});
} }
/** /**
* Reset the timeout for an in-flight message. * Reset the timeout for an in-flight message.
* *
* @psalm-return Promise<bool> * @return Promise<void>
* *
* @internal * @internal
*/ */
public function touch(string $id): Promise public function touch(string $id): Promise
{ {
if (!$this->isConnected()) { return $this->stream->write(Command::touch($id));
return new Success(false);
}
return call(function () use ($id): \Generator {
try {
yield $this->write(Command::touch($id));
return true;
} catch (\Throwable) {
return false;
}
});
} }
} }

View File

@ -1,18 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Psr\Log\LogLevel;
final class LookupException extends NsqException
{
public function level(): string
{
return match ($this->getMessage()) {
'TOPIC_NOT_FOUND' => LogLevel::DEBUG,
default => LogLevel::WARNING,
};
}
}

View File

@ -1,9 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
final class StreamException extends NsqException
{
}

View File

@ -6,9 +6,10 @@ namespace Nsq;
abstract class Frame abstract class Frame
{ {
public const TYPE_RESPONSE = 0; public const TYPE_RESPONSE = 0,
public const TYPE_ERROR = 1; TYPE_ERROR = 1,
public const TYPE_MESSAGE = 2; TYPE_MESSAGE = 2
;
public function __construct(public int $type) public function __construct(public int $type)
{ {

View File

@ -30,7 +30,7 @@ final class Response extends Frame
} }
/** /**
* @psalm-return array<mixed, mixed> * @return array<mixed, mixed>
*/ */
public function toArray(): array public function toArray(): array
{ {

View File

@ -1,274 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq;
use Amp\Deferred;
use Amp\Dns\DnsException;
use Amp\Http\Client\DelegateHttpClient;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\NullCancellationToken;
use Amp\Promise;
use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig;
use Nsq\Exception\LookupException;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use function Amp\asyncCall;
use function Amp\call;
use function Amp\delay;
final class Lookup
{
/**
* @psalm-var array<string, array<string, Lookup\Producer>>
*/
private array $producers = [];
private array $consumers = [];
private array $running = [];
private array $topicWatchers = [];
public function __construct(
private array $addresses,
private LookupConfig $config,
private LoggerInterface $logger,
private DelegateHttpClient $httpClient,
) {
}
public static function create(
string | array $address,
LookupConfig $config = null,
LoggerInterface $logger = null,
DelegateHttpClient $httpClient = null,
): self {
return new self(
(array) $address,
$config ?? new LookupConfig(),
$logger ?? new NullLogger(),
$httpClient ?? HttpClientBuilder::buildDefault(),
);
}
/**
* @psalm-return Promise<Lookup\Producer[]>
*/
public function nodes(): Promise
{
return call(function (): \Generator {
$requestHandler = function (string $uri): \Generator {
/** @var Response $response */
$response = yield $this->httpClient->request(new Request($uri.'/nodes'), new NullCancellationToken());
try {
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
} catch (LookupException $e) {
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
return null;
}
};
$promises = [];
foreach ($this->addresses as $address) {
$promises[$address] = call($requestHandler, $address);
}
$nodes = [];
/** @var Lookup\Response $response */
foreach (yield $promises as $response) {
foreach ($response->producers as $producer) {
$nodes[$producer->toTcpUri()] = $producer;
}
}
return array_values($nodes);
});
}
public function stop(): void
{
$this->producers = [];
$this->consumers = [];
$this->running = [];
$this->topicWatchers = [];
$this->logger->info('Lookup stopped.');
}
/**
* @psalm-suppress InvalidPropertyAssignmentValue
*/
public function subscribe(string $topic, string $channel, callable $onMessage, ClientConfig $config = null): void
{
if (null !== ($this->running[$topic][$channel] ?? null)) {
throw new \InvalidArgumentException('Subscription already exists.');
}
$this->running[$topic][$channel] = true;
asyncCall(function () use ($topic, $channel, $onMessage, $config): \Generator {
while (true) {
if (null === ($this->running[$topic][$channel] ?? null)) {
return;
}
/** @phpstan-ignore-next-line */
$producers = $this->producers[$topic] ??= new Deferred();
if ($producers instanceof Deferred) {
/** @var array<string, Lookup\Producer> $producers */
$producers = yield $producers->promise();
}
foreach (array_diff_key($this->consumers, $producers) as $address => $producer) {
unset($this->consumers[$address]);
}
foreach ($producers as $address => $producer) {
if (null !== ($this->consumers[$address][$topic][$channel] ?? null)) {
continue;
}
$this->keepConnection(
Consumer::create(
$address,
$topic,
$channel,
$onMessage,
$config,
$this->logger,
),
);
}
yield delay($this->config->pollingInterval);
}
});
$this->watch($topic);
$this->logger->info('Subscribed.', compact('topic', 'channel'));
}
public function unsubscribe(string $topic, string $channel): void
{
if (null === ($this->running[$topic][$channel] ?? null)) {
$this->logger->debug('Not subscribed.', compact('topic', 'channel'));
return;
}
unset($this->running[$topic][$channel]);
if ([] === $this->running[$topic]) {
unset($this->running[$topic]);
}
$this->logger->info('Unsubscribed.', compact('topic', 'channel'));
}
private function keepConnection(Consumer $consumer): void
{
$this->consumers[$consumer->address][$consumer->topic][$consumer->channel] = $consumer;
asyncCall(function () use ($consumer): \Generator {
while (null !== ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
try {
yield $consumer->connect();
} catch (DnsException $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
unset(
$this->consumers[$consumer->address],
$this->producers[$consumer->topic][$consumer->address],
);
return;
} catch (\Throwable $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
yield delay($this->config->pollingInterval);
continue;
}
while (true) {
/** @phpstan-ignore-next-line */
if (null === ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
$consumer->close();
return;
}
if (!$consumer->isConnected()) {
break;
}
yield delay(500);
}
}
});
}
private function watch(string $topic): void
{
if (\array_key_exists($topic, $this->topicWatchers)) {
return;
}
$this->topicWatchers[$topic] = true;
asyncCall(function () use ($topic): \Generator {
$cancellationToken = new NullCancellationToken();
$requestHandler = function (string $uri) use ($topic, $cancellationToken): \Generator {
$this->logger->debug('Lookup', compact('topic'));
/** @var Response $response */
$response = yield $this->httpClient->request(new Request($uri.'/lookup?topic='.$topic), $cancellationToken);
try {
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
} catch (LookupException $e) {
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
return null;
}
};
while (\array_key_exists($topic, $this->running)) {
$promises = [];
foreach ($this->addresses as $address) {
$promises[$address] = call($requestHandler, $address);
}
/** @var Lookup\Response[] $responses */
$responses = yield $promises;
$producers = [];
foreach ($responses as $response) {
foreach ($response->producers as $producer) {
$producers[$producer->toTcpUri()] = $producer;
}
}
/** @phpstan-ignore-next-line */
if (($deferred = ($this->producers[$topic] ?? null)) instanceof Deferred) {
$deferred->resolve($producers);
}
$this->producers[$topic] = $producers;
yield delay($this->config->pollingInterval);
}
unset($this->topicWatchers[$topic]);
});
}
}

View File

@ -1,39 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Lookup;
final class Producer
{
public function __construct(
public string $broadcastAddress,
public string $hostname,
public string $remoteAddress,
public int $tcpPort,
public int $httpPort,
public string $version,
public array $tombstones,
public array $topics,
) {
}
public static function fromArray(array $array): self
{
return new self(
$array['broadcast_address'],
$array['hostname'],
$array['remote_address'],
$array['tcp_port'],
$array['http_port'],
$array['version'],
$array['tombstones'] ?? [],
$array['topics'] ?? [],
);
}
public function toTcpUri(): string
{
return sprintf('%s:%s', $this->broadcastAddress, $this->tcpPort);
}
}

View File

@ -1,34 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Lookup;
use Nsq\Exception\LookupException;
final class Response
{
/**
* @param string[] $channels
* @param Producer[] $producers
*/
public function __construct(
public array $channels,
public array $producers,
) {
}
public static function fromJson(string $json): self
{
$array = json_decode($json, true, flags: JSON_THROW_ON_ERROR);
if (\array_key_exists('message', $array)) {
throw new LookupException($array['message']);
}
return new self(
$array['channels'] ?? [],
array_map([Producer::class, 'fromArray'], $array['producers']),
);
}
}

View File

@ -6,6 +6,7 @@ namespace Nsq;
use Amp\Promise; use Amp\Promise;
use Nsq\Exception\MessageException; use Nsq\Exception\MessageException;
use function Amp\call;
final class Message final class Message
{ {
@ -31,51 +32,51 @@ final class Message
); );
} }
public function isProcessed(): bool
{
return $this->processed;
}
/** /**
* @psalm-return Promise<bool> * @return Promise<void>
*/ */
public function finish(): Promise public function finish(): Promise
{ {
$this->markAsProcessedOrFail(); return call(function (): \Generator {
if ($this->processed) {
throw MessageException::processed($this);
}
return $this->consumer->fin($this->id); yield $this->consumer->fin($this->id);
$this->processed = true;
});
} }
/** /**
* @psalm-param positive-int|0 $timeout * @return Promise<void>
*
* @psalm-return Promise<bool>
*/ */
public function requeue(int $timeout): Promise public function requeue(int $timeout): Promise
{ {
$this->markAsProcessedOrFail(); return call(function () use ($timeout): \Generator {
if ($this->processed) {
throw MessageException::processed($this);
}
return $this->consumer->req($this->id, $timeout); yield $this->consumer->req($this->id, $timeout);
$this->processed = true;
});
} }
/** /**
* @psalm-return Promise<bool> * @return Promise<void>
*/ */
public function touch(): Promise public function touch(): Promise
{ {
if ($this->processed) { return call(function (): \Generator {
throw MessageException::processed($this); if ($this->processed) {
} throw MessageException::processed($this);
}
return $this->consumer->touch($this->id); yield $this->consumer->touch($this->id);
}
private function markAsProcessedOrFail(): void $this->processed = true;
{ });
if ($this->processed) {
throw MessageException::processed($this);
}
$this->processed = true;
} }
} }

View File

@ -5,38 +5,16 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Amp\Promise; use Amp\Promise;
use Amp\Success;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Exception\NsqException; use Nsq\Exception\NsqException;
use Nsq\Stream\NullStream;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger; use Psr\Log\NullLogger;
use function Amp\asyncCall; use function Amp\asyncCall;
use function Amp\call; use function Amp\call;
final class Producer extends Connection final class Producer extends Connection
{ {
public function __construct(
string $address,
ClientConfig $clientConfig,
LoggerInterface $logger,
) {
parent::__construct(
$address,
$clientConfig,
$logger,
);
$context = compact('address');
$this->onConnect(function () use ($context): void {
$this->logger->debug('Producer connected.', $context);
});
$this->onClose(function () use ($context): void {
$this->logger->debug('Producer disconnected.', $context);
});
$this->logger->debug('Producer created.', $context);
}
public static function create( public static function create(
string $address, string $address,
ClientConfig $clientConfig = null, ClientConfig $clientConfig = null,
@ -51,7 +29,7 @@ final class Producer extends Connection
public function connect(): Promise public function connect(): Promise
{ {
if ($this->isConnected()) { if (!$this->stream instanceof NullStream) {
return call(static function (): void { return call(static function (): void {
}); });
} }
@ -59,72 +37,63 @@ final class Producer extends Connection
return call(function (): \Generator { return call(function (): \Generator {
yield parent::connect(); yield parent::connect();
$buffer = new Buffer(); $this->run();
asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->read()) {
$buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->write(Command::nop());
}
// Ok received
break;
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
default:
throw new NsqException('Unreachable statement.');
}
}
}
$this->close(false);
});
}); });
} }
/** /**
* @param array<int, string>|string $body * @param array<int, string>|string $body
* *
* @psalm-param positive-int|0 $delay * @return Promise<void>
*
* @psalm-return Promise<bool>
*/ */
public function publish(string $topic, string | array $body, int $delay = null): Promise public function publish(string $topic, string | array $body, int $delay = 0): Promise
{ {
if (!$this->isConnected()) { if (0 < $delay) {
return new Success(false); return call(
function (array $bodies) use ($topic, $delay): \Generator {
foreach ($bodies as $body) {
yield $this->stream->write(Command::dpub($topic, $body, $delay));
}
},
(array) $body,
);
} }
return call( $command = \is_array($body)
function (iterable $commands): \Generator { ? Command::mpub($topic, $body)
try { : Command::pub($topic, $body);
foreach ($commands as $command) {
yield $this->write($command);
}
return true; return $this->stream->write($command);
} catch (\Throwable) { }
return false;
} private function run(): void
}, {
(static function () use ($topic, $body, $delay): \Generator { $buffer = new Buffer();
if (\is_array($body) && null === $delay) {
yield Command::mpub($topic, $body); asyncCall(function () use ($buffer): \Generator {
} elseif (null !== $delay) { while (null !== $chunk = yield $this->stream->read()) {
foreach ((array) $body as $content) { $buffer->append($chunk);
yield Command::dpub($topic, $content, $delay);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->stream->write(Command::nop());
}
// Ok received
break;
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
default:
throw new NsqException('Unreachable statement.');
} }
} else {
yield Command::pub($topic, $body);
} }
})(), }
);
$this->stream = new NullStream();
});
} }
} }

View File

@ -4,23 +4,19 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Amp\ByteStream\ClosedException;
use Amp\Promise; use Amp\Promise;
interface Stream interface Stream
{ {
/** /**
* @psalm-return Promise<null|string> * @return Promise<null|string>
*/ */
public function read(): Promise; public function read(): Promise;
/** /**
* @psalm-return Promise<void> * @return Promise<void>
*/ */
public function write(string $data): Promise; public function write(string $data): Promise;
/**
* @throws ClosedException
*/
public function close(): void; public function close(): void;
} }

View File

@ -5,38 +5,14 @@ declare(strict_types=1);
namespace Nsq\Stream; namespace Nsq\Stream;
use Amp\Promise; use Amp\Promise;
use Nsq\Buffer; use Nsq\Exception\NsqException;
use Nsq\Exception\StreamException;
use Nsq\Stream; use Nsq\Stream;
use function Amp\call;
class GzipStream implements Stream class GzipStream implements Stream
{ {
private ?\InflateContext $inflate = null; public function __construct(private Stream $stream)
private ?\DeflateContext $deflate = null;
private Buffer $buffer;
public function __construct(private Stream $stream, private int $level, string $bytes = '')
{ {
/** @var false|\InflateContext $inflate */ throw new NsqException('GzipStream not implemented yet.');
$inflate = @inflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
/** @var \DeflateContext|false $deflate */
$deflate = @deflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
if (false === $inflate) {
throw new StreamException('Failed initializing inflate context');
}
if (false === $deflate) {
throw new StreamException('Failed initializing deflate context');
}
$this->inflate = $inflate;
$this->deflate = $deflate;
$this->buffer = new Buffer($bytes);
} }
/** /**
@ -44,33 +20,7 @@ class GzipStream implements Stream
*/ */
public function read(): Promise public function read(): Promise
{ {
return call(function (): \Generator { return $this->stream->read();
if (null === $this->inflate) {
return null;
}
if ($this->buffer->empty()) {
$chunk = yield $this->stream->read();
if (null !== $chunk) {
$this->buffer->append($chunk);
}
}
$data = $this->buffer->flush();
if ('' === $data) {
return null;
}
/** @psalm-suppress UndefinedFunction,InvalidArgument */
$decompressed = inflate_add($this->inflate, $data, ZLIB_SYNC_FLUSH);
if (false === $decompressed) {
throw new StreamException('Failed adding data to deflate context');
}
return $decompressed;
});
} }
/** /**
@ -78,27 +28,11 @@ class GzipStream implements Stream
*/ */
public function write(string $data): Promise public function write(string $data): Promise
{ {
if (null === $this->deflate) { return $this->stream->write($data);
throw new StreamException('The stream has already been closed');
}
/** @psalm-suppress UndefinedFunction,InvalidArgument */
$compressed = deflate_add($this->deflate, $data, ZLIB_SYNC_FLUSH);
if (false === $compressed) {
throw new StreamException('Failed adding data to deflate context');
}
return $this->stream->write($compressed);
} }
/**
* {@inheritDoc}
*/
public function close(): void public function close(): void
{ {
$this->stream->close(); $this->stream->close();
$this->inflate = null;
$this->deflate = null;
} }
} }

View File

@ -4,11 +4,10 @@ declare(strict_types=1);
namespace Nsq\Stream; namespace Nsq\Stream;
use Amp\Failure;
use Amp\Promise; use Amp\Promise;
use Amp\Success; use Amp\Success;
use Nsq\Exception\NsqException;
use Nsq\Stream; use Nsq\Stream;
use function Amp\call;
final class NullStream implements Stream final class NullStream implements Stream
{ {
@ -25,7 +24,8 @@ final class NullStream implements Stream
*/ */
public function write(string $data): Promise public function write(string $data): Promise
{ {
return new Failure(new NsqException('Connection closed.')); return call(static function (): void {
});
} }
/** /**

View File

@ -8,25 +8,24 @@ use Amp\Promise;
use Nsq\Buffer; use Nsq\Buffer;
use Nsq\Exception\SnappyException; use Nsq\Exception\SnappyException;
use Nsq\Stream; use Nsq\Stream;
use function Amp\call; use function Amp\call;
class SnappyStream implements Stream class SnappyStream implements Stream
{ {
private const IDENTIFIER = [0xFF, 0x06, 0x00, 0x00, 0x73, 0x4E, 0x61, 0x50, 0x70, 0x59]; private const IDENTIFIER = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
private const SIZE_HEADER = 4; private const SIZE_HEADER = 4;
private const SIZE_CHECKSUM = 4; private const SIZE_CHECKSUM = 4;
private const SIZE_CHUNK = 65536; private const SIZE_CHUNK = 65536;
private const TYPE_IDENTIFIER = 0xFF; private const TYPE_IDENTIFIER = 0xff;
private const TYPE_COMPRESSED = 0x00; private const TYPE_COMPRESSED = 0x00;
private const TYPE_UNCOMPRESSED = 0x01; private const TYPE_UNCOMPRESSED = 0x01;
private const TYPE_PADDING = 0xFE; private const TYPE_PADDING = 0xfe;
private Buffer $buffer; private Buffer $buffer;
public function __construct(private Stream $stream, string $bytes = '') public function __construct(private Stream $stream, string $bytes = '')
{ {
if (!\function_exists('snappy_uncompress') || !\function_exists('snappy_compress')) { if (!\function_exists('snappy_uncompress')) {
throw SnappyException::notInstalled(); throw SnappyException::notInstalled();
} }
@ -46,7 +45,7 @@ class SnappyStream implements Stream
$type = $this->buffer->readUInt32LE(); $type = $this->buffer->readUInt32LE();
$size = $type >> 8; $size = $type >> 8;
$type &= 0xFF; $type &= 0xff;
while ($this->buffer->size() < $size && null !== ($chunk = yield $this->stream->read())) { while ($this->buffer->size() < $size && null !== ($chunk = yield $this->stream->read())) {
$this->buffer->append($chunk); $this->buffer->append($chunk);
@ -60,7 +59,6 @@ class SnappyStream implements Stream
case self::TYPE_COMPRESSED: case self::TYPE_COMPRESSED:
$this->buffer->discard(self::SIZE_CHECKSUM); $this->buffer->discard(self::SIZE_CHECKSUM);
/** @psalm-suppress UndefinedFunction */
return snappy_uncompress($this->buffer->consume($size - self::SIZE_HEADER)); return snappy_uncompress($this->buffer->consume($size - self::SIZE_HEADER));
case self::TYPE_UNCOMPRESSED: case self::TYPE_UNCOMPRESSED:
$this->buffer->discard(self::SIZE_CHECKSUM); $this->buffer->discard(self::SIZE_CHECKSUM);
@ -80,7 +78,6 @@ class SnappyStream implements Stream
public function write(string $data): Promise public function write(string $data): Promise
{ {
return call(function () use ($data): Promise { return call(function () use ($data): Promise {
/** @var string $result */
$result = pack('CCCCCCCCCC', ...self::IDENTIFIER); $result = pack('CCCCCCCCCC', ...self::IDENTIFIER);
foreach (str_split($data, self::SIZE_CHUNK) as $chunk) { foreach (str_split($data, self::SIZE_CHUNK) as $chunk) {
@ -96,27 +93,23 @@ class SnappyStream implements Stream
$this->stream->close(); $this->stream->close();
} }
/**
* @psalm-suppress PossiblyFalseArgument
*/
private function compress(string $uncompressed): string private function compress(string $uncompressed): string
{ {
/** @psalm-suppress UndefinedFunction */
$compressed = snappy_compress($uncompressed); $compressed = snappy_compress($uncompressed);
\assert(\is_string($compressed));
[$type, $data] = \strlen($compressed) <= 0.875 * \strlen($uncompressed) [$type, $data] = \strlen($compressed) <= 0.875 * \strlen($uncompressed)
? [self::TYPE_COMPRESSED, $compressed] ? [self::TYPE_COMPRESSED, $compressed]
: [self::TYPE_UNCOMPRESSED, $uncompressed]; : [self::TYPE_UNCOMPRESSED, $uncompressed];
/** @psalm-suppress PossiblyFalseArgument */ /** @phpstan-ignore-next-line */
$unpacked = unpack('N', hash('crc32c', $uncompressed, true)); $checksum = unpack('N', hash('crc32c', $uncompressed, true))[1];
\assert(\is_array($unpacked)); $checksum = (($checksum >> 15) | ($checksum << 17)) + 0xa282ead8 & 0xffffffff;
$checksum = $unpacked[1];
$checksum = (($checksum >> 15) | ($checksum << 17)) + 0xA282EAD8 & 0xFFFFFFFF;
$size = (\strlen($data) + 4) << 8; $size = (\strlen($data) + 4) << 8;
/** @psalm-suppress PossiblyFalseOperand */
return pack('VV', $type + $size, $checksum).$data; return pack('VV', $type + $size, $checksum).$data;
} }
} }

View File

@ -5,22 +5,20 @@ declare(strict_types=1);
namespace Nsq\Stream; namespace Nsq\Stream;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\ClientTlsContext;
use Amp\Socket\ConnectContext; use Amp\Socket\ConnectContext;
use Amp\Socket\EncryptableSocket; use Amp\Socket\Socket;
use Nsq\Stream; use Nsq\Stream;
use function Amp\call; use function Amp\call;
use function Amp\Socket\connect; use function Amp\Socket\connect;
class SocketStream implements Stream class SocketStream implements Stream
{ {
public function __construct(private EncryptableSocket $socket) public function __construct(private Socket $socket)
{ {
} }
/** /**
* @psalm-return Promise<self> * @return Promise<self>
*/ */
public static function connect(string $uri, int $timeout = 0, int $attempts = 0, bool $noDelay = false): Promise public static function connect(string $uri, int $timeout = 0, int $attempts = 0, bool $noDelay = false): Promise
{ {
@ -39,17 +37,12 @@ class SocketStream implements Stream
$context = $context->withTcpNoDelay(); $context = $context->withTcpNoDelay();
} }
$context = $context->withTlsContext(
(new ClientTlsContext(''))
->withoutPeerVerification(),
);
return new self(yield connect($uri, $context)); return new self(yield connect($uri, $context));
}); });
} }
/** /**
* @psalm-return Promise<null|string> * @return Promise<null|string>
*/ */
public function read(): Promise public function read(): Promise
{ {
@ -57,26 +50,15 @@ class SocketStream implements Stream
} }
/** /**
* @psalm-return Promise<void> * @return Promise<void>
*/ */
public function write(string $data): Promise public function write(string $data): Promise
{ {
return $this->socket->write($data); return $this->socket->write($data);
} }
/**
* {@inheritDoc}
*/
public function close(): void public function close(): void
{ {
$this->socket->close(); $this->socket->close();
} }
/**
* @psalm-return Promise<void>
*/
public function setupTls(): Promise
{
return $this->socket->setupTls();
}
} }

View File

@ -7,11 +7,6 @@ use PHPUnit\Framework\TestCase;
final class ClientConfigTest extends TestCase final class ClientConfigTest extends TestCase
{ {
public function testNegotiationPayload(): void
{
self::assertJson((new ClientConfig())->asNegotiationPayload());
}
public function testInvalidCompression(): void public function testInvalidCompression(): void
{ {
$this->expectException(InvalidArgumentException::class); $this->expectException(InvalidArgumentException::class);
@ -19,42 +14,4 @@ final class ClientConfigTest extends TestCase
new ClientConfig(deflate: true, snappy: true); new ClientConfig(deflate: true, snappy: true);
} }
/**
* @dataProvider array
*/
public function testFromArray(array $data, array $expected): void
{
self::assertSame($expected, get_object_vars(ClientConfig::fromArray($data)));
}
public function array(): Generator
{
$default = get_object_vars(new ClientConfig());
yield 'Empty array' => [[], $default];
yield 'With wrong keys' => [['bla' => 'bla'], $default];
$custom = [
'authSecret' => 'SomeSecret',
'connectTimeout' => 100,
'maxAttempts' => 10,
'tcpNoDelay' => true,
'rdyCount' => 1,
'featureNegotiation' => true,
'clientId' => 'SomeGorgeousClientId',
'deflate' => true,
'deflateLevel' => 1,
'heartbeatInterval' => 31111,
'hostname' => gethostname(),
'msgTimeout' => 59999,
'sampleRate' => 25,
'tls' => true,
'snappy' => false,
'userAgent' => 'nsqphp/test',
];
yield 'Full filled' => [$custom, $custom];
}
} }

View File

@ -55,7 +55,6 @@ final class MessageTest extends TestCase
*/ */
public function messages(): Generator public function messages(): Generator
{ {
/** @phpstan-ignore-next-line */
$consumer = $this->createMock(Consumer::class); $consumer = $this->createMock(Consumer::class);
$consumer->method('fin')->willReturn(new Success()); $consumer->method('fin')->willReturn(new Success());
$consumer->method('touch')->willReturn(new Success()); $consumer->method('touch')->willReturn(new Success());

View File

@ -1,37 +0,0 @@
<?php
declare(strict_types=1);
use Nsq\Config\ClientConfig;
use PHPUnit\Framework\TestCase;
final class NsqTest extends TestCase
{
/**
* @dataProvider configs
*/
public function test(ClientConfig $clientConfig): void
{
self::markTestSkipped('');
}
/**
* @return Generator<string, array<int, ClientConfig>>
*/
public function configs(): Generator
{
yield 'default' => [
new ClientConfig(
heartbeatInterval: 3000,
snappy: false,
),
];
yield 'snappy' => [
new ClientConfig(
heartbeatInterval: 3000,
snappy: true,
),
];
}
}

View File

@ -3,12 +3,44 @@
declare(strict_types=1); declare(strict_types=1);
use Amp\Loop; use Amp\Loop;
use Amp\Process\Process;
use Nsq\Exception\ServerException; use Nsq\Exception\ServerException;
use Nsq\Producer; use Nsq\Producer;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use function Amp\ByteStream\buffer;
use function Amp\Promise\wait;
final class ProducerTest extends TestCase final class ProducerTest extends TestCase
{ {
/**
* @param array<int, string>|string $body
*
* @dataProvider data
*/
public function testPublish(array | string $body, string $expected): void
{
$process = new Process(
sprintf('bin/nsq_tail -topic %s -channel default -nsqd-tcp-address localhost:4150 -n 1', __FUNCTION__),
);
wait($process->start());
$producer = Producer::create('tcp://localhost:4150');
wait($producer->connect());
wait($producer->publish(__FUNCTION__, $body));
wait($process->join());
self::assertSame($expected, wait(buffer($process->getStdout())));
}
/**
* @return Generator<int, array{0: string|array, 1: string}>
*/
public function data(): Generator
{
yield ['Test Message One!', 'Test Message One!'.PHP_EOL];
}
/** /**
* @dataProvider pubFails * @dataProvider pubFails
*/ */
@ -26,6 +58,9 @@ final class ProducerTest extends TestCase
}); });
} }
/**
* @return Generator<string, array>
*/
public function pubFails(): Generator public function pubFails(): Generator
{ {
yield 'Empty body' => ['test', '', 'E_BAD_MESSAGE PUB invalid message body size 0']; yield 'Empty body' => ['test', '', 'E_BAD_MESSAGE PUB invalid message body size 0'];