55 Commits

Author SHA1 Message Date
350f08c2c1 Fix and suppress static analyze errors 2022-09-11 23:07:46 +03:00
bd8d13692f Remove infection, revert dg/bypass-finals 2022-09-11 22:15:11 +03:00
9414042f57 Bump min req php ^8.1 2022-09-11 22:10:39 +03:00
c88fdb6354 Bump dependencies 2022-09-11 22:05:17 +03:00
3bf8444e06 tests: add testNegotiationPayload 2022-09-11 22:03:37 +03:00
d4b29c69db Fix some psalm/phpstan errors 2021-09-19 02:34:53 +03:00
381ba88f8d Simplify discovery producer example 2021-09-19 02:13:11 +03:00
43ab797ee0 Add Makefile 2021-09-19 02:10:09 +03:00
45048320a8 Lookup: graceful shutdown 2021-09-19 01:57:53 +03:00
be696f17b5 Lookup: reconnections 2021-09-19 01:30:30 +03:00
d0307b47e6 Fix wrong return type 2021-09-19 00:44:31 +03:00
bac144582e Use psalm-return instead of return for generics 2021-09-19 00:44:20 +03:00
5e43f9b0df Prevent multiple watchers for one topic 2021-09-19 00:42:26 +03:00
e1725ea140 Return bool from public api of Producer and Consumer to indicate of success of process and prevent throwing exception from it 2021-09-19 00:42:26 +03:00
b130b09a82 Decrease default ready count to 100 2021-09-18 23:17:53 +03:00
155e896543 Unset previous producers on lookup newer 2021-09-18 23:01:56 +03:00
2c07fb1aca Remove unnecessary Producer::run and Consumer::run 2021-09-18 22:55:44 +03:00
97b3d8206c Refactor logging 2021-09-18 22:55:44 +03:00
c8cd41777f Close connection on error while read or write 2021-09-18 22:46:03 +03:00
fcd1f256ff Add Connection::onCallback, chain callbacks onClose 2021-09-18 22:39:43 +03:00
530b03974e Graceful close connection 2021-09-18 22:17:43 +03:00
083bc44c9c Decouple Consumer and Producer from direct call Streams 2021-09-18 22:17:43 +03:00
aa3333bfba Stream::close can throw Amp\ByteStream\ClosedException 2021-09-18 22:14:56 +03:00
679573ad0a update readme 2021-09-15 01:54:35 +03:00
32f226942e Add discovery examples 2021-09-15 01:53:20 +03:00
dbe312ddf1 Lookup fix cancelling topic watcher 2021-09-15 01:35:26 +03:00
47194b30f3 Refactor Lookup 2021-09-15 01:26:47 +03:00
5bab748952 Add Lookup/Producer::toTcpUri 2021-09-15 01:26:20 +03:00
43b92e9bb9 cs fix 2021-09-15 01:26:06 +03:00
c505d62533 Add logging on Consumer disconnected 2021-09-15 01:26:00 +03:00
f3f67bedd3 add Connection::onClose 2021-09-15 01:25:47 +03:00
af4e86d219 php-cs-fixer add types_spaces rule 2021-09-15 01:25:27 +03:00
53d7813198 Add Connection::isConnected() 2021-09-13 23:49:52 +03:00
56cdda1a0d Add some logs 2021-09-13 23:49:15 +03:00
7984d09e83 Return Failure on try to write to NullStream 2021-09-13 23:48:17 +03:00
3c7686405d Dynamic log level on LookupException 2021-09-13 23:47:45 +03:00
6428a1ec33 Make Lookup/{Response,Producer} compatible to /nodes endpoint 2021-09-13 23:47:20 +03:00
65adecde3f Leave one nsq service in favor of use docker-compose scale 2021-09-13 23:46:24 +03:00
e3e83212c4 deflate 2021-09-04 01:56:49 +03:00
ca2c2ee633 Lookup (#14) 2021-09-04 01:55:34 +03:00
e9dce19e25 Fix: Message::touch must not mark message as processed 2021-07-08 18:35:31 +03:00
a913fb0907 Add rdyCount to ClientConfig 2021-07-08 17:41:59 +03:00
6c8c30a1bd Bump ergebnis/composer-normalize to 2.15 2021-06-17 18:23:56 +03:00
a08bccac45 phpstan: ignore Return type of call to method PHPUnit\Framework\TestCase::createMock() contains unresolvable type. 2021-06-17 16:28:24 +03:00
55480ab2c0 Bump friendsofphp/php-cs-fixer up to 3.0 2021-06-17 16:21:26 +03:00
4546c5085f Bump infection/infection up to 0.23 2021-06-17 16:18:55 +03:00
b6f4726002 Fix: ClientConfig::$connectionTimeout is a milliseconds 2021-06-17 15:46:51 +03:00
e3c64f6f09 tls 2021-06-09 20:08:15 +03:00
411fabb1f5 Fix: get default settings from real object 2021-06-09 18:20:46 +03:00
34847e2467 Mark ServerConfig as internal 2021-06-09 18:17:00 +03:00
6e50fa2258 Refactoring configs. Use connections settings on establishing connection. Create ClientConfig from array. 2021-06-09 18:15:45 +03:00
ca54c7ad80 Add symfony/var-dumper for dev 2021-06-09 18:13:17 +03:00
4c00fb0fd5 Message::requeue $timeout must be positive-int or zero 2021-06-09 17:08:46 +03:00
d3e1788d23 Refactoring: Simplify Message methods, add isProcessed method. 2021-06-09 16:33:18 +03:00
2fc7e37120 Producer::publish $delay must be positive-int or zero 2021-06-09 15:34:29 +03:00
40 changed files with 1327 additions and 373 deletions

1
.env Normal file
View File

@@ -0,0 +1 @@
NSQ_VERSION=1.2.1

View File

@@ -14,7 +14,7 @@ jobs:
os: os:
- ubuntu-latest - ubuntu-latest
php: php:
- '8.0' - '8.1'
dependencies: dependencies:
- lowest - lowest
- highest - highest
@@ -59,9 +59,6 @@ jobs:
run: composer update --no-progress --no-interaction --prefer-dist --prefer-lowest run: composer update --no-progress --no-interaction --prefer-dist --prefer-lowest
if: ${{ matrix.dependencies == 'lowest' }} if: ${{ matrix.dependencies == 'lowest' }}
- name: Install nsq bin
run: curl -L https://github.com/nsqio/nsq/releases/download/v1.2.0/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz | tar xz --strip 1
- name: Run tests - name: Run tests
run: vendor/bin/phpunit --coverage-clover=build/coverage-report.xml run: vendor/bin/phpunit --coverage-clover=build/coverage-report.xml
@@ -80,7 +77,7 @@ jobs:
- name: Setup PHP - name: Setup PHP
uses: shivammathur/setup-php@v2 uses: shivammathur/setup-php@v2
with: with:
php-version: '8.0' php-version: '8.1'
env: env:
update: true update: true
@@ -111,7 +108,7 @@ jobs:
- name: Setup PHP - name: Setup PHP
uses: shivammathur/setup-php@v2 uses: shivammathur/setup-php@v2
with: with:
php-version: '8.0' php-version: '8.1'
extensions: snappy-kjdev/php-ext-snappy@0.2.1 extensions: snappy-kjdev/php-ext-snappy@0.2.1
env: env:
update: true update: true
@@ -143,7 +140,7 @@ jobs:
- name: Setup PHP - name: Setup PHP
uses: shivammathur/setup-php@v2 uses: shivammathur/setup-php@v2
with: with:
php-version: '8.0' php-version: '8.1'
extensions: snappy-kjdev/php-ext-snappy@0.2.1 extensions: snappy-kjdev/php-ext-snappy@0.2.1
env: env:
update: true update: true
@@ -164,48 +161,3 @@ jobs:
- name: Run script - name: Run script
run: vendor/bin/psalm --output-format=github run: vendor/bin/psalm --output-format=github
infection:
name: Infection
runs-on: ubuntu-latest
services:
nsqd:
image: nsqio/nsq:v1.2.0
options: --entrypoint /nsqd
ports:
- 4150:4150
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.0'
coverage: pcov
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Install nsq bin
run: curl -L https://github.com/nsqio/nsq/releases/download/v1.2.0/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz | tar xz --strip 1
- name: Run script
env:
STRYKER_DASHBOARD_API_KEY: ${{ secrets.STRYKER_DASHBOARD_API_KEY }}
run: |
git fetch --depth=1 origin $GITHUB_BASE_REF
php vendor/bin/infection -j2 --git-diff-filter=A --git-diff-base=origin/$GITHUB_BASE_REF --logger-github --ignore-msi-with-no-mutations --only-covered

13
.gitignore vendored
View File

@@ -1,17 +1,6 @@
/vendor/ /vendor/
/composer.lock /composer.lock
/.php_cs.cache /.php-cs-fixer.cache
/.phpunit.result.cache /.phpunit.result.cache
/infection.log /infection.log
# Nsq
bin/nsq_stat
bin/nsq_tail
bin/nsq_to_file
bin/nsq_to_http
bin/nsq_to_nsq
bin/nsqadmin
bin/nsqd
bin/nsqlookupd
bin/to_nsq

45
.php-cs-fixer.php Normal file
View File

@@ -0,0 +1,45 @@
<?php
$finder = PhpCsFixer\Finder::create()
->in(__DIR__)
->exclude('vendor');
return (new PhpCsFixer\Config())
->setRiskyAllowed(true)
->setRules([
'@PhpCsFixer' => true,
'@PhpCsFixer:risky' => true,
'@PSR12' => true,
'@PSR12:risky' => true,
'blank_line_before_statement' => [
'statements' => [
'continue',
'do',
'exit',
'goto',
'if',
'return',
'switch',
'throw',
'try',
],
],
'declare_strict_types' => true,
'global_namespace_import' => [
'import_classes' => false,
'import_constants' => false,
'import_functions' => false,
],
'php_unit_internal_class' => false,
'php_unit_test_case_static_method_calls' => ['call_type' => 'self'],
'php_unit_test_class_requires_covers' => false,
'phpdoc_to_comment' => false,
'yoda_style' => true,
'trailing_comma_in_multiline' => [
'after_heredoc' => true,
'elements' => ['arrays', 'arguments', 'parameters'],
],
'types_spaces' => ['space' => 'single'],
])
->setFinder($finder)
->setCacheFile(__DIR__.'/.php-cs-fixer.cache');

View File

@@ -1,28 +0,0 @@
#!/usr/bin/env php
<?php
return (new PhpCsFixer\Config())
->setRiskyAllowed(true)
->setRules([
'@PhpCsFixer' => true,
'@PhpCsFixer:risky' => true,
'@PSR12' => true,
'@PSR12:risky' => true,
'braces' => [
'allow_single_line_closure' => true,
],
'blank_line_before_statement' => [
'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try'],
],
'declare_strict_types' => true,
'global_namespace_import' => ['import_classes' => false, 'import_constants' => false, 'import_functions' => false],
'php_unit_internal_class' => false,
'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'],
'php_unit_test_class_requires_covers' => false,
'phpdoc_to_comment' => false,
'yoda_style' => true,
])
->setFinder(
PhpCsFixer\Finder::create()
->in(__DIR__)
);

20
Makefile Normal file
View File

@@ -0,0 +1,20 @@
all: install composer-validate php-cs-fixer psalm phpstan phpunit
install:
composer install
psalm:
php vendor/bin/psalm
phpstan:
php vendor/bin/phpstan analyse
phpunit:
php vendor/bin/phpunit
php-cs-fixer:
php vendor/bin/php-cs-fixer fix
composer-validate:
composer validate

View File

@@ -31,10 +31,10 @@ Features
- [x] PUB - [x] PUB
- [x] SUB - [x] SUB
- [X] Feature Negotiation - [X] Feature Negotiation
- [ ] Discovery - [X] Discovery
- [ ] Backoff - [ ] Backoff
- [ ] TLS - [X] TLS
- [ ] Deflate - [X] Deflate
- [X] Snappy - [X] Snappy
- [X] Sampling - [X] Sampling
- [X] AUTH - [X] AUTH
@@ -80,6 +80,28 @@ $consumer = Consumer::create(
); );
``` ```
### Lookup
```php
use Nsq\Lookup;
use Nsq\Message;
$lookup = new Lookup('http://nsqlookupd0:4161');
$lookup = new Lookup(['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161']);
$callable = static function (Message $message): Generator {
yield $message->touch(); // Reset the timeout for an in-flight message
yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
yield $message->finish(); // Finish a message (indicate successful processing)
};
$lookup->subscribe(topic: 'topic', channel: 'channel', onMessage: $callable);
$lookup->subscribe(topic: 'anotherTopic', channel: 'channel', onMessage: $callable);
$lookup->unsubscribe(topic: 'local', channel: 'channel');
$lookup->stop(); // unsubscribe all
```
### Integrations ### Integrations
- [Symfony](https://github.com/nsqphp/NsqBundle) - [Symfony](https://github.com/nsqphp/NsqBundle)

View File

View File

@@ -11,34 +11,44 @@
} }
], ],
"require": { "require": {
"php": "^8.0.1", "php": "^8.1",
"ext-json": "*", "ext-json": "*",
"amphp/http-client": "^4.6",
"amphp/socket": "^1.1", "amphp/socket": "^1.1",
"composer/semver": "^3.2", "composer/semver": "^3.2",
"phpinnacle/buffer": "^1.2", "phpinnacle/buffer": "^1.2",
"psr/log": "^1.1" "psr/log": "^3.0"
}, },
"require-dev": { "require-dev": {
"amphp/log": "^1.1", "amphp/log": "^1.1",
"dg/bypass-finals": "^1.3", "dg/bypass-finals": "^1.3",
"ergebnis/composer-normalize": "9999999-dev", "ergebnis/composer-normalize": "^2.15",
"friendsofphp/php-cs-fixer": "^2.18", "friendsofphp/php-cs-fixer": "^3.4",
"infection/infection": "^0.20.2",
"nyholm/nsa": "^1.2", "nyholm/nsa": "^1.2",
"phpstan/phpstan": "^0.12.68", "phpstan/phpstan": "^1.8",
"phpstan/phpstan-phpunit": "^0.12.17", "phpstan/phpstan-phpunit": "^1.1",
"phpstan/phpstan-strict-rules": "^0.12.9", "phpstan/phpstan-strict-rules": "^1.3",
"phpunit/phpunit": "^9.5", "phpunit/phpunit": "^9.5",
"symfony/var-dumper": "^6.1",
"vimeo/psalm": "^4.4" "vimeo/psalm": "^4.4"
}, },
"config": { "config": {
"sort-packages": true "sort-packages": true,
"allow-plugins": {
"ergebnis/composer-normalize": true,
"infection/extension-installer": true
}
}, },
"autoload": { "autoload": {
"psr-4": { "psr-4": {
"Nsq\\": "src/" "Nsq\\": "src/"
} }
}, },
"autoload-dev": {
"files": [
"vendor/symfony/var-dumper/Resources/functions/dump.php"
]
},
"minimum-stability": "dev", "minimum-stability": "dev",
"prefer-stable": true, "prefer-stable": true,
"scripts": { "scripts": {

View File

@@ -2,23 +2,56 @@ version: '3.7'
services: services:
nsqd: nsqd:
image: nsqio/nsq:v1.2.0 image: nsqio/nsq:v${NSQ_VERSION}
labels: labels:
ru.grachevko.dhu: 'nsqd' ru.grachevko.dhu: 'nsqd'
command: /nsqd -log-level debug command: >-
# command: /nsqd nsqd
ports: --log-level debug
- 4150:4150 --lookupd-tcp-address nsqlookupd0:4160
- 4151:4151 --lookupd-tcp-address nsqlookupd1:4160
--lookupd-tcp-address nsqlookupd2:4160
nsqlookupd0:
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqlookupd0'
command: /nsqlookupd -log-level debug
nsqlookupd1:
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqlookupd1'
command: /nsqlookupd -log-level debug
nsqlookupd2:
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqlookupd2'
command: /nsqlookupd -log-level debug
nsqadmin: nsqadmin:
image: nsqio/nsq:v1.2.0 image: nsqio/nsq:v${NSQ_VERSION}
labels: labels:
ru.grachevko.dhu: 'nsqadmin' ru.grachevko.dhu: 'nsqadmin'
command: /nsqadmin --nsqd-http-address=nsqd:4151 --http-address=0.0.0.0:4171 command:
ports: - nsqadmin
- 4171:4171 - --http-address=0.0.0.0:4171
- --lookupd-http-address=nsqlookupd0:4161
- --lookupd-http-address=nsqlookupd1:4161
- --lookupd-http-address=nsqlookupd2:4161
depends_on:
- nsqlookupd0
- nsqlookupd1
- nsqlookupd2
tail: tail:
image: nsqio/nsq:v1.2.0 image: nsqio/nsq:v${NSQ_VERSION}
command: nsq_tail -channel nsq_tail -topic local -nsqd-tcp-address nsqd:4150 command: >-
nsq_tail
--channel nsq_tail
--topic local
--lookupd-http-address nsqlookupd1:4161
depends_on:
- nsqd
- nsqlookupd1

View File

@@ -0,0 +1,47 @@
<?php
declare(strict_types=1);
require dirname(__DIR__).'/../vendor/autoload.php';
use Amp\ByteStream;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Loop;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig;
use Nsq\Lookup;
use Nsq\Message;
Loop::run(static function () {
$handler = new StreamHandler(ByteStream\getStdout());
$handler->setFormatter(new ConsoleFormatter());
$logger = new Logger('consumer', [$handler], [new PsrLogMessageProcessor()]);
$callable = static function (Message $message) {
yield $message->finish();
};
$clientConfig = new ClientConfig();
$lookupConfig = new LookupConfig();
$watcherId = Loop::repeat(5000, function () {
yield Amp\Dns\resolver()->reloadConfig();
});
$lookup = Lookup::create(
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
$lookupConfig,
$logger,
);
$lookup->subscribe('local', 'local', $callable, $clientConfig);
Loop::delay(10000, function () use ($lookup, $watcherId) {
$lookup->stop();
Loop::cancel($watcherId);
});
});

View File

@@ -0,0 +1,102 @@
<?php
declare(strict_types=1);
require dirname(__DIR__).'/../vendor/autoload.php';
use Amp\ByteStream;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Loop;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig;
use Nsq\Lookup;
use Nsq\Producer;
use function Amp\asyncCall;
use function Amp\delay;
Loop::run(static function () {
$handler = new StreamHandler(ByteStream\getStdout());
$handler->setFormatter(new ConsoleFormatter());
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
$clientConfig = new ClientConfig();
/** @var Producer[] $producers */
$producers = [];
$lookupConfig = new LookupConfig();
$lookup = Lookup::create(
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
$lookupConfig,
$logger,
);
$isRunning = true;
asyncCall(static function () use ($lookup, $clientConfig, $logger, &$producers, &$isRunning) {
while ($isRunning) {
/** @var Lookup\Producer[] $nodes */
$nodes = yield $lookup->nodes();
foreach ($nodes as $node) {
$address = $node->toTcpUri();
if (array_key_exists($address, $producers)) {
continue;
}
asyncCall(function () use ($address, $clientConfig, $logger, &$producers) {
yield ($producers[$address] = Producer::create($address, $clientConfig, $logger))
->onClose(function () use (&$producers, $address) {
unset($producers[$address]);
})
->connect()
;
});
}
yield delay(5000);
yield Amp\Dns\resolver()->reloadConfig(); // for reload /etc/hosts
}
});
Loop::delay(5000, function () use (&$isRunning, $logger) {
$logger->info('Stopping producer.');
$isRunning = false;
});
$counter = 0;
while (true) {
if (!$isRunning) {
foreach ($producers as $producer) {
$producer->close();
}
break;
}
if ([] === $producers) {
yield delay(200);
continue;
}
$index = array_rand($producers);
$producer = $producers[$index];
if (!$producer->isConnected()) {
yield delay(100);
continue;
}
yield $producer->publish('local', 'This is message of count '.$counter++);
}
});

View File

@@ -14,6 +14,7 @@ use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Consumer; use Nsq\Consumer;
use Nsq\Message; use Nsq\Message;
use function Amp\call; use function Amp\call;
Loop::run(static function () { Loop::run(static function () {

View File

@@ -8,3 +8,9 @@ parameters:
paths: paths:
- src - src
- tests - tests
ignoreErrors:
-
message: '#no value type specified in iterable type array#'
paths:
- %currentWorkingDirectory%/src
- %currentWorkingDirectory%/tests

View File

@@ -1,6 +1,5 @@
<?xml version="1.0"?> <?xml version="1.0"?>
<psalm <psalm
allowPhpStormGenerics="true"
ignoreInternalFunctionFalseReturn="false" ignoreInternalFunctionFalseReturn="false"
ignoreInternalFunctionNullReturn="false" ignoreInternalFunctionNullReturn="false"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

View File

@@ -13,8 +13,11 @@ final class Buffer extends ByteBuffer
{ {
public function readUInt32LE(): int public function readUInt32LE(): int
{ {
/** @phpstan-ignore-next-line */ $unpacked = unpack('V', $this->consume(4));
return unpack('V', $this->consume(4))[1];
\assert(\is_array($unpacked) && \array_key_exists(1, $unpacked));
return $unpacked[1];
} }
public function consumeTimestamp(): int public function consumeTimestamp(): int

View File

@@ -13,7 +13,7 @@ use Composer\InstalledVersions;
* *
* @psalm-immutable * @psalm-immutable
*/ */
final class ClientConfig implements \JsonSerializable final class ClientConfig
{ {
/** /**
* @psalm-suppress ImpureFunctionCall * @psalm-suppress ImpureFunctionCall
@@ -26,9 +26,26 @@ final class ClientConfig implements \JsonSerializable
public ?string $authSecret = null, public ?string $authSecret = null,
/** /**
* The timeout for establishing a connection in seconds. * The timeout for establishing a connection in milliseconds.
*/ */
public int $connectTimeout = 10, public int $connectTimeout = 10000,
/**
* The max attempts for establishing a connection.
*/
public int $maxAttempts = 0,
/**
* Use tcp_nodelay for establishing a connection.
*/
public bool $tcpNoDelay = false,
public int $rdyCount = 100,
/**
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
* it will send back a JSON payload of supported features and metadata.
*/
public bool $featureNegotiation = true,
/** /**
* An identifier used to disambiguate this client (i.e. something specific to the consumer). * An identifier used to disambiguate this client (i.e. something specific to the consumer).
@@ -73,12 +90,6 @@ final class ClientConfig implements \JsonSerializable
*/ */
public int $sampleRate = 0, public int $sampleRate = 0,
/**
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
* it will send back a JSON payload of supported features and metadata.
*/
public bool $featureNegotiation = true,
/** /**
* Enable TLS for this connection. * Enable TLS for this connection.
*/ */
@@ -89,11 +100,6 @@ final class ClientConfig implements \JsonSerializable
*/ */
public bool $snappy = false, public bool $snappy = false,
/**
* The read timeout for connection sockets and for awaiting responses from nsq.
*/
public int $readTimeout = 5,
/** /**
* A string identifying the agent for this client in the spirit of HTTP. * A string identifying the agent for this client in the spirit of HTTP.
*/ */
@@ -114,12 +120,14 @@ final class ClientConfig implements \JsonSerializable
} }
} }
/** public static function fromArray(array $array): self
* @phpstan-ignore-next-line
*/
public function jsonSerialize(): array
{ {
return [ return new self(...array_intersect_key($array, get_class_vars(self::class)));
}
public function asNegotiationPayload(): string
{
$data = [
'client_id' => $this->clientId, 'client_id' => $this->clientId,
'deflate' => $this->deflate, 'deflate' => $this->deflate,
'deflate_level' => $this->deflateLevel, 'deflate_level' => $this->deflateLevel,
@@ -132,10 +140,7 @@ final class ClientConfig implements \JsonSerializable
'tls_v1' => $this->tls, 'tls_v1' => $this->tls,
'user_agent' => $this->userAgent, 'user_agent' => $this->userAgent,
]; ];
}
public function toString(): string return json_encode($data, JSON_THROW_ON_ERROR);
{
return json_encode($this, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
} }
} }

View File

@@ -0,0 +1,13 @@
<?php
declare(strict_types=1);
namespace Nsq\Config;
final class LookupConfig
{
public function __construct(
public int $pollingInterval = 10000,
) {
}
}

View File

@@ -8,8 +8,10 @@ namespace Nsq\Config;
* The configuration object that holds the config status for a single Connection. * The configuration object that holds the config status for a single Connection.
* *
* @psalm-immutable * @psalm-immutable
*
* @internal
*/ */
final class ConnectionConfig final class ServerConfig
{ {
public function __construct( public function __construct(
/** /**
@@ -82,9 +84,6 @@ final class ConnectionConfig
) { ) {
} }
/**
* @phpstan-ignore-next-line
*/
public static function fromArray(array $array): self public static function fromArray(array $array): self
{ {
return new self( return new self(

View File

@@ -4,9 +4,11 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Amp\ByteStream\ClosedException;
use Amp\Failure;
use Amp\Promise; use Amp\Promise;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Config\ConnectionConfig; use Nsq\Config\ServerConfig;
use Nsq\Exception\AuthenticationRequired; use Nsq\Exception\AuthenticationRequired;
use Nsq\Exception\NsqException; use Nsq\Exception\NsqException;
use Nsq\Frame\Response; use Nsq\Frame\Response;
@@ -15,6 +17,8 @@ use Nsq\Stream\NullStream;
use Nsq\Stream\SnappyStream; use Nsq\Stream\SnappyStream;
use Nsq\Stream\SocketStream; use Nsq\Stream\SocketStream;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use function Amp\asyncCall;
use function Amp\call; use function Amp\call;
/** /**
@@ -22,23 +26,45 @@ use function Amp\call;
*/ */
abstract class Connection abstract class Connection
{ {
protected Stream $stream; private Stream $stream;
/**
* @var callable
*/
private $onConnectCallback;
/**
* @var callable
*/
private $onCloseCallback;
public function __construct( public function __construct(
private string $address, /**
private ClientConfig $clientConfig, * @readonly
private LoggerInterface $logger, */
public string $address,
protected ClientConfig $clientConfig,
protected LoggerInterface $logger,
) { ) {
$this->stream = new NullStream(); $this->stream = new NullStream();
$this->onConnectCallback = static function (): void {
};
$this->onCloseCallback = static function (): void {
};
} }
public function __destruct() public function __destruct()
{ {
$this->close(); $this->close(false);
}
public function isConnected(): bool
{
return !$this->stream instanceof NullStream;
} }
/** /**
* @return Promise<void> * @psalm-return Promise<void>
*/ */
public function connect(): Promise public function connect(): Promise
{ {
@@ -46,16 +72,32 @@ abstract class Connection
$buffer = new Buffer(); $buffer = new Buffer();
/** @var SocketStream $stream */ /** @var SocketStream $stream */
$stream = yield SocketStream::connect($this->address); $stream = yield SocketStream::connect(
$this->address,
$this->clientConfig->connectTimeout,
$this->clientConfig->maxAttempts,
$this->clientConfig->tcpNoDelay,
);
yield $stream->write(Command::magic()); yield $stream->write(Command::magic());
yield $stream->write(Command::identify($this->clientConfig->toString())); yield $stream->write(Command::identify($this->clientConfig->asNegotiationPayload()));
/** @var Response $response */ /** @var Response $response */
$response = yield $this->response($stream, $buffer); $response = yield $this->response($stream, $buffer);
$connectionConfig = ConnectionConfig::fromArray($response->toArray()); $serverConfig = ServerConfig::fromArray($response->toArray());
if ($connectionConfig->snappy) { if ($serverConfig->tls) {
yield $stream->setupTls();
/** @var Response $response */
$response = yield $this->response($stream, $buffer);
if (!$response->isOk()) {
throw new NsqException();
}
}
if ($serverConfig->snappy) {
$stream = new SnappyStream($stream, $buffer->flush()); $stream = new SnappyStream($stream, $buffer->flush());
/** @var Response $response */ /** @var Response $response */
@@ -66,8 +108,8 @@ abstract class Connection
} }
} }
if ($connectionConfig->deflate) { if ($serverConfig->deflate) {
$stream = new GzipStream($stream); $stream = new GzipStream($stream, $serverConfig->deflateLevel, $buffer->flush());
/** @var Response $response */ /** @var Response $response */
$response = yield $this->response($stream, $buffer); $response = yield $this->response($stream, $buffer);
@@ -77,7 +119,7 @@ abstract class Connection
} }
} }
if ($connectionConfig->authRequired) { if ($serverConfig->authRequired) {
if (null === $this->clientConfig->authSecret) { if (null === $this->clientConfig->authSecret) {
throw new AuthenticationRequired(); throw new AuthenticationRequired();
} }
@@ -91,15 +133,103 @@ abstract class Connection
} }
$this->stream = $stream; $this->stream = $stream;
($this->onConnectCallback)();
}); });
} }
public function close(): void public function close(bool $graceful = true): void
{ {
// $this->stream->write(Command::cls()); if (!$this->isConnected()) {
return;
}
$this->stream->close(); $logger = $this->logger;
$this->stream = new NullStream(); [$stream, $this->stream] = [$this->stream, new NullStream()];
if ($graceful) {
$this->logger->debug('Graceful disconnect.', [
'class' => static::class,
'address' => $this->address,
]);
asyncCall(static function () use ($stream, $logger): \Generator {
try {
yield $stream->write(Command::cls());
} catch (\Throwable $e) {
$logger->warning($e->getMessage(), ['exception' => $e]);
}
$stream->close();
});
return;
}
try {
$stream->close();
} catch (ClosedException) {
}
($this->onCloseCallback)();
}
public function onConnect(callable $callback): static
{
$previous = $this->onConnectCallback;
$this->onConnectCallback = static function () use ($previous, $callback): void {
$previous();
$callback();
};
return $this;
}
public function onClose(callable $callback): static
{
$previous = $this->onCloseCallback;
$this->onCloseCallback = static function () use ($previous, $callback): void {
$previous();
$callback();
};
return $this;
}
/**
* @psalm-return Promise<null|string>
*/
protected function read(): Promise
{
return call(function (): \Generator {
try {
return yield $this->stream->read();
} catch (\Throwable $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
$this->close(false);
return new Failure($e);
}
});
}
/**
* @psalm-return Promise<void>
*/
protected function write(string $data): Promise
{
return call(function () use ($data): \Generator {
try {
return yield $this->stream->write($data);
} catch (\Throwable $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
$this->close(false);
return new Failure($e);
}
});
} }
protected function handleError(Frame\Error $error): void protected function handleError(Frame\Error $error): void
@@ -114,7 +244,7 @@ abstract class Connection
} }
/** /**
* @return Promise<Frame\Response> * @psalm-return Promise<Frame\Response>
*/ */
private function response(Stream $stream, Buffer $buffer): Promise private function response(Stream $stream, Buffer $buffer): Promise
{ {

View File

@@ -6,15 +6,19 @@ namespace Nsq;
use Amp\Failure; use Amp\Failure;
use Amp\Promise; use Amp\Promise;
use Amp\Success;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Exception\ConsumerException; use Nsq\Exception\ConsumerException;
use Nsq\Frame\Response; use Nsq\Frame\Response;
use Nsq\Stream\NullStream;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger; use Psr\Log\NullLogger;
use function Amp\asyncCall; use function Amp\asyncCall;
use function Amp\call; use function Amp\call;
/**
* @internal
*/
final class Consumer extends Connection final class Consumer extends Connection
{ {
private int $rdy = 0; private int $rdy = 0;
@@ -25,20 +29,35 @@ final class Consumer extends Connection
private $onMessage; private $onMessage;
public function __construct( public function __construct(
private string $address, string $address,
private string $topic, /**
private string $channel, * @readonly
*/
public string $topic,
/**
* @readonly
*/
public string $channel,
callable $onMessage, callable $onMessage,
ClientConfig $clientConfig, ClientConfig $clientConfig,
private LoggerInterface $logger, LoggerInterface $logger,
) { ) {
parent::__construct( parent::__construct(
$this->address, $address,
$clientConfig, $clientConfig,
$this->logger, $logger,
); );
$this->onMessage = $onMessage; $this->onMessage = $onMessage;
$context = compact('address', 'topic', 'channel');
$this->onConnect(function () use ($context): void {
$this->logger->debug('Consumer connected.', $context);
});
$this->onClose(function () use ($context): void {
$this->logger->debug('Consumer disconnected.', $context);
});
$this->logger->debug('Consumer created.', $context);
} }
public static function create( public static function create(
@@ -61,7 +80,7 @@ final class Consumer extends Connection
public function connect(): Promise public function connect(): Promise
{ {
if (!$this->stream instanceof NullStream) { if ($this->isConnected()) {
return call(static function (): void { return call(static function (): void {
}); });
} }
@@ -69,58 +88,57 @@ final class Consumer extends Connection
return call(function (): \Generator { return call(function (): \Generator {
yield parent::connect(); yield parent::connect();
$this->run(); $buffer = new Buffer();
});
}
private function run(): void
{
$buffer = new Buffer();
asyncCall(function () use ($buffer): \Generator {
yield $this->stream->write(Command::sub($this->topic, $this->channel));
if (null !== ($chunk = yield $this->stream->read())) {
$buffer->append($chunk);
}
/** @var Response $response */
$response = Parser::parse($buffer);
if (!$response->isOk()) {
return new Failure(new ConsumerException('Fail subscription.'));
}
yield $this->rdy(2500);
/** @phpstan-ignore-next-line */
asyncCall(function () use ($buffer): \Generator { asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->stream->read()) { yield $this->write(Command::sub($this->topic, $this->channel));
if (null !== ($chunk = yield $this->read())) {
$buffer->append($chunk); $buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->stream->write(Command::nop());
break;
}
throw ConsumerException::response($frame);
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
case $frame instanceof Frame\Message:
asyncCall($this->onMessage, Message::compose($frame, $this));
break;
}
}
} }
$this->stream = new NullStream(); /** @var Response $response */
$response = Parser::parse($buffer);
if (!$response->isOk()) {
return new Failure(new ConsumerException('Fail subscription.'));
}
yield $this->rdy(1);
/** @phpstan-ignore-next-line */
asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->read()) {
$buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->write(Command::nop());
break;
}
throw ConsumerException::response($frame);
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
case $frame instanceof Frame\Message:
asyncCall($this->onMessage, Message::compose($frame, $this));
break;
}
if ($this->rdy !== $this->clientConfig->rdyCount) {
yield $this->rdy($this->clientConfig->rdyCount);
}
}
}
$this->close(false);
});
}); });
}); });
} }
@@ -128,32 +146,53 @@ final class Consumer extends Connection
/** /**
* Update RDY state (indicate you are ready to receive N messages). * Update RDY state (indicate you are ready to receive N messages).
* *
* @return Promise<void> * @psalm-return Promise<bool>
*/ */
public function rdy(int $count): Promise public function rdy(int $count): Promise
{ {
if (!$this->isConnected()) {
return new Success(false);
}
if ($this->rdy === $count) { if ($this->rdy === $count) {
return call(static function (): void { return new Success(true);
});
} }
$this->rdy = $count; $this->rdy = $count;
return $this->stream->write(Command::rdy($count)); return call(function () use ($count): \Generator {
try {
yield $this->write(Command::rdy($count));
return true;
} catch (\Throwable) {
return false;
}
});
} }
/** /**
* Finish a message (indicate successful processing). * Finish a message (indicate successful processing).
* *
* @return Promise<void> * @psalm-return Promise<bool>
* *
* @internal * @internal
*/ */
public function fin(string $id): Promise public function fin(string $id): Promise
{ {
--$this->rdy; if (!$this->isConnected()) {
return new Success(false);
}
return $this->stream->write(Command::fin($id)); return call(function () use ($id): \Generator {
try {
yield $this->write(Command::fin($id));
return true;
} catch (\Throwable) {
return false;
}
});
} }
/** /**
@@ -162,26 +201,48 @@ final class Consumer extends Connection
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out * be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
* behaves identically to an explicit REQ. * behaves identically to an explicit REQ.
* *
* @return Promise<void> * @psalm-return Promise<bool>
* *
* @internal * @internal
*/ */
public function req(string $id, int $timeout): Promise public function req(string $id, int $timeout): Promise
{ {
--$this->rdy; if (!$this->isConnected()) {
return new Success(false);
}
return $this->stream->write(Command::req($id, $timeout)); return call(function () use ($id, $timeout): \Generator {
try {
yield $this->write(Command::req($id, $timeout));
return true;
} catch (\Throwable) {
return false;
}
});
} }
/** /**
* Reset the timeout for an in-flight message. * Reset the timeout for an in-flight message.
* *
* @return Promise<void> * @psalm-return Promise<bool>
* *
* @internal * @internal
*/ */
public function touch(string $id): Promise public function touch(string $id): Promise
{ {
return $this->stream->write(Command::touch($id)); if (!$this->isConnected()) {
return new Success(false);
}
return call(function () use ($id): \Generator {
try {
yield $this->write(Command::touch($id));
return true;
} catch (\Throwable) {
return false;
}
});
} }
} }

View File

@@ -0,0 +1,18 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Psr\Log\LogLevel;
final class LookupException extends NsqException
{
public function level(): string
{
return match ($this->getMessage()) {
'TOPIC_NOT_FOUND' => LogLevel::DEBUG,
default => LogLevel::WARNING,
};
}
}

View File

@@ -0,0 +1,9 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
final class StreamException extends NsqException
{
}

View File

@@ -6,10 +6,9 @@ namespace Nsq;
abstract class Frame abstract class Frame
{ {
public const TYPE_RESPONSE = 0, public const TYPE_RESPONSE = 0;
TYPE_ERROR = 1, public const TYPE_ERROR = 1;
TYPE_MESSAGE = 2 public const TYPE_MESSAGE = 2;
;
public function __construct(public int $type) public function __construct(public int $type)
{ {

View File

@@ -30,7 +30,7 @@ final class Response extends Frame
} }
/** /**
* @return array<mixed, mixed> * @psalm-return array<mixed, mixed>
*/ */
public function toArray(): array public function toArray(): array
{ {

274
src/Lookup.php Normal file
View File

@@ -0,0 +1,274 @@
<?php
declare(strict_types=1);
namespace Nsq;
use Amp\Deferred;
use Amp\Dns\DnsException;
use Amp\Http\Client\DelegateHttpClient;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\NullCancellationToken;
use Amp\Promise;
use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig;
use Nsq\Exception\LookupException;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use function Amp\asyncCall;
use function Amp\call;
use function Amp\delay;
final class Lookup
{
/**
* @psalm-var array<string, array<string, Lookup\Producer>>
*/
private array $producers = [];
private array $consumers = [];
private array $running = [];
private array $topicWatchers = [];
public function __construct(
private array $addresses,
private LookupConfig $config,
private LoggerInterface $logger,
private DelegateHttpClient $httpClient,
) {
}
public static function create(
string | array $address,
LookupConfig $config = null,
LoggerInterface $logger = null,
DelegateHttpClient $httpClient = null,
): self {
return new self(
(array) $address,
$config ?? new LookupConfig(),
$logger ?? new NullLogger(),
$httpClient ?? HttpClientBuilder::buildDefault(),
);
}
/**
* @psalm-return Promise<Lookup\Producer[]>
*/
public function nodes(): Promise
{
return call(function (): \Generator {
$requestHandler = function (string $uri): \Generator {
/** @var Response $response */
$response = yield $this->httpClient->request(new Request($uri.'/nodes'), new NullCancellationToken());
try {
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
} catch (LookupException $e) {
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
return null;
}
};
$promises = [];
foreach ($this->addresses as $address) {
$promises[$address] = call($requestHandler, $address);
}
$nodes = [];
/** @var Lookup\Response $response */
foreach (yield $promises as $response) {
foreach ($response->producers as $producer) {
$nodes[$producer->toTcpUri()] = $producer;
}
}
return array_values($nodes);
});
}
public function stop(): void
{
$this->producers = [];
$this->consumers = [];
$this->running = [];
$this->topicWatchers = [];
$this->logger->info('Lookup stopped.');
}
/**
* @psalm-suppress InvalidPropertyAssignmentValue
*/
public function subscribe(string $topic, string $channel, callable $onMessage, ClientConfig $config = null): void
{
if (null !== ($this->running[$topic][$channel] ?? null)) {
throw new \InvalidArgumentException('Subscription already exists.');
}
$this->running[$topic][$channel] = true;
asyncCall(function () use ($topic, $channel, $onMessage, $config): \Generator {
while (true) {
if (null === ($this->running[$topic][$channel] ?? null)) {
return;
}
/** @phpstan-ignore-next-line */
$producers = $this->producers[$topic] ??= new Deferred();
if ($producers instanceof Deferred) {
/** @var array<string, Lookup\Producer> $producers */
$producers = yield $producers->promise();
}
foreach (array_diff_key($this->consumers, $producers) as $address => $producer) {
unset($this->consumers[$address]);
}
foreach ($producers as $address => $producer) {
if (null !== ($this->consumers[$address][$topic][$channel] ?? null)) {
continue;
}
$this->keepConnection(
Consumer::create(
$address,
$topic,
$channel,
$onMessage,
$config,
$this->logger,
),
);
}
yield delay($this->config->pollingInterval);
}
});
$this->watch($topic);
$this->logger->info('Subscribed.', compact('topic', 'channel'));
}
public function unsubscribe(string $topic, string $channel): void
{
if (null === ($this->running[$topic][$channel] ?? null)) {
$this->logger->debug('Not subscribed.', compact('topic', 'channel'));
return;
}
unset($this->running[$topic][$channel]);
if ([] === $this->running[$topic]) {
unset($this->running[$topic]);
}
$this->logger->info('Unsubscribed.', compact('topic', 'channel'));
}
private function keepConnection(Consumer $consumer): void
{
$this->consumers[$consumer->address][$consumer->topic][$consumer->channel] = $consumer;
asyncCall(function () use ($consumer): \Generator {
while (null !== ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
try {
yield $consumer->connect();
} catch (DnsException $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
unset(
$this->consumers[$consumer->address],
$this->producers[$consumer->topic][$consumer->address],
);
return;
} catch (\Throwable $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
yield delay($this->config->pollingInterval);
continue;
}
while (true) {
/** @phpstan-ignore-next-line */
if (null === ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
$consumer->close();
return;
}
if (!$consumer->isConnected()) {
break;
}
yield delay(500);
}
}
});
}
private function watch(string $topic): void
{
if (\array_key_exists($topic, $this->topicWatchers)) {
return;
}
$this->topicWatchers[$topic] = true;
asyncCall(function () use ($topic): \Generator {
$cancellationToken = new NullCancellationToken();
$requestHandler = function (string $uri) use ($topic, $cancellationToken): \Generator {
$this->logger->debug('Lookup', compact('topic'));
/** @var Response $response */
$response = yield $this->httpClient->request(new Request($uri.'/lookup?topic='.$topic), $cancellationToken);
try {
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
} catch (LookupException $e) {
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
return null;
}
};
while (\array_key_exists($topic, $this->running)) {
$promises = [];
foreach ($this->addresses as $address) {
$promises[$address] = call($requestHandler, $address);
}
/** @var Lookup\Response[] $responses */
$responses = yield $promises;
$producers = [];
foreach ($responses as $response) {
foreach ($response->producers as $producer) {
$producers[$producer->toTcpUri()] = $producer;
}
}
/** @phpstan-ignore-next-line */
if (($deferred = ($this->producers[$topic] ?? null)) instanceof Deferred) {
$deferred->resolve($producers);
}
$this->producers[$topic] = $producers;
yield delay($this->config->pollingInterval);
}
unset($this->topicWatchers[$topic]);
});
}
}

39
src/Lookup/Producer.php Normal file
View File

@@ -0,0 +1,39 @@
<?php
declare(strict_types=1);
namespace Nsq\Lookup;
final class Producer
{
public function __construct(
public string $broadcastAddress,
public string $hostname,
public string $remoteAddress,
public int $tcpPort,
public int $httpPort,
public string $version,
public array $tombstones,
public array $topics,
) {
}
public static function fromArray(array $array): self
{
return new self(
$array['broadcast_address'],
$array['hostname'],
$array['remote_address'],
$array['tcp_port'],
$array['http_port'],
$array['version'],
$array['tombstones'] ?? [],
$array['topics'] ?? [],
);
}
public function toTcpUri(): string
{
return sprintf('%s:%s', $this->broadcastAddress, $this->tcpPort);
}
}

34
src/Lookup/Response.php Normal file
View File

@@ -0,0 +1,34 @@
<?php
declare(strict_types=1);
namespace Nsq\Lookup;
use Nsq\Exception\LookupException;
final class Response
{
/**
* @param string[] $channels
* @param Producer[] $producers
*/
public function __construct(
public array $channels,
public array $producers,
) {
}
public static function fromJson(string $json): self
{
$array = json_decode($json, true, flags: JSON_THROW_ON_ERROR);
if (\array_key_exists('message', $array)) {
throw new LookupException($array['message']);
}
return new self(
$array['channels'] ?? [],
array_map([Producer::class, 'fromArray'], $array['producers']),
);
}
}

View File

@@ -6,7 +6,6 @@ namespace Nsq;
use Amp\Promise; use Amp\Promise;
use Nsq\Exception\MessageException; use Nsq\Exception\MessageException;
use function Amp\call;
final class Message final class Message
{ {
@@ -32,51 +31,51 @@ final class Message
); );
} }
public function isProcessed(): bool
{
return $this->processed;
}
/** /**
* @return Promise<void> * @psalm-return Promise<bool>
*/ */
public function finish(): Promise public function finish(): Promise
{ {
return call(function (): \Generator { $this->markAsProcessedOrFail();
if ($this->processed) {
throw MessageException::processed($this);
}
yield $this->consumer->fin($this->id); return $this->consumer->fin($this->id);
$this->processed = true;
});
} }
/** /**
* @return Promise<void> * @psalm-param positive-int|0 $timeout
*
* @psalm-return Promise<bool>
*/ */
public function requeue(int $timeout): Promise public function requeue(int $timeout): Promise
{ {
return call(function () use ($timeout): \Generator { $this->markAsProcessedOrFail();
if ($this->processed) {
throw MessageException::processed($this);
}
yield $this->consumer->req($this->id, $timeout); return $this->consumer->req($this->id, $timeout);
$this->processed = true;
});
} }
/** /**
* @return Promise<void> * @psalm-return Promise<bool>
*/ */
public function touch(): Promise public function touch(): Promise
{ {
return call(function (): \Generator { if ($this->processed) {
if ($this->processed) { throw MessageException::processed($this);
throw MessageException::processed($this); }
}
yield $this->consumer->touch($this->id); return $this->consumer->touch($this->id);
}
$this->processed = true; private function markAsProcessedOrFail(): void
}); {
if ($this->processed) {
throw MessageException::processed($this);
}
$this->processed = true;
} }
} }

View File

@@ -5,16 +5,38 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Amp\Promise; use Amp\Promise;
use Amp\Success;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Exception\NsqException; use Nsq\Exception\NsqException;
use Nsq\Stream\NullStream;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger; use Psr\Log\NullLogger;
use function Amp\asyncCall; use function Amp\asyncCall;
use function Amp\call; use function Amp\call;
final class Producer extends Connection final class Producer extends Connection
{ {
public function __construct(
string $address,
ClientConfig $clientConfig,
LoggerInterface $logger,
) {
parent::__construct(
$address,
$clientConfig,
$logger,
);
$context = compact('address');
$this->onConnect(function () use ($context): void {
$this->logger->debug('Producer connected.', $context);
});
$this->onClose(function () use ($context): void {
$this->logger->debug('Producer disconnected.', $context);
});
$this->logger->debug('Producer created.', $context);
}
public static function create( public static function create(
string $address, string $address,
ClientConfig $clientConfig = null, ClientConfig $clientConfig = null,
@@ -29,7 +51,7 @@ final class Producer extends Connection
public function connect(): Promise public function connect(): Promise
{ {
if (!$this->stream instanceof NullStream) { if ($this->isConnected()) {
return call(static function (): void { return call(static function (): void {
}); });
} }
@@ -37,63 +59,72 @@ final class Producer extends Connection
return call(function (): \Generator { return call(function (): \Generator {
yield parent::connect(); yield parent::connect();
$this->run(); $buffer = new Buffer();
asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->read()) {
$buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->write(Command::nop());
}
// Ok received
break;
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
default:
throw new NsqException('Unreachable statement.');
}
}
}
$this->close(false);
});
}); });
} }
/** /**
* @param array<int, string>|string $body * @param array<int, string>|string $body
* *
* @return Promise<void> * @psalm-param positive-int|0 $delay
*
* @psalm-return Promise<bool>
*/ */
public function publish(string $topic, string | array $body, int $delay = 0): Promise public function publish(string $topic, string | array $body, int $delay = null): Promise
{ {
if (0 < $delay) { if (!$this->isConnected()) {
return call( return new Success(false);
function (array $bodies) use ($topic, $delay): \Generator {
foreach ($bodies as $body) {
yield $this->stream->write(Command::dpub($topic, $body, $delay));
}
},
(array) $body,
);
} }
$command = \is_array($body) return call(
? Command::mpub($topic, $body) function (iterable $commands): \Generator {
: Command::pub($topic, $body); try {
foreach ($commands as $command) {
return $this->stream->write($command); yield $this->write($command);
}
private function run(): void
{
$buffer = new Buffer();
asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->stream->read()) {
$buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->stream->write(Command::nop());
}
// Ok received
break;
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
default:
throw new NsqException('Unreachable statement.');
} }
}
}
$this->stream = new NullStream(); return true;
}); } catch (\Throwable) {
return false;
}
},
(static function () use ($topic, $body, $delay): \Generator {
if (\is_array($body) && null === $delay) {
yield Command::mpub($topic, $body);
} elseif (null !== $delay) {
foreach ((array) $body as $content) {
yield Command::dpub($topic, $content, $delay);
}
} else {
yield Command::pub($topic, $body);
}
})(),
);
} }
} }

View File

@@ -4,19 +4,23 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Amp\ByteStream\ClosedException;
use Amp\Promise; use Amp\Promise;
interface Stream interface Stream
{ {
/** /**
* @return Promise<null|string> * @psalm-return Promise<null|string>
*/ */
public function read(): Promise; public function read(): Promise;
/** /**
* @return Promise<void> * @psalm-return Promise<void>
*/ */
public function write(string $data): Promise; public function write(string $data): Promise;
/**
* @throws ClosedException
*/
public function close(): void; public function close(): void;
} }

View File

@@ -5,14 +5,38 @@ declare(strict_types=1);
namespace Nsq\Stream; namespace Nsq\Stream;
use Amp\Promise; use Amp\Promise;
use Nsq\Exception\NsqException; use Nsq\Buffer;
use Nsq\Exception\StreamException;
use Nsq\Stream; use Nsq\Stream;
use function Amp\call;
class GzipStream implements Stream class GzipStream implements Stream
{ {
public function __construct(private Stream $stream) private ?\InflateContext $inflate = null;
private ?\DeflateContext $deflate = null;
private Buffer $buffer;
public function __construct(private Stream $stream, private int $level, string $bytes = '')
{ {
throw new NsqException('GzipStream not implemented yet.'); /** @var false|\InflateContext $inflate */
$inflate = @inflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
/** @var \DeflateContext|false $deflate */
$deflate = @deflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
if (false === $inflate) {
throw new StreamException('Failed initializing inflate context');
}
if (false === $deflate) {
throw new StreamException('Failed initializing deflate context');
}
$this->inflate = $inflate;
$this->deflate = $deflate;
$this->buffer = new Buffer($bytes);
} }
/** /**
@@ -20,7 +44,33 @@ class GzipStream implements Stream
*/ */
public function read(): Promise public function read(): Promise
{ {
return $this->stream->read(); return call(function (): \Generator {
if (null === $this->inflate) {
return null;
}
if ($this->buffer->empty()) {
$chunk = yield $this->stream->read();
if (null !== $chunk) {
$this->buffer->append($chunk);
}
}
$data = $this->buffer->flush();
if ('' === $data) {
return null;
}
/** @psalm-suppress UndefinedFunction,InvalidArgument */
$decompressed = inflate_add($this->inflate, $data, ZLIB_SYNC_FLUSH);
if (false === $decompressed) {
throw new StreamException('Failed adding data to deflate context');
}
return $decompressed;
});
} }
/** /**
@@ -28,11 +78,27 @@ class GzipStream implements Stream
*/ */
public function write(string $data): Promise public function write(string $data): Promise
{ {
return $this->stream->write($data); if (null === $this->deflate) {
throw new StreamException('The stream has already been closed');
}
/** @psalm-suppress UndefinedFunction,InvalidArgument */
$compressed = deflate_add($this->deflate, $data, ZLIB_SYNC_FLUSH);
if (false === $compressed) {
throw new StreamException('Failed adding data to deflate context');
}
return $this->stream->write($compressed);
} }
/**
* {@inheritDoc}
*/
public function close(): void public function close(): void
{ {
$this->stream->close(); $this->stream->close();
$this->inflate = null;
$this->deflate = null;
} }
} }

View File

@@ -4,10 +4,11 @@ declare(strict_types=1);
namespace Nsq\Stream; namespace Nsq\Stream;
use Amp\Failure;
use Amp\Promise; use Amp\Promise;
use Amp\Success; use Amp\Success;
use Nsq\Exception\NsqException;
use Nsq\Stream; use Nsq\Stream;
use function Amp\call;
final class NullStream implements Stream final class NullStream implements Stream
{ {
@@ -24,8 +25,7 @@ final class NullStream implements Stream
*/ */
public function write(string $data): Promise public function write(string $data): Promise
{ {
return call(static function (): void { return new Failure(new NsqException('Connection closed.'));
});
} }
/** /**

View File

@@ -8,24 +8,25 @@ use Amp\Promise;
use Nsq\Buffer; use Nsq\Buffer;
use Nsq\Exception\SnappyException; use Nsq\Exception\SnappyException;
use Nsq\Stream; use Nsq\Stream;
use function Amp\call; use function Amp\call;
class SnappyStream implements Stream class SnappyStream implements Stream
{ {
private const IDENTIFIER = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59]; private const IDENTIFIER = [0xFF, 0x06, 0x00, 0x00, 0x73, 0x4E, 0x61, 0x50, 0x70, 0x59];
private const SIZE_HEADER = 4; private const SIZE_HEADER = 4;
private const SIZE_CHECKSUM = 4; private const SIZE_CHECKSUM = 4;
private const SIZE_CHUNK = 65536; private const SIZE_CHUNK = 65536;
private const TYPE_IDENTIFIER = 0xff; private const TYPE_IDENTIFIER = 0xFF;
private const TYPE_COMPRESSED = 0x00; private const TYPE_COMPRESSED = 0x00;
private const TYPE_UNCOMPRESSED = 0x01; private const TYPE_UNCOMPRESSED = 0x01;
private const TYPE_PADDING = 0xfe; private const TYPE_PADDING = 0xFE;
private Buffer $buffer; private Buffer $buffer;
public function __construct(private Stream $stream, string $bytes = '') public function __construct(private Stream $stream, string $bytes = '')
{ {
if (!\function_exists('snappy_uncompress')) { if (!\function_exists('snappy_uncompress') || !\function_exists('snappy_compress')) {
throw SnappyException::notInstalled(); throw SnappyException::notInstalled();
} }
@@ -45,7 +46,7 @@ class SnappyStream implements Stream
$type = $this->buffer->readUInt32LE(); $type = $this->buffer->readUInt32LE();
$size = $type >> 8; $size = $type >> 8;
$type &= 0xff; $type &= 0xFF;
while ($this->buffer->size() < $size && null !== ($chunk = yield $this->stream->read())) { while ($this->buffer->size() < $size && null !== ($chunk = yield $this->stream->read())) {
$this->buffer->append($chunk); $this->buffer->append($chunk);
@@ -59,6 +60,7 @@ class SnappyStream implements Stream
case self::TYPE_COMPRESSED: case self::TYPE_COMPRESSED:
$this->buffer->discard(self::SIZE_CHECKSUM); $this->buffer->discard(self::SIZE_CHECKSUM);
/** @psalm-suppress UndefinedFunction */
return snappy_uncompress($this->buffer->consume($size - self::SIZE_HEADER)); return snappy_uncompress($this->buffer->consume($size - self::SIZE_HEADER));
case self::TYPE_UNCOMPRESSED: case self::TYPE_UNCOMPRESSED:
$this->buffer->discard(self::SIZE_CHECKSUM); $this->buffer->discard(self::SIZE_CHECKSUM);
@@ -78,6 +80,7 @@ class SnappyStream implements Stream
public function write(string $data): Promise public function write(string $data): Promise
{ {
return call(function () use ($data): Promise { return call(function () use ($data): Promise {
/** @var string $result */
$result = pack('CCCCCCCCCC', ...self::IDENTIFIER); $result = pack('CCCCCCCCCC', ...self::IDENTIFIER);
foreach (str_split($data, self::SIZE_CHUNK) as $chunk) { foreach (str_split($data, self::SIZE_CHUNK) as $chunk) {
@@ -93,23 +96,27 @@ class SnappyStream implements Stream
$this->stream->close(); $this->stream->close();
} }
/**
* @psalm-suppress PossiblyFalseArgument
*/
private function compress(string $uncompressed): string private function compress(string $uncompressed): string
{ {
/** @psalm-suppress UndefinedFunction */
$compressed = snappy_compress($uncompressed); $compressed = snappy_compress($uncompressed);
\assert(\is_string($compressed));
[$type, $data] = \strlen($compressed) <= 0.875 * \strlen($uncompressed) [$type, $data] = \strlen($compressed) <= 0.875 * \strlen($uncompressed)
? [self::TYPE_COMPRESSED, $compressed] ? [self::TYPE_COMPRESSED, $compressed]
: [self::TYPE_UNCOMPRESSED, $uncompressed]; : [self::TYPE_UNCOMPRESSED, $uncompressed];
/** @phpstan-ignore-next-line */ /** @psalm-suppress PossiblyFalseArgument */
$checksum = unpack('N', hash('crc32c', $uncompressed, true))[1]; $unpacked = unpack('N', hash('crc32c', $uncompressed, true));
$checksum = (($checksum >> 15) | ($checksum << 17)) + 0xa282ead8 & 0xffffffff; \assert(\is_array($unpacked));
$checksum = $unpacked[1];
$checksum = (($checksum >> 15) | ($checksum << 17)) + 0xA282EAD8 & 0xFFFFFFFF;
$size = (\strlen($data) + 4) << 8; $size = (\strlen($data) + 4) << 8;
/** @psalm-suppress PossiblyFalseOperand */
return pack('VV', $type + $size, $checksum).$data; return pack('VV', $type + $size, $checksum).$data;
} }
} }

View File

@@ -5,20 +5,22 @@ declare(strict_types=1);
namespace Nsq\Stream; namespace Nsq\Stream;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\ClientTlsContext;
use Amp\Socket\ConnectContext; use Amp\Socket\ConnectContext;
use Amp\Socket\Socket; use Amp\Socket\EncryptableSocket;
use Nsq\Stream; use Nsq\Stream;
use function Amp\call; use function Amp\call;
use function Amp\Socket\connect; use function Amp\Socket\connect;
class SocketStream implements Stream class SocketStream implements Stream
{ {
public function __construct(private Socket $socket) public function __construct(private EncryptableSocket $socket)
{ {
} }
/** /**
* @return Promise<self> * @psalm-return Promise<self>
*/ */
public static function connect(string $uri, int $timeout = 0, int $attempts = 0, bool $noDelay = false): Promise public static function connect(string $uri, int $timeout = 0, int $attempts = 0, bool $noDelay = false): Promise
{ {
@@ -37,12 +39,17 @@ class SocketStream implements Stream
$context = $context->withTcpNoDelay(); $context = $context->withTcpNoDelay();
} }
$context = $context->withTlsContext(
(new ClientTlsContext(''))
->withoutPeerVerification(),
);
return new self(yield connect($uri, $context)); return new self(yield connect($uri, $context));
}); });
} }
/** /**
* @return Promise<null|string> * @psalm-return Promise<null|string>
*/ */
public function read(): Promise public function read(): Promise
{ {
@@ -50,15 +57,26 @@ class SocketStream implements Stream
} }
/** /**
* @return Promise<void> * @psalm-return Promise<void>
*/ */
public function write(string $data): Promise public function write(string $data): Promise
{ {
return $this->socket->write($data); return $this->socket->write($data);
} }
/**
* {@inheritDoc}
*/
public function close(): void public function close(): void
{ {
$this->socket->close(); $this->socket->close();
} }
/**
* @psalm-return Promise<void>
*/
public function setupTls(): Promise
{
return $this->socket->setupTls();
}
} }

View File

@@ -7,6 +7,11 @@ use PHPUnit\Framework\TestCase;
final class ClientConfigTest extends TestCase final class ClientConfigTest extends TestCase
{ {
public function testNegotiationPayload(): void
{
self::assertJson((new ClientConfig())->asNegotiationPayload());
}
public function testInvalidCompression(): void public function testInvalidCompression(): void
{ {
$this->expectException(InvalidArgumentException::class); $this->expectException(InvalidArgumentException::class);
@@ -14,4 +19,42 @@ final class ClientConfigTest extends TestCase
new ClientConfig(deflate: true, snappy: true); new ClientConfig(deflate: true, snappy: true);
} }
/**
* @dataProvider array
*/
public function testFromArray(array $data, array $expected): void
{
self::assertSame($expected, get_object_vars(ClientConfig::fromArray($data)));
}
public function array(): Generator
{
$default = get_object_vars(new ClientConfig());
yield 'Empty array' => [[], $default];
yield 'With wrong keys' => [['bla' => 'bla'], $default];
$custom = [
'authSecret' => 'SomeSecret',
'connectTimeout' => 100,
'maxAttempts' => 10,
'tcpNoDelay' => true,
'rdyCount' => 1,
'featureNegotiation' => true,
'clientId' => 'SomeGorgeousClientId',
'deflate' => true,
'deflateLevel' => 1,
'heartbeatInterval' => 31111,
'hostname' => gethostname(),
'msgTimeout' => 59999,
'sampleRate' => 25,
'tls' => true,
'snappy' => false,
'userAgent' => 'nsqphp/test',
];
yield 'Full filled' => [$custom, $custom];
}
} }

View File

@@ -55,6 +55,7 @@ final class MessageTest extends TestCase
*/ */
public function messages(): Generator public function messages(): Generator
{ {
/** @phpstan-ignore-next-line */
$consumer = $this->createMock(Consumer::class); $consumer = $this->createMock(Consumer::class);
$consumer->method('fin')->willReturn(new Success()); $consumer->method('fin')->willReturn(new Success());
$consumer->method('touch')->willReturn(new Success()); $consumer->method('touch')->willReturn(new Success());

37
tests/NsqTest.php Normal file
View File

@@ -0,0 +1,37 @@
<?php
declare(strict_types=1);
use Nsq\Config\ClientConfig;
use PHPUnit\Framework\TestCase;
final class NsqTest extends TestCase
{
/**
* @dataProvider configs
*/
public function test(ClientConfig $clientConfig): void
{
self::markTestSkipped('');
}
/**
* @return Generator<string, array<int, ClientConfig>>
*/
public function configs(): Generator
{
yield 'default' => [
new ClientConfig(
heartbeatInterval: 3000,
snappy: false,
),
];
yield 'snappy' => [
new ClientConfig(
heartbeatInterval: 3000,
snappy: true,
),
];
}
}

View File

@@ -3,44 +3,12 @@
declare(strict_types=1); declare(strict_types=1);
use Amp\Loop; use Amp\Loop;
use Amp\Process\Process;
use Nsq\Exception\ServerException; use Nsq\Exception\ServerException;
use Nsq\Producer; use Nsq\Producer;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use function Amp\ByteStream\buffer;
use function Amp\Promise\wait;
final class ProducerTest extends TestCase final class ProducerTest extends TestCase
{ {
/**
* @param array<int, string>|string $body
*
* @dataProvider data
*/
public function testPublish(array | string $body, string $expected): void
{
$process = new Process(
sprintf('bin/nsq_tail -topic %s -channel default -nsqd-tcp-address localhost:4150 -n 1', __FUNCTION__),
);
wait($process->start());
$producer = Producer::create('tcp://localhost:4150');
wait($producer->connect());
wait($producer->publish(__FUNCTION__, $body));
wait($process->join());
self::assertSame($expected, wait(buffer($process->getStdout())));
}
/**
* @return Generator<int, array{0: string|array, 1: string}>
*/
public function data(): Generator
{
yield ['Test Message One!', 'Test Message One!'.PHP_EOL];
}
/** /**
* @dataProvider pubFails * @dataProvider pubFails
*/ */
@@ -58,9 +26,6 @@ final class ProducerTest extends TestCase
}); });
} }
/**
* @return Generator<string, array>
*/
public function pubFails(): Generator public function pubFails(): Generator
{ {
yield 'Empty body' => ['test', '', 'E_BAD_MESSAGE PUB invalid message body size 0']; yield 'Empty body' => ['test', '', 'E_BAD_MESSAGE PUB invalid message body size 0'];