Compare commits
3 Commits
350f08c2c1
...
tests
Author | SHA1 | Date | |
---|---|---|---|
f5ab37c579 | |||
809f967fb1 | |||
1b5e1ffb95 |
56
.github/workflows/ci.yaml
vendored
56
.github/workflows/ci.yaml
vendored
@ -14,7 +14,7 @@ jobs:
|
|||||||
os:
|
os:
|
||||||
- ubuntu-latest
|
- ubuntu-latest
|
||||||
php:
|
php:
|
||||||
- '8.1'
|
- '8.0'
|
||||||
dependencies:
|
dependencies:
|
||||||
- lowest
|
- lowest
|
||||||
- highest
|
- highest
|
||||||
@ -59,6 +59,9 @@ jobs:
|
|||||||
run: composer update --no-progress --no-interaction --prefer-dist --prefer-lowest
|
run: composer update --no-progress --no-interaction --prefer-dist --prefer-lowest
|
||||||
if: ${{ matrix.dependencies == 'lowest' }}
|
if: ${{ matrix.dependencies == 'lowest' }}
|
||||||
|
|
||||||
|
- name: Install nsq bin
|
||||||
|
run: curl -L https://github.com/nsqio/nsq/releases/download/v1.2.0/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz | tar xz --strip 1
|
||||||
|
|
||||||
- name: Run tests
|
- name: Run tests
|
||||||
run: vendor/bin/phpunit --coverage-clover=build/coverage-report.xml
|
run: vendor/bin/phpunit --coverage-clover=build/coverage-report.xml
|
||||||
|
|
||||||
@ -77,7 +80,7 @@ jobs:
|
|||||||
- name: Setup PHP
|
- name: Setup PHP
|
||||||
uses: shivammathur/setup-php@v2
|
uses: shivammathur/setup-php@v2
|
||||||
with:
|
with:
|
||||||
php-version: '8.1'
|
php-version: '8.0'
|
||||||
env:
|
env:
|
||||||
update: true
|
update: true
|
||||||
|
|
||||||
@ -108,7 +111,7 @@ jobs:
|
|||||||
- name: Setup PHP
|
- name: Setup PHP
|
||||||
uses: shivammathur/setup-php@v2
|
uses: shivammathur/setup-php@v2
|
||||||
with:
|
with:
|
||||||
php-version: '8.1'
|
php-version: '8.0'
|
||||||
extensions: snappy-kjdev/php-ext-snappy@0.2.1
|
extensions: snappy-kjdev/php-ext-snappy@0.2.1
|
||||||
env:
|
env:
|
||||||
update: true
|
update: true
|
||||||
@ -140,7 +143,7 @@ jobs:
|
|||||||
- name: Setup PHP
|
- name: Setup PHP
|
||||||
uses: shivammathur/setup-php@v2
|
uses: shivammathur/setup-php@v2
|
||||||
with:
|
with:
|
||||||
php-version: '8.1'
|
php-version: '8.0'
|
||||||
extensions: snappy-kjdev/php-ext-snappy@0.2.1
|
extensions: snappy-kjdev/php-ext-snappy@0.2.1
|
||||||
env:
|
env:
|
||||||
update: true
|
update: true
|
||||||
@ -161,3 +164,48 @@ jobs:
|
|||||||
|
|
||||||
- name: Run script
|
- name: Run script
|
||||||
run: vendor/bin/psalm --output-format=github
|
run: vendor/bin/psalm --output-format=github
|
||||||
|
|
||||||
|
infection:
|
||||||
|
name: Infection
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
services:
|
||||||
|
nsqd:
|
||||||
|
image: nsqio/nsq:v1.2.0
|
||||||
|
options: --entrypoint /nsqd
|
||||||
|
ports:
|
||||||
|
- 4150:4150
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: actions/checkout@v2
|
||||||
|
|
||||||
|
- name: Setup PHP
|
||||||
|
uses: shivammathur/setup-php@v2
|
||||||
|
with:
|
||||||
|
php-version: '8.0'
|
||||||
|
coverage: pcov
|
||||||
|
env:
|
||||||
|
update: true
|
||||||
|
|
||||||
|
- name: Get composer cache directory
|
||||||
|
id: composer-cache
|
||||||
|
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
|
||||||
|
|
||||||
|
- name: Cache dependencies
|
||||||
|
uses: actions/cache@v2
|
||||||
|
with:
|
||||||
|
path: ${{ steps.composer-cache.outputs.dir }}
|
||||||
|
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
|
||||||
|
restore-keys: ${{ runner.os }}-composer-
|
||||||
|
|
||||||
|
- name: Install dependencies
|
||||||
|
run: composer update --no-progress --no-interaction --prefer-dist
|
||||||
|
|
||||||
|
- name: Install nsq bin
|
||||||
|
run: curl -L https://github.com/nsqio/nsq/releases/download/v1.2.0/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz | tar xz --strip 1
|
||||||
|
|
||||||
|
- name: Run script
|
||||||
|
env:
|
||||||
|
STRYKER_DASHBOARD_API_KEY: ${{ secrets.STRYKER_DASHBOARD_API_KEY }}
|
||||||
|
run: |
|
||||||
|
git fetch --depth=1 origin $GITHUB_BASE_REF
|
||||||
|
php vendor/bin/infection -j2 --git-diff-filter=A --git-diff-base=origin/$GITHUB_BASE_REF --logger-github --ignore-msi-with-no-mutations --only-covered
|
||||||
|
13
.gitignore
vendored
13
.gitignore
vendored
@ -1,6 +1,17 @@
|
|||||||
/vendor/
|
/vendor/
|
||||||
/composer.lock
|
/composer.lock
|
||||||
|
|
||||||
/.php-cs-fixer.cache
|
/.php_cs.cache
|
||||||
/.phpunit.result.cache
|
/.phpunit.result.cache
|
||||||
/infection.log
|
/infection.log
|
||||||
|
|
||||||
|
# Nsq
|
||||||
|
bin/nsq_stat
|
||||||
|
bin/nsq_tail
|
||||||
|
bin/nsq_to_file
|
||||||
|
bin/nsq_to_http
|
||||||
|
bin/nsq_to_nsq
|
||||||
|
bin/nsqadmin
|
||||||
|
bin/nsqd
|
||||||
|
bin/nsqlookupd
|
||||||
|
bin/to_nsq
|
||||||
|
@ -1,45 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
$finder = PhpCsFixer\Finder::create()
|
|
||||||
->in(__DIR__)
|
|
||||||
->exclude('vendor');
|
|
||||||
|
|
||||||
return (new PhpCsFixer\Config())
|
|
||||||
->setRiskyAllowed(true)
|
|
||||||
->setRules([
|
|
||||||
'@PhpCsFixer' => true,
|
|
||||||
'@PhpCsFixer:risky' => true,
|
|
||||||
'@PSR12' => true,
|
|
||||||
'@PSR12:risky' => true,
|
|
||||||
'blank_line_before_statement' => [
|
|
||||||
'statements' => [
|
|
||||||
'continue',
|
|
||||||
'do',
|
|
||||||
'exit',
|
|
||||||
'goto',
|
|
||||||
'if',
|
|
||||||
'return',
|
|
||||||
'switch',
|
|
||||||
'throw',
|
|
||||||
'try',
|
|
||||||
],
|
|
||||||
],
|
|
||||||
'declare_strict_types' => true,
|
|
||||||
'global_namespace_import' => [
|
|
||||||
'import_classes' => false,
|
|
||||||
'import_constants' => false,
|
|
||||||
'import_functions' => false,
|
|
||||||
],
|
|
||||||
'php_unit_internal_class' => false,
|
|
||||||
'php_unit_test_case_static_method_calls' => ['call_type' => 'self'],
|
|
||||||
'php_unit_test_class_requires_covers' => false,
|
|
||||||
'phpdoc_to_comment' => false,
|
|
||||||
'yoda_style' => true,
|
|
||||||
'trailing_comma_in_multiline' => [
|
|
||||||
'after_heredoc' => true,
|
|
||||||
'elements' => ['arrays', 'arguments', 'parameters'],
|
|
||||||
],
|
|
||||||
'types_spaces' => ['space' => 'single'],
|
|
||||||
])
|
|
||||||
->setFinder($finder)
|
|
||||||
->setCacheFile(__DIR__.'/.php-cs-fixer.cache');
|
|
28
.php_cs.dist
Normal file
28
.php_cs.dist
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
#!/usr/bin/env php
|
||||||
|
<?php
|
||||||
|
|
||||||
|
return (new PhpCsFixer\Config())
|
||||||
|
->setRiskyAllowed(true)
|
||||||
|
->setRules([
|
||||||
|
'@PhpCsFixer' => true,
|
||||||
|
'@PhpCsFixer:risky' => true,
|
||||||
|
'@PSR12' => true,
|
||||||
|
'@PSR12:risky' => true,
|
||||||
|
'braces' => [
|
||||||
|
'allow_single_line_closure' => true,
|
||||||
|
],
|
||||||
|
'blank_line_before_statement' => [
|
||||||
|
'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try'],
|
||||||
|
],
|
||||||
|
'declare_strict_types' => true,
|
||||||
|
'global_namespace_import' => ['import_classes' => false, 'import_constants' => false, 'import_functions' => false],
|
||||||
|
'php_unit_internal_class' => false,
|
||||||
|
'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'],
|
||||||
|
'php_unit_test_class_requires_covers' => false,
|
||||||
|
'phpdoc_to_comment' => false,
|
||||||
|
'yoda_style' => true,
|
||||||
|
])
|
||||||
|
->setFinder(
|
||||||
|
PhpCsFixer\Finder::create()
|
||||||
|
->in(__DIR__)
|
||||||
|
);
|
20
Makefile
20
Makefile
@ -1,20 +0,0 @@
|
|||||||
|
|
||||||
all: install composer-validate php-cs-fixer psalm phpstan phpunit
|
|
||||||
|
|
||||||
install:
|
|
||||||
composer install
|
|
||||||
|
|
||||||
psalm:
|
|
||||||
php vendor/bin/psalm
|
|
||||||
|
|
||||||
phpstan:
|
|
||||||
php vendor/bin/phpstan analyse
|
|
||||||
|
|
||||||
phpunit:
|
|
||||||
php vendor/bin/phpunit
|
|
||||||
|
|
||||||
php-cs-fixer:
|
|
||||||
php vendor/bin/php-cs-fixer fix
|
|
||||||
|
|
||||||
composer-validate:
|
|
||||||
composer validate
|
|
28
README.md
28
README.md
@ -31,10 +31,10 @@ Features
|
|||||||
- [x] PUB
|
- [x] PUB
|
||||||
- [x] SUB
|
- [x] SUB
|
||||||
- [X] Feature Negotiation
|
- [X] Feature Negotiation
|
||||||
- [X] Discovery
|
- [ ] Discovery
|
||||||
- [ ] Backoff
|
- [ ] Backoff
|
||||||
- [X] TLS
|
- [ ] TLS
|
||||||
- [X] Deflate
|
- [ ] Deflate
|
||||||
- [X] Snappy
|
- [X] Snappy
|
||||||
- [X] Sampling
|
- [X] Sampling
|
||||||
- [X] AUTH
|
- [X] AUTH
|
||||||
@ -80,28 +80,6 @@ $consumer = Consumer::create(
|
|||||||
);
|
);
|
||||||
```
|
```
|
||||||
|
|
||||||
### Lookup
|
|
||||||
|
|
||||||
```php
|
|
||||||
use Nsq\Lookup;
|
|
||||||
use Nsq\Message;
|
|
||||||
|
|
||||||
$lookup = new Lookup('http://nsqlookupd0:4161');
|
|
||||||
$lookup = new Lookup(['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161']);
|
|
||||||
|
|
||||||
$callable = static function (Message $message): Generator {
|
|
||||||
yield $message->touch(); // Reset the timeout for an in-flight message
|
|
||||||
yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
|
|
||||||
yield $message->finish(); // Finish a message (indicate successful processing)
|
|
||||||
};
|
|
||||||
|
|
||||||
$lookup->subscribe(topic: 'topic', channel: 'channel', onMessage: $callable);
|
|
||||||
$lookup->subscribe(topic: 'anotherTopic', channel: 'channel', onMessage: $callable);
|
|
||||||
|
|
||||||
$lookup->unsubscribe(topic: 'local', channel: 'channel');
|
|
||||||
$lookup->stop(); // unsubscribe all
|
|
||||||
```
|
|
||||||
|
|
||||||
### Integrations
|
### Integrations
|
||||||
|
|
||||||
- [Symfony](https://github.com/nsqphp/NsqBundle)
|
- [Symfony](https://github.com/nsqphp/NsqBundle)
|
||||||
|
0
bin/.gitkeep
Normal file
0
bin/.gitkeep
Normal file
@ -11,44 +11,34 @@
|
|||||||
}
|
}
|
||||||
],
|
],
|
||||||
"require": {
|
"require": {
|
||||||
"php": "^8.1",
|
"php": "^8.0.1",
|
||||||
"ext-json": "*",
|
"ext-json": "*",
|
||||||
"amphp/http-client": "^4.6",
|
|
||||||
"amphp/socket": "^1.1",
|
"amphp/socket": "^1.1",
|
||||||
"composer/semver": "^3.2",
|
"composer/semver": "^3.2",
|
||||||
"phpinnacle/buffer": "^1.2",
|
"phpinnacle/buffer": "^1.2",
|
||||||
"psr/log": "^3.0"
|
"psr/log": "^1.1"
|
||||||
},
|
},
|
||||||
"require-dev": {
|
"require-dev": {
|
||||||
"amphp/log": "^1.1",
|
"amphp/log": "^1.1",
|
||||||
"dg/bypass-finals": "^1.3",
|
"dg/bypass-finals": "^1.3",
|
||||||
"ergebnis/composer-normalize": "^2.15",
|
"ergebnis/composer-normalize": "9999999-dev",
|
||||||
"friendsofphp/php-cs-fixer": "^3.4",
|
"friendsofphp/php-cs-fixer": "^2.18",
|
||||||
|
"infection/infection": "^0.20.2",
|
||||||
"nyholm/nsa": "^1.2",
|
"nyholm/nsa": "^1.2",
|
||||||
"phpstan/phpstan": "^1.8",
|
"phpstan/phpstan": "^0.12.68",
|
||||||
"phpstan/phpstan-phpunit": "^1.1",
|
"phpstan/phpstan-phpunit": "^0.12.17",
|
||||||
"phpstan/phpstan-strict-rules": "^1.3",
|
"phpstan/phpstan-strict-rules": "^0.12.9",
|
||||||
"phpunit/phpunit": "^9.5",
|
"phpunit/phpunit": "^9.5",
|
||||||
"symfony/var-dumper": "^6.1",
|
|
||||||
"vimeo/psalm": "^4.4"
|
"vimeo/psalm": "^4.4"
|
||||||
},
|
},
|
||||||
"config": {
|
"config": {
|
||||||
"sort-packages": true,
|
"sort-packages": true
|
||||||
"allow-plugins": {
|
|
||||||
"ergebnis/composer-normalize": true,
|
|
||||||
"infection/extension-installer": true
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
"autoload": {
|
"autoload": {
|
||||||
"psr-4": {
|
"psr-4": {
|
||||||
"Nsq\\": "src/"
|
"Nsq\\": "src/"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"autoload-dev": {
|
|
||||||
"files": [
|
|
||||||
"vendor/symfony/var-dumper/Resources/functions/dump.php"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
"minimum-stability": "dev",
|
"minimum-stability": "dev",
|
||||||
"prefer-stable": true,
|
"prefer-stable": true,
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
@ -2,56 +2,23 @@ version: '3.7'
|
|||||||
|
|
||||||
services:
|
services:
|
||||||
nsqd:
|
nsqd:
|
||||||
image: nsqio/nsq:v${NSQ_VERSION}
|
image: nsqio/nsq:v1.2.0
|
||||||
labels:
|
labels:
|
||||||
ru.grachevko.dhu: 'nsqd'
|
ru.grachevko.dhu: 'nsqd'
|
||||||
command: >-
|
command: /nsqd -log-level debug
|
||||||
nsqd
|
# command: /nsqd
|
||||||
--log-level debug
|
ports:
|
||||||
--lookupd-tcp-address nsqlookupd0:4160
|
- 4150:4150
|
||||||
--lookupd-tcp-address nsqlookupd1:4160
|
- 4151:4151
|
||||||
--lookupd-tcp-address nsqlookupd2:4160
|
|
||||||
|
|
||||||
nsqlookupd0:
|
|
||||||
image: nsqio/nsq:v${NSQ_VERSION}
|
|
||||||
labels:
|
|
||||||
ru.grachevko.dhu: 'nsqlookupd0'
|
|
||||||
command: /nsqlookupd -log-level debug
|
|
||||||
|
|
||||||
nsqlookupd1:
|
|
||||||
image: nsqio/nsq:v${NSQ_VERSION}
|
|
||||||
labels:
|
|
||||||
ru.grachevko.dhu: 'nsqlookupd1'
|
|
||||||
command: /nsqlookupd -log-level debug
|
|
||||||
|
|
||||||
nsqlookupd2:
|
|
||||||
image: nsqio/nsq:v${NSQ_VERSION}
|
|
||||||
labels:
|
|
||||||
ru.grachevko.dhu: 'nsqlookupd2'
|
|
||||||
command: /nsqlookupd -log-level debug
|
|
||||||
|
|
||||||
nsqadmin:
|
nsqadmin:
|
||||||
image: nsqio/nsq:v${NSQ_VERSION}
|
image: nsqio/nsq:v1.2.0
|
||||||
labels:
|
labels:
|
||||||
ru.grachevko.dhu: 'nsqadmin'
|
ru.grachevko.dhu: 'nsqadmin'
|
||||||
command:
|
command: /nsqadmin --nsqd-http-address=nsqd:4151 --http-address=0.0.0.0:4171
|
||||||
- nsqadmin
|
ports:
|
||||||
- --http-address=0.0.0.0:4171
|
- 4171:4171
|
||||||
- --lookupd-http-address=nsqlookupd0:4161
|
|
||||||
- --lookupd-http-address=nsqlookupd1:4161
|
|
||||||
- --lookupd-http-address=nsqlookupd2:4161
|
|
||||||
depends_on:
|
|
||||||
- nsqlookupd0
|
|
||||||
- nsqlookupd1
|
|
||||||
- nsqlookupd2
|
|
||||||
|
|
||||||
tail:
|
tail:
|
||||||
image: nsqio/nsq:v${NSQ_VERSION}
|
image: nsqio/nsq:v1.2.0
|
||||||
command: >-
|
command: nsq_tail -channel nsq_tail -topic local -nsqd-tcp-address nsqd:4150
|
||||||
nsq_tail
|
|
||||||
--channel nsq_tail
|
|
||||||
--topic local
|
|
||||||
--lookupd-http-address nsqlookupd1:4161
|
|
||||||
depends_on:
|
|
||||||
- nsqd
|
|
||||||
- nsqlookupd1
|
|
||||||
|
@ -14,7 +14,6 @@ use Monolog\Processor\PsrLogMessageProcessor;
|
|||||||
use Nsq\Config\ClientConfig;
|
use Nsq\Config\ClientConfig;
|
||||||
use Nsq\Consumer;
|
use Nsq\Consumer;
|
||||||
use Nsq\Message;
|
use Nsq\Message;
|
||||||
|
|
||||||
use function Amp\call;
|
use function Amp\call;
|
||||||
|
|
||||||
Loop::run(static function () {
|
Loop::run(static function () {
|
@ -1,47 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
require dirname(__DIR__).'/../vendor/autoload.php';
|
|
||||||
|
|
||||||
use Amp\ByteStream;
|
|
||||||
use Amp\Log\ConsoleFormatter;
|
|
||||||
use Amp\Log\StreamHandler;
|
|
||||||
use Amp\Loop;
|
|
||||||
use Monolog\Logger;
|
|
||||||
use Monolog\Processor\PsrLogMessageProcessor;
|
|
||||||
use Nsq\Config\ClientConfig;
|
|
||||||
use Nsq\Config\LookupConfig;
|
|
||||||
use Nsq\Lookup;
|
|
||||||
use Nsq\Message;
|
|
||||||
|
|
||||||
Loop::run(static function () {
|
|
||||||
$handler = new StreamHandler(ByteStream\getStdout());
|
|
||||||
$handler->setFormatter(new ConsoleFormatter());
|
|
||||||
$logger = new Logger('consumer', [$handler], [new PsrLogMessageProcessor()]);
|
|
||||||
|
|
||||||
$callable = static function (Message $message) {
|
|
||||||
yield $message->finish();
|
|
||||||
};
|
|
||||||
|
|
||||||
$clientConfig = new ClientConfig();
|
|
||||||
|
|
||||||
$lookupConfig = new LookupConfig();
|
|
||||||
|
|
||||||
$watcherId = Loop::repeat(5000, function () {
|
|
||||||
yield Amp\Dns\resolver()->reloadConfig();
|
|
||||||
});
|
|
||||||
|
|
||||||
$lookup = Lookup::create(
|
|
||||||
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
|
|
||||||
$lookupConfig,
|
|
||||||
$logger,
|
|
||||||
);
|
|
||||||
|
|
||||||
$lookup->subscribe('local', 'local', $callable, $clientConfig);
|
|
||||||
|
|
||||||
Loop::delay(10000, function () use ($lookup, $watcherId) {
|
|
||||||
$lookup->stop();
|
|
||||||
Loop::cancel($watcherId);
|
|
||||||
});
|
|
||||||
});
|
|
@ -1,102 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
require dirname(__DIR__).'/../vendor/autoload.php';
|
|
||||||
|
|
||||||
use Amp\ByteStream;
|
|
||||||
use Amp\Log\ConsoleFormatter;
|
|
||||||
use Amp\Log\StreamHandler;
|
|
||||||
use Amp\Loop;
|
|
||||||
use Monolog\Logger;
|
|
||||||
use Monolog\Processor\PsrLogMessageProcessor;
|
|
||||||
use Nsq\Config\ClientConfig;
|
|
||||||
use Nsq\Config\LookupConfig;
|
|
||||||
use Nsq\Lookup;
|
|
||||||
use Nsq\Producer;
|
|
||||||
|
|
||||||
use function Amp\asyncCall;
|
|
||||||
use function Amp\delay;
|
|
||||||
|
|
||||||
Loop::run(static function () {
|
|
||||||
$handler = new StreamHandler(ByteStream\getStdout());
|
|
||||||
$handler->setFormatter(new ConsoleFormatter());
|
|
||||||
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
|
|
||||||
|
|
||||||
$clientConfig = new ClientConfig();
|
|
||||||
|
|
||||||
/** @var Producer[] $producers */
|
|
||||||
$producers = [];
|
|
||||||
|
|
||||||
$lookupConfig = new LookupConfig();
|
|
||||||
|
|
||||||
$lookup = Lookup::create(
|
|
||||||
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
|
|
||||||
$lookupConfig,
|
|
||||||
$logger,
|
|
||||||
);
|
|
||||||
|
|
||||||
$isRunning = true;
|
|
||||||
|
|
||||||
asyncCall(static function () use ($lookup, $clientConfig, $logger, &$producers, &$isRunning) {
|
|
||||||
while ($isRunning) {
|
|
||||||
/** @var Lookup\Producer[] $nodes */
|
|
||||||
$nodes = yield $lookup->nodes();
|
|
||||||
|
|
||||||
foreach ($nodes as $node) {
|
|
||||||
$address = $node->toTcpUri();
|
|
||||||
|
|
||||||
if (array_key_exists($address, $producers)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
asyncCall(function () use ($address, $clientConfig, $logger, &$producers) {
|
|
||||||
yield ($producers[$address] = Producer::create($address, $clientConfig, $logger))
|
|
||||||
->onClose(function () use (&$producers, $address) {
|
|
||||||
unset($producers[$address]);
|
|
||||||
})
|
|
||||||
->connect()
|
|
||||||
;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
yield delay(5000);
|
|
||||||
|
|
||||||
yield Amp\Dns\resolver()->reloadConfig(); // for reload /etc/hosts
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Loop::delay(5000, function () use (&$isRunning, $logger) {
|
|
||||||
$logger->info('Stopping producer.');
|
|
||||||
|
|
||||||
$isRunning = false;
|
|
||||||
});
|
|
||||||
|
|
||||||
$counter = 0;
|
|
||||||
while (true) {
|
|
||||||
if (!$isRunning) {
|
|
||||||
foreach ($producers as $producer) {
|
|
||||||
$producer->close();
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ([] === $producers) {
|
|
||||||
yield delay(200);
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
$index = array_rand($producers);
|
|
||||||
$producer = $producers[$index];
|
|
||||||
|
|
||||||
if (!$producer->isConnected()) {
|
|
||||||
yield delay(100);
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
yield $producer->publish('local', 'This is message of count '.$counter++);
|
|
||||||
}
|
|
||||||
});
|
|
@ -8,9 +8,3 @@ parameters:
|
|||||||
paths:
|
paths:
|
||||||
- src
|
- src
|
||||||
- tests
|
- tests
|
||||||
ignoreErrors:
|
|
||||||
-
|
|
||||||
message: '#no value type specified in iterable type array#'
|
|
||||||
paths:
|
|
||||||
- %currentWorkingDirectory%/src
|
|
||||||
- %currentWorkingDirectory%/tests
|
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
<?xml version="1.0"?>
|
<?xml version="1.0"?>
|
||||||
<psalm
|
<psalm
|
||||||
|
allowPhpStormGenerics="true"
|
||||||
ignoreInternalFunctionFalseReturn="false"
|
ignoreInternalFunctionFalseReturn="false"
|
||||||
ignoreInternalFunctionNullReturn="false"
|
ignoreInternalFunctionNullReturn="false"
|
||||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
@ -13,11 +13,8 @@ final class Buffer extends ByteBuffer
|
|||||||
{
|
{
|
||||||
public function readUInt32LE(): int
|
public function readUInt32LE(): int
|
||||||
{
|
{
|
||||||
$unpacked = unpack('V', $this->consume(4));
|
/** @phpstan-ignore-next-line */
|
||||||
|
return unpack('V', $this->consume(4))[1];
|
||||||
\assert(\is_array($unpacked) && \array_key_exists(1, $unpacked));
|
|
||||||
|
|
||||||
return $unpacked[1];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public function consumeTimestamp(): int
|
public function consumeTimestamp(): int
|
||||||
|
@ -13,7 +13,7 @@ use Composer\InstalledVersions;
|
|||||||
*
|
*
|
||||||
* @psalm-immutable
|
* @psalm-immutable
|
||||||
*/
|
*/
|
||||||
final class ClientConfig
|
final class ClientConfig implements \JsonSerializable
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* @psalm-suppress ImpureFunctionCall
|
* @psalm-suppress ImpureFunctionCall
|
||||||
@ -26,26 +26,9 @@ final class ClientConfig
|
|||||||
public ?string $authSecret = null,
|
public ?string $authSecret = null,
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The timeout for establishing a connection in milliseconds.
|
* The timeout for establishing a connection in seconds.
|
||||||
*/
|
*/
|
||||||
public int $connectTimeout = 10000,
|
public int $connectTimeout = 10,
|
||||||
|
|
||||||
/**
|
|
||||||
* The max attempts for establishing a connection.
|
|
||||||
*/
|
|
||||||
public int $maxAttempts = 0,
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Use tcp_nodelay for establishing a connection.
|
|
||||||
*/
|
|
||||||
public bool $tcpNoDelay = false,
|
|
||||||
public int $rdyCount = 100,
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
|
|
||||||
* it will send back a JSON payload of supported features and metadata.
|
|
||||||
*/
|
|
||||||
public bool $featureNegotiation = true,
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An identifier used to disambiguate this client (i.e. something specific to the consumer).
|
* An identifier used to disambiguate this client (i.e. something specific to the consumer).
|
||||||
@ -90,6 +73,12 @@ final class ClientConfig
|
|||||||
*/
|
*/
|
||||||
public int $sampleRate = 0,
|
public int $sampleRate = 0,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
|
||||||
|
* it will send back a JSON payload of supported features and metadata.
|
||||||
|
*/
|
||||||
|
public bool $featureNegotiation = true,
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Enable TLS for this connection.
|
* Enable TLS for this connection.
|
||||||
*/
|
*/
|
||||||
@ -100,6 +89,11 @@ final class ClientConfig
|
|||||||
*/
|
*/
|
||||||
public bool $snappy = false,
|
public bool $snappy = false,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The read timeout for connection sockets and for awaiting responses from nsq.
|
||||||
|
*/
|
||||||
|
public int $readTimeout = 5,
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A string identifying the agent for this client in the spirit of HTTP.
|
* A string identifying the agent for this client in the spirit of HTTP.
|
||||||
*/
|
*/
|
||||||
@ -120,14 +114,12 @@ final class ClientConfig
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static function fromArray(array $array): self
|
/**
|
||||||
|
* @phpstan-ignore-next-line
|
||||||
|
*/
|
||||||
|
public function jsonSerialize(): array
|
||||||
{
|
{
|
||||||
return new self(...array_intersect_key($array, get_class_vars(self::class)));
|
return [
|
||||||
}
|
|
||||||
|
|
||||||
public function asNegotiationPayload(): string
|
|
||||||
{
|
|
||||||
$data = [
|
|
||||||
'client_id' => $this->clientId,
|
'client_id' => $this->clientId,
|
||||||
'deflate' => $this->deflate,
|
'deflate' => $this->deflate,
|
||||||
'deflate_level' => $this->deflateLevel,
|
'deflate_level' => $this->deflateLevel,
|
||||||
@ -140,7 +132,10 @@ final class ClientConfig
|
|||||||
'tls_v1' => $this->tls,
|
'tls_v1' => $this->tls,
|
||||||
'user_agent' => $this->userAgent,
|
'user_agent' => $this->userAgent,
|
||||||
];
|
];
|
||||||
|
}
|
||||||
|
|
||||||
return json_encode($data, JSON_THROW_ON_ERROR);
|
public function toString(): string
|
||||||
|
{
|
||||||
|
return json_encode($this, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,10 +8,8 @@ namespace Nsq\Config;
|
|||||||
* The configuration object that holds the config status for a single Connection.
|
* The configuration object that holds the config status for a single Connection.
|
||||||
*
|
*
|
||||||
* @psalm-immutable
|
* @psalm-immutable
|
||||||
*
|
|
||||||
* @internal
|
|
||||||
*/
|
*/
|
||||||
final class ServerConfig
|
final class ConnectionConfig
|
||||||
{
|
{
|
||||||
public function __construct(
|
public function __construct(
|
||||||
/**
|
/**
|
||||||
@ -84,6 +82,9 @@ final class ServerConfig
|
|||||||
) {
|
) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @phpstan-ignore-next-line
|
||||||
|
*/
|
||||||
public static function fromArray(array $array): self
|
public static function fromArray(array $array): self
|
||||||
{
|
{
|
||||||
return new self(
|
return new self(
|
@ -1,13 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq\Config;
|
|
||||||
|
|
||||||
final class LookupConfig
|
|
||||||
{
|
|
||||||
public function __construct(
|
|
||||||
public int $pollingInterval = 10000,
|
|
||||||
) {
|
|
||||||
}
|
|
||||||
}
|
|
@ -4,11 +4,9 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Nsq;
|
namespace Nsq;
|
||||||
|
|
||||||
use Amp\ByteStream\ClosedException;
|
|
||||||
use Amp\Failure;
|
|
||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use Nsq\Config\ClientConfig;
|
use Nsq\Config\ClientConfig;
|
||||||
use Nsq\Config\ServerConfig;
|
use Nsq\Config\ConnectionConfig;
|
||||||
use Nsq\Exception\AuthenticationRequired;
|
use Nsq\Exception\AuthenticationRequired;
|
||||||
use Nsq\Exception\NsqException;
|
use Nsq\Exception\NsqException;
|
||||||
use Nsq\Frame\Response;
|
use Nsq\Frame\Response;
|
||||||
@ -17,8 +15,6 @@ use Nsq\Stream\NullStream;
|
|||||||
use Nsq\Stream\SnappyStream;
|
use Nsq\Stream\SnappyStream;
|
||||||
use Nsq\Stream\SocketStream;
|
use Nsq\Stream\SocketStream;
|
||||||
use Psr\Log\LoggerInterface;
|
use Psr\Log\LoggerInterface;
|
||||||
|
|
||||||
use function Amp\asyncCall;
|
|
||||||
use function Amp\call;
|
use function Amp\call;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -26,45 +22,23 @@ use function Amp\call;
|
|||||||
*/
|
*/
|
||||||
abstract class Connection
|
abstract class Connection
|
||||||
{
|
{
|
||||||
private Stream $stream;
|
protected Stream $stream;
|
||||||
|
|
||||||
/**
|
|
||||||
* @var callable
|
|
||||||
*/
|
|
||||||
private $onConnectCallback;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @var callable
|
|
||||||
*/
|
|
||||||
private $onCloseCallback;
|
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
/**
|
private string $address,
|
||||||
* @readonly
|
private ClientConfig $clientConfig,
|
||||||
*/
|
private LoggerInterface $logger,
|
||||||
public string $address,
|
|
||||||
protected ClientConfig $clientConfig,
|
|
||||||
protected LoggerInterface $logger,
|
|
||||||
) {
|
) {
|
||||||
$this->stream = new NullStream();
|
$this->stream = new NullStream();
|
||||||
$this->onConnectCallback = static function (): void {
|
|
||||||
};
|
|
||||||
$this->onCloseCallback = static function (): void {
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public function __destruct()
|
public function __destruct()
|
||||||
{
|
{
|
||||||
$this->close(false);
|
$this->close();
|
||||||
}
|
|
||||||
|
|
||||||
public function isConnected(): bool
|
|
||||||
{
|
|
||||||
return !$this->stream instanceof NullStream;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @psalm-return Promise<void>
|
* @return Promise<void>
|
||||||
*/
|
*/
|
||||||
public function connect(): Promise
|
public function connect(): Promise
|
||||||
{
|
{
|
||||||
@ -72,32 +46,16 @@ abstract class Connection
|
|||||||
$buffer = new Buffer();
|
$buffer = new Buffer();
|
||||||
|
|
||||||
/** @var SocketStream $stream */
|
/** @var SocketStream $stream */
|
||||||
$stream = yield SocketStream::connect(
|
$stream = yield SocketStream::connect($this->address);
|
||||||
$this->address,
|
|
||||||
$this->clientConfig->connectTimeout,
|
|
||||||
$this->clientConfig->maxAttempts,
|
|
||||||
$this->clientConfig->tcpNoDelay,
|
|
||||||
);
|
|
||||||
|
|
||||||
yield $stream->write(Command::magic());
|
yield $stream->write(Command::magic());
|
||||||
yield $stream->write(Command::identify($this->clientConfig->asNegotiationPayload()));
|
yield $stream->write(Command::identify($this->clientConfig->toString()));
|
||||||
|
|
||||||
/** @var Response $response */
|
/** @var Response $response */
|
||||||
$response = yield $this->response($stream, $buffer);
|
$response = yield $this->response($stream, $buffer);
|
||||||
$serverConfig = ServerConfig::fromArray($response->toArray());
|
$connectionConfig = ConnectionConfig::fromArray($response->toArray());
|
||||||
|
|
||||||
if ($serverConfig->tls) {
|
if ($connectionConfig->snappy) {
|
||||||
yield $stream->setupTls();
|
|
||||||
|
|
||||||
/** @var Response $response */
|
|
||||||
$response = yield $this->response($stream, $buffer);
|
|
||||||
|
|
||||||
if (!$response->isOk()) {
|
|
||||||
throw new NsqException();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($serverConfig->snappy) {
|
|
||||||
$stream = new SnappyStream($stream, $buffer->flush());
|
$stream = new SnappyStream($stream, $buffer->flush());
|
||||||
|
|
||||||
/** @var Response $response */
|
/** @var Response $response */
|
||||||
@ -108,8 +66,8 @@ abstract class Connection
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($serverConfig->deflate) {
|
if ($connectionConfig->deflate) {
|
||||||
$stream = new GzipStream($stream, $serverConfig->deflateLevel, $buffer->flush());
|
$stream = new GzipStream($stream);
|
||||||
|
|
||||||
/** @var Response $response */
|
/** @var Response $response */
|
||||||
$response = yield $this->response($stream, $buffer);
|
$response = yield $this->response($stream, $buffer);
|
||||||
@ -119,7 +77,7 @@ abstract class Connection
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($serverConfig->authRequired) {
|
if ($connectionConfig->authRequired) {
|
||||||
if (null === $this->clientConfig->authSecret) {
|
if (null === $this->clientConfig->authSecret) {
|
||||||
throw new AuthenticationRequired();
|
throw new AuthenticationRequired();
|
||||||
}
|
}
|
||||||
@ -133,103 +91,15 @@ abstract class Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
$this->stream = $stream;
|
$this->stream = $stream;
|
||||||
|
|
||||||
($this->onConnectCallback)();
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public function close(bool $graceful = true): void
|
public function close(): void
|
||||||
{
|
{
|
||||||
if (!$this->isConnected()) {
|
// $this->stream->write(Command::cls());
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$logger = $this->logger;
|
$this->stream->close();
|
||||||
[$stream, $this->stream] = [$this->stream, new NullStream()];
|
$this->stream = new NullStream();
|
||||||
|
|
||||||
if ($graceful) {
|
|
||||||
$this->logger->debug('Graceful disconnect.', [
|
|
||||||
'class' => static::class,
|
|
||||||
'address' => $this->address,
|
|
||||||
]);
|
|
||||||
|
|
||||||
asyncCall(static function () use ($stream, $logger): \Generator {
|
|
||||||
try {
|
|
||||||
yield $stream->write(Command::cls());
|
|
||||||
} catch (\Throwable $e) {
|
|
||||||
$logger->warning($e->getMessage(), ['exception' => $e]);
|
|
||||||
}
|
|
||||||
|
|
||||||
$stream->close();
|
|
||||||
});
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
$stream->close();
|
|
||||||
} catch (ClosedException) {
|
|
||||||
}
|
|
||||||
|
|
||||||
($this->onCloseCallback)();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function onConnect(callable $callback): static
|
|
||||||
{
|
|
||||||
$previous = $this->onConnectCallback;
|
|
||||||
$this->onConnectCallback = static function () use ($previous, $callback): void {
|
|
||||||
$previous();
|
|
||||||
$callback();
|
|
||||||
};
|
|
||||||
|
|
||||||
return $this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function onClose(callable $callback): static
|
|
||||||
{
|
|
||||||
$previous = $this->onCloseCallback;
|
|
||||||
$this->onCloseCallback = static function () use ($previous, $callback): void {
|
|
||||||
$previous();
|
|
||||||
$callback();
|
|
||||||
};
|
|
||||||
|
|
||||||
return $this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @psalm-return Promise<null|string>
|
|
||||||
*/
|
|
||||||
protected function read(): Promise
|
|
||||||
{
|
|
||||||
return call(function (): \Generator {
|
|
||||||
try {
|
|
||||||
return yield $this->stream->read();
|
|
||||||
} catch (\Throwable $e) {
|
|
||||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
|
||||||
|
|
||||||
$this->close(false);
|
|
||||||
|
|
||||||
return new Failure($e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @psalm-return Promise<void>
|
|
||||||
*/
|
|
||||||
protected function write(string $data): Promise
|
|
||||||
{
|
|
||||||
return call(function () use ($data): \Generator {
|
|
||||||
try {
|
|
||||||
return yield $this->stream->write($data);
|
|
||||||
} catch (\Throwable $e) {
|
|
||||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
|
||||||
|
|
||||||
$this->close(false);
|
|
||||||
|
|
||||||
return new Failure($e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function handleError(Frame\Error $error): void
|
protected function handleError(Frame\Error $error): void
|
||||||
@ -244,7 +114,7 @@ abstract class Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @psalm-return Promise<Frame\Response>
|
* @return Promise<Frame\Response>
|
||||||
*/
|
*/
|
||||||
private function response(Stream $stream, Buffer $buffer): Promise
|
private function response(Stream $stream, Buffer $buffer): Promise
|
||||||
{
|
{
|
||||||
|
183
src/Consumer.php
183
src/Consumer.php
@ -6,19 +6,15 @@ namespace Nsq;
|
|||||||
|
|
||||||
use Amp\Failure;
|
use Amp\Failure;
|
||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use Amp\Success;
|
|
||||||
use Nsq\Config\ClientConfig;
|
use Nsq\Config\ClientConfig;
|
||||||
use Nsq\Exception\ConsumerException;
|
use Nsq\Exception\ConsumerException;
|
||||||
use Nsq\Frame\Response;
|
use Nsq\Frame\Response;
|
||||||
|
use Nsq\Stream\NullStream;
|
||||||
use Psr\Log\LoggerInterface;
|
use Psr\Log\LoggerInterface;
|
||||||
use Psr\Log\NullLogger;
|
use Psr\Log\NullLogger;
|
||||||
|
|
||||||
use function Amp\asyncCall;
|
use function Amp\asyncCall;
|
||||||
use function Amp\call;
|
use function Amp\call;
|
||||||
|
|
||||||
/**
|
|
||||||
* @internal
|
|
||||||
*/
|
|
||||||
final class Consumer extends Connection
|
final class Consumer extends Connection
|
||||||
{
|
{
|
||||||
private int $rdy = 0;
|
private int $rdy = 0;
|
||||||
@ -29,35 +25,20 @@ final class Consumer extends Connection
|
|||||||
private $onMessage;
|
private $onMessage;
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
string $address,
|
private string $address,
|
||||||
/**
|
private string $topic,
|
||||||
* @readonly
|
private string $channel,
|
||||||
*/
|
|
||||||
public string $topic,
|
|
||||||
/**
|
|
||||||
* @readonly
|
|
||||||
*/
|
|
||||||
public string $channel,
|
|
||||||
callable $onMessage,
|
callable $onMessage,
|
||||||
ClientConfig $clientConfig,
|
ClientConfig $clientConfig,
|
||||||
LoggerInterface $logger,
|
private LoggerInterface $logger,
|
||||||
) {
|
) {
|
||||||
parent::__construct(
|
parent::__construct(
|
||||||
$address,
|
$this->address,
|
||||||
$clientConfig,
|
$clientConfig,
|
||||||
$logger,
|
$this->logger,
|
||||||
);
|
);
|
||||||
|
|
||||||
$this->onMessage = $onMessage;
|
$this->onMessage = $onMessage;
|
||||||
|
|
||||||
$context = compact('address', 'topic', 'channel');
|
|
||||||
$this->onConnect(function () use ($context): void {
|
|
||||||
$this->logger->debug('Consumer connected.', $context);
|
|
||||||
});
|
|
||||||
$this->onClose(function () use ($context): void {
|
|
||||||
$this->logger->debug('Consumer disconnected.', $context);
|
|
||||||
});
|
|
||||||
$this->logger->debug('Consumer created.', $context);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static function create(
|
public static function create(
|
||||||
@ -80,7 +61,7 @@ final class Consumer extends Connection
|
|||||||
|
|
||||||
public function connect(): Promise
|
public function connect(): Promise
|
||||||
{
|
{
|
||||||
if ($this->isConnected()) {
|
if (!$this->stream instanceof NullStream) {
|
||||||
return call(static function (): void {
|
return call(static function (): void {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -88,57 +69,58 @@ final class Consumer extends Connection
|
|||||||
return call(function (): \Generator {
|
return call(function (): \Generator {
|
||||||
yield parent::connect();
|
yield parent::connect();
|
||||||
|
|
||||||
$buffer = new Buffer();
|
$this->run();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private function run(): void
|
||||||
|
{
|
||||||
|
$buffer = new Buffer();
|
||||||
|
|
||||||
|
asyncCall(function () use ($buffer): \Generator {
|
||||||
|
yield $this->stream->write(Command::sub($this->topic, $this->channel));
|
||||||
|
|
||||||
|
if (null !== ($chunk = yield $this->stream->read())) {
|
||||||
|
$buffer->append($chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @var Response $response */
|
||||||
|
$response = Parser::parse($buffer);
|
||||||
|
|
||||||
|
if (!$response->isOk()) {
|
||||||
|
return new Failure(new ConsumerException('Fail subscription.'));
|
||||||
|
}
|
||||||
|
|
||||||
|
yield $this->rdy(2500);
|
||||||
|
|
||||||
|
/** @phpstan-ignore-next-line */
|
||||||
asyncCall(function () use ($buffer): \Generator {
|
asyncCall(function () use ($buffer): \Generator {
|
||||||
yield $this->write(Command::sub($this->topic, $this->channel));
|
while (null !== $chunk = yield $this->stream->read()) {
|
||||||
|
|
||||||
if (null !== ($chunk = yield $this->read())) {
|
|
||||||
$buffer->append($chunk);
|
$buffer->append($chunk);
|
||||||
}
|
|
||||||
|
|
||||||
/** @var Response $response */
|
while ($frame = Parser::parse($buffer)) {
|
||||||
$response = Parser::parse($buffer);
|
switch (true) {
|
||||||
|
case $frame instanceof Frame\Response:
|
||||||
if (!$response->isOk()) {
|
if ($frame->isHeartBeat()) {
|
||||||
return new Failure(new ConsumerException('Fail subscription.'));
|
yield $this->stream->write(Command::nop());
|
||||||
}
|
|
||||||
|
|
||||||
yield $this->rdy(1);
|
|
||||||
|
|
||||||
/** @phpstan-ignore-next-line */
|
|
||||||
asyncCall(function () use ($buffer): \Generator {
|
|
||||||
while (null !== $chunk = yield $this->read()) {
|
|
||||||
$buffer->append($chunk);
|
|
||||||
|
|
||||||
while ($frame = Parser::parse($buffer)) {
|
|
||||||
switch (true) {
|
|
||||||
case $frame instanceof Frame\Response:
|
|
||||||
if ($frame->isHeartBeat()) {
|
|
||||||
yield $this->write(Command::nop());
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
throw ConsumerException::response($frame);
|
|
||||||
case $frame instanceof Frame\Error:
|
|
||||||
$this->handleError($frame);
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
case $frame instanceof Frame\Message:
|
}
|
||||||
asyncCall($this->onMessage, Message::compose($frame, $this));
|
|
||||||
|
|
||||||
break;
|
throw ConsumerException::response($frame);
|
||||||
}
|
case $frame instanceof Frame\Error:
|
||||||
|
$this->handleError($frame);
|
||||||
|
|
||||||
if ($this->rdy !== $this->clientConfig->rdyCount) {
|
break;
|
||||||
yield $this->rdy($this->clientConfig->rdyCount);
|
case $frame instanceof Frame\Message:
|
||||||
}
|
asyncCall($this->onMessage, Message::compose($frame, $this));
|
||||||
|
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
$this->close(false);
|
$this->stream = new NullStream();
|
||||||
});
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -146,53 +128,32 @@ final class Consumer extends Connection
|
|||||||
/**
|
/**
|
||||||
* Update RDY state (indicate you are ready to receive N messages).
|
* Update RDY state (indicate you are ready to receive N messages).
|
||||||
*
|
*
|
||||||
* @psalm-return Promise<bool>
|
* @return Promise<void>
|
||||||
*/
|
*/
|
||||||
public function rdy(int $count): Promise
|
public function rdy(int $count): Promise
|
||||||
{
|
{
|
||||||
if (!$this->isConnected()) {
|
|
||||||
return new Success(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($this->rdy === $count) {
|
if ($this->rdy === $count) {
|
||||||
return new Success(true);
|
return call(static function (): void {
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->rdy = $count;
|
$this->rdy = $count;
|
||||||
|
|
||||||
return call(function () use ($count): \Generator {
|
return $this->stream->write(Command::rdy($count));
|
||||||
try {
|
|
||||||
yield $this->write(Command::rdy($count));
|
|
||||||
|
|
||||||
return true;
|
|
||||||
} catch (\Throwable) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Finish a message (indicate successful processing).
|
* Finish a message (indicate successful processing).
|
||||||
*
|
*
|
||||||
* @psalm-return Promise<bool>
|
* @return Promise<void>
|
||||||
*
|
*
|
||||||
* @internal
|
* @internal
|
||||||
*/
|
*/
|
||||||
public function fin(string $id): Promise
|
public function fin(string $id): Promise
|
||||||
{
|
{
|
||||||
if (!$this->isConnected()) {
|
--$this->rdy;
|
||||||
return new Success(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
return call(function () use ($id): \Generator {
|
return $this->stream->write(Command::fin($id));
|
||||||
try {
|
|
||||||
yield $this->write(Command::fin($id));
|
|
||||||
|
|
||||||
return true;
|
|
||||||
} catch (\Throwable) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -201,48 +162,26 @@ final class Consumer extends Connection
|
|||||||
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
|
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
|
||||||
* behaves identically to an explicit REQ.
|
* behaves identically to an explicit REQ.
|
||||||
*
|
*
|
||||||
* @psalm-return Promise<bool>
|
* @return Promise<void>
|
||||||
*
|
*
|
||||||
* @internal
|
* @internal
|
||||||
*/
|
*/
|
||||||
public function req(string $id, int $timeout): Promise
|
public function req(string $id, int $timeout): Promise
|
||||||
{
|
{
|
||||||
if (!$this->isConnected()) {
|
--$this->rdy;
|
||||||
return new Success(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
return call(function () use ($id, $timeout): \Generator {
|
return $this->stream->write(Command::req($id, $timeout));
|
||||||
try {
|
|
||||||
yield $this->write(Command::req($id, $timeout));
|
|
||||||
|
|
||||||
return true;
|
|
||||||
} catch (\Throwable) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reset the timeout for an in-flight message.
|
* Reset the timeout for an in-flight message.
|
||||||
*
|
*
|
||||||
* @psalm-return Promise<bool>
|
* @return Promise<void>
|
||||||
*
|
*
|
||||||
* @internal
|
* @internal
|
||||||
*/
|
*/
|
||||||
public function touch(string $id): Promise
|
public function touch(string $id): Promise
|
||||||
{
|
{
|
||||||
if (!$this->isConnected()) {
|
return $this->stream->write(Command::touch($id));
|
||||||
return new Success(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
return call(function () use ($id): \Generator {
|
|
||||||
try {
|
|
||||||
yield $this->write(Command::touch($id));
|
|
||||||
|
|
||||||
return true;
|
|
||||||
} catch (\Throwable) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,18 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq\Exception;
|
|
||||||
|
|
||||||
use Psr\Log\LogLevel;
|
|
||||||
|
|
||||||
final class LookupException extends NsqException
|
|
||||||
{
|
|
||||||
public function level(): string
|
|
||||||
{
|
|
||||||
return match ($this->getMessage()) {
|
|
||||||
'TOPIC_NOT_FOUND' => LogLevel::DEBUG,
|
|
||||||
default => LogLevel::WARNING,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,9 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq\Exception;
|
|
||||||
|
|
||||||
final class StreamException extends NsqException
|
|
||||||
{
|
|
||||||
}
|
|
@ -6,9 +6,10 @@ namespace Nsq;
|
|||||||
|
|
||||||
abstract class Frame
|
abstract class Frame
|
||||||
{
|
{
|
||||||
public const TYPE_RESPONSE = 0;
|
public const TYPE_RESPONSE = 0,
|
||||||
public const TYPE_ERROR = 1;
|
TYPE_ERROR = 1,
|
||||||
public const TYPE_MESSAGE = 2;
|
TYPE_MESSAGE = 2
|
||||||
|
;
|
||||||
|
|
||||||
public function __construct(public int $type)
|
public function __construct(public int $type)
|
||||||
{
|
{
|
||||||
|
@ -30,7 +30,7 @@ final class Response extends Frame
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @psalm-return array<mixed, mixed>
|
* @return array<mixed, mixed>
|
||||||
*/
|
*/
|
||||||
public function toArray(): array
|
public function toArray(): array
|
||||||
{
|
{
|
||||||
|
274
src/Lookup.php
274
src/Lookup.php
@ -1,274 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq;
|
|
||||||
|
|
||||||
use Amp\Deferred;
|
|
||||||
use Amp\Dns\DnsException;
|
|
||||||
use Amp\Http\Client\DelegateHttpClient;
|
|
||||||
use Amp\Http\Client\HttpClientBuilder;
|
|
||||||
use Amp\Http\Client\Request;
|
|
||||||
use Amp\Http\Client\Response;
|
|
||||||
use Amp\NullCancellationToken;
|
|
||||||
use Amp\Promise;
|
|
||||||
use Nsq\Config\ClientConfig;
|
|
||||||
use Nsq\Config\LookupConfig;
|
|
||||||
use Nsq\Exception\LookupException;
|
|
||||||
use Psr\Log\LoggerInterface;
|
|
||||||
use Psr\Log\NullLogger;
|
|
||||||
|
|
||||||
use function Amp\asyncCall;
|
|
||||||
use function Amp\call;
|
|
||||||
use function Amp\delay;
|
|
||||||
|
|
||||||
final class Lookup
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* @psalm-var array<string, array<string, Lookup\Producer>>
|
|
||||||
*/
|
|
||||||
private array $producers = [];
|
|
||||||
|
|
||||||
private array $consumers = [];
|
|
||||||
|
|
||||||
private array $running = [];
|
|
||||||
|
|
||||||
private array $topicWatchers = [];
|
|
||||||
|
|
||||||
public function __construct(
|
|
||||||
private array $addresses,
|
|
||||||
private LookupConfig $config,
|
|
||||||
private LoggerInterface $logger,
|
|
||||||
private DelegateHttpClient $httpClient,
|
|
||||||
) {
|
|
||||||
}
|
|
||||||
|
|
||||||
public static function create(
|
|
||||||
string | array $address,
|
|
||||||
LookupConfig $config = null,
|
|
||||||
LoggerInterface $logger = null,
|
|
||||||
DelegateHttpClient $httpClient = null,
|
|
||||||
): self {
|
|
||||||
return new self(
|
|
||||||
(array) $address,
|
|
||||||
$config ?? new LookupConfig(),
|
|
||||||
$logger ?? new NullLogger(),
|
|
||||||
$httpClient ?? HttpClientBuilder::buildDefault(),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @psalm-return Promise<Lookup\Producer[]>
|
|
||||||
*/
|
|
||||||
public function nodes(): Promise
|
|
||||||
{
|
|
||||||
return call(function (): \Generator {
|
|
||||||
$requestHandler = function (string $uri): \Generator {
|
|
||||||
/** @var Response $response */
|
|
||||||
$response = yield $this->httpClient->request(new Request($uri.'/nodes'), new NullCancellationToken());
|
|
||||||
|
|
||||||
try {
|
|
||||||
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
|
|
||||||
} catch (LookupException $e) {
|
|
||||||
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
$promises = [];
|
|
||||||
foreach ($this->addresses as $address) {
|
|
||||||
$promises[$address] = call($requestHandler, $address);
|
|
||||||
}
|
|
||||||
|
|
||||||
$nodes = [];
|
|
||||||
/** @var Lookup\Response $response */
|
|
||||||
foreach (yield $promises as $response) {
|
|
||||||
foreach ($response->producers as $producer) {
|
|
||||||
$nodes[$producer->toTcpUri()] = $producer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return array_values($nodes);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public function stop(): void
|
|
||||||
{
|
|
||||||
$this->producers = [];
|
|
||||||
$this->consumers = [];
|
|
||||||
$this->running = [];
|
|
||||||
$this->topicWatchers = [];
|
|
||||||
|
|
||||||
$this->logger->info('Lookup stopped.');
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @psalm-suppress InvalidPropertyAssignmentValue
|
|
||||||
*/
|
|
||||||
public function subscribe(string $topic, string $channel, callable $onMessage, ClientConfig $config = null): void
|
|
||||||
{
|
|
||||||
if (null !== ($this->running[$topic][$channel] ?? null)) {
|
|
||||||
throw new \InvalidArgumentException('Subscription already exists.');
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->running[$topic][$channel] = true;
|
|
||||||
|
|
||||||
asyncCall(function () use ($topic, $channel, $onMessage, $config): \Generator {
|
|
||||||
while (true) {
|
|
||||||
if (null === ($this->running[$topic][$channel] ?? null)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** @phpstan-ignore-next-line */
|
|
||||||
$producers = $this->producers[$topic] ??= new Deferred();
|
|
||||||
|
|
||||||
if ($producers instanceof Deferred) {
|
|
||||||
/** @var array<string, Lookup\Producer> $producers */
|
|
||||||
$producers = yield $producers->promise();
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach (array_diff_key($this->consumers, $producers) as $address => $producer) {
|
|
||||||
unset($this->consumers[$address]);
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach ($producers as $address => $producer) {
|
|
||||||
if (null !== ($this->consumers[$address][$topic][$channel] ?? null)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->keepConnection(
|
|
||||||
Consumer::create(
|
|
||||||
$address,
|
|
||||||
$topic,
|
|
||||||
$channel,
|
|
||||||
$onMessage,
|
|
||||||
$config,
|
|
||||||
$this->logger,
|
|
||||||
),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
yield delay($this->config->pollingInterval);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
$this->watch($topic);
|
|
||||||
|
|
||||||
$this->logger->info('Subscribed.', compact('topic', 'channel'));
|
|
||||||
}
|
|
||||||
|
|
||||||
public function unsubscribe(string $topic, string $channel): void
|
|
||||||
{
|
|
||||||
if (null === ($this->running[$topic][$channel] ?? null)) {
|
|
||||||
$this->logger->debug('Not subscribed.', compact('topic', 'channel'));
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
unset($this->running[$topic][$channel]);
|
|
||||||
|
|
||||||
if ([] === $this->running[$topic]) {
|
|
||||||
unset($this->running[$topic]);
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->logger->info('Unsubscribed.', compact('topic', 'channel'));
|
|
||||||
}
|
|
||||||
|
|
||||||
private function keepConnection(Consumer $consumer): void
|
|
||||||
{
|
|
||||||
$this->consumers[$consumer->address][$consumer->topic][$consumer->channel] = $consumer;
|
|
||||||
|
|
||||||
asyncCall(function () use ($consumer): \Generator {
|
|
||||||
while (null !== ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
|
|
||||||
try {
|
|
||||||
yield $consumer->connect();
|
|
||||||
} catch (DnsException $e) {
|
|
||||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
|
||||||
|
|
||||||
unset(
|
|
||||||
$this->consumers[$consumer->address],
|
|
||||||
$this->producers[$consumer->topic][$consumer->address],
|
|
||||||
);
|
|
||||||
|
|
||||||
return;
|
|
||||||
} catch (\Throwable $e) {
|
|
||||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
|
||||||
|
|
||||||
yield delay($this->config->pollingInterval);
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
/** @phpstan-ignore-next-line */
|
|
||||||
if (null === ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
|
|
||||||
$consumer->close();
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!$consumer->isConnected()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
yield delay(500);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private function watch(string $topic): void
|
|
||||||
{
|
|
||||||
if (\array_key_exists($topic, $this->topicWatchers)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->topicWatchers[$topic] = true;
|
|
||||||
|
|
||||||
asyncCall(function () use ($topic): \Generator {
|
|
||||||
$cancellationToken = new NullCancellationToken();
|
|
||||||
$requestHandler = function (string $uri) use ($topic, $cancellationToken): \Generator {
|
|
||||||
$this->logger->debug('Lookup', compact('topic'));
|
|
||||||
|
|
||||||
/** @var Response $response */
|
|
||||||
$response = yield $this->httpClient->request(new Request($uri.'/lookup?topic='.$topic), $cancellationToken);
|
|
||||||
|
|
||||||
try {
|
|
||||||
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
|
|
||||||
} catch (LookupException $e) {
|
|
||||||
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
while (\array_key_exists($topic, $this->running)) {
|
|
||||||
$promises = [];
|
|
||||||
foreach ($this->addresses as $address) {
|
|
||||||
$promises[$address] = call($requestHandler, $address);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** @var Lookup\Response[] $responses */
|
|
||||||
$responses = yield $promises;
|
|
||||||
|
|
||||||
$producers = [];
|
|
||||||
foreach ($responses as $response) {
|
|
||||||
foreach ($response->producers as $producer) {
|
|
||||||
$producers[$producer->toTcpUri()] = $producer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** @phpstan-ignore-next-line */
|
|
||||||
if (($deferred = ($this->producers[$topic] ?? null)) instanceof Deferred) {
|
|
||||||
$deferred->resolve($producers);
|
|
||||||
}
|
|
||||||
$this->producers[$topic] = $producers;
|
|
||||||
|
|
||||||
yield delay($this->config->pollingInterval);
|
|
||||||
}
|
|
||||||
|
|
||||||
unset($this->topicWatchers[$topic]);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,39 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq\Lookup;
|
|
||||||
|
|
||||||
final class Producer
|
|
||||||
{
|
|
||||||
public function __construct(
|
|
||||||
public string $broadcastAddress,
|
|
||||||
public string $hostname,
|
|
||||||
public string $remoteAddress,
|
|
||||||
public int $tcpPort,
|
|
||||||
public int $httpPort,
|
|
||||||
public string $version,
|
|
||||||
public array $tombstones,
|
|
||||||
public array $topics,
|
|
||||||
) {
|
|
||||||
}
|
|
||||||
|
|
||||||
public static function fromArray(array $array): self
|
|
||||||
{
|
|
||||||
return new self(
|
|
||||||
$array['broadcast_address'],
|
|
||||||
$array['hostname'],
|
|
||||||
$array['remote_address'],
|
|
||||||
$array['tcp_port'],
|
|
||||||
$array['http_port'],
|
|
||||||
$array['version'],
|
|
||||||
$array['tombstones'] ?? [],
|
|
||||||
$array['topics'] ?? [],
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function toTcpUri(): string
|
|
||||||
{
|
|
||||||
return sprintf('%s:%s', $this->broadcastAddress, $this->tcpPort);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,34 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
namespace Nsq\Lookup;
|
|
||||||
|
|
||||||
use Nsq\Exception\LookupException;
|
|
||||||
|
|
||||||
final class Response
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* @param string[] $channels
|
|
||||||
* @param Producer[] $producers
|
|
||||||
*/
|
|
||||||
public function __construct(
|
|
||||||
public array $channels,
|
|
||||||
public array $producers,
|
|
||||||
) {
|
|
||||||
}
|
|
||||||
|
|
||||||
public static function fromJson(string $json): self
|
|
||||||
{
|
|
||||||
$array = json_decode($json, true, flags: JSON_THROW_ON_ERROR);
|
|
||||||
|
|
||||||
if (\array_key_exists('message', $array)) {
|
|
||||||
throw new LookupException($array['message']);
|
|
||||||
}
|
|
||||||
|
|
||||||
return new self(
|
|
||||||
$array['channels'] ?? [],
|
|
||||||
array_map([Producer::class, 'fromArray'], $array['producers']),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
@ -6,6 +6,7 @@ namespace Nsq;
|
|||||||
|
|
||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use Nsq\Exception\MessageException;
|
use Nsq\Exception\MessageException;
|
||||||
|
use function Amp\call;
|
||||||
|
|
||||||
final class Message
|
final class Message
|
||||||
{
|
{
|
||||||
@ -31,51 +32,51 @@ final class Message
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function isProcessed(): bool
|
|
||||||
{
|
|
||||||
return $this->processed;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @psalm-return Promise<bool>
|
* @return Promise<void>
|
||||||
*/
|
*/
|
||||||
public function finish(): Promise
|
public function finish(): Promise
|
||||||
{
|
{
|
||||||
$this->markAsProcessedOrFail();
|
return call(function (): \Generator {
|
||||||
|
if ($this->processed) {
|
||||||
|
throw MessageException::processed($this);
|
||||||
|
}
|
||||||
|
|
||||||
return $this->consumer->fin($this->id);
|
yield $this->consumer->fin($this->id);
|
||||||
|
|
||||||
|
$this->processed = true;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @psalm-param positive-int|0 $timeout
|
* @return Promise<void>
|
||||||
*
|
|
||||||
* @psalm-return Promise<bool>
|
|
||||||
*/
|
*/
|
||||||
public function requeue(int $timeout): Promise
|
public function requeue(int $timeout): Promise
|
||||||
{
|
{
|
||||||
$this->markAsProcessedOrFail();
|
return call(function () use ($timeout): \Generator {
|
||||||
|
if ($this->processed) {
|
||||||
|
throw MessageException::processed($this);
|
||||||
|
}
|
||||||
|
|
||||||
return $this->consumer->req($this->id, $timeout);
|
yield $this->consumer->req($this->id, $timeout);
|
||||||
|
|
||||||
|
$this->processed = true;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @psalm-return Promise<bool>
|
* @return Promise<void>
|
||||||
*/
|
*/
|
||||||
public function touch(): Promise
|
public function touch(): Promise
|
||||||
{
|
{
|
||||||
if ($this->processed) {
|
return call(function (): \Generator {
|
||||||
throw MessageException::processed($this);
|
if ($this->processed) {
|
||||||
}
|
throw MessageException::processed($this);
|
||||||
|
}
|
||||||
|
|
||||||
return $this->consumer->touch($this->id);
|
yield $this->consumer->touch($this->id);
|
||||||
}
|
|
||||||
|
|
||||||
private function markAsProcessedOrFail(): void
|
$this->processed = true;
|
||||||
{
|
});
|
||||||
if ($this->processed) {
|
|
||||||
throw MessageException::processed($this);
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->processed = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
125
src/Producer.php
125
src/Producer.php
@ -5,38 +5,16 @@ declare(strict_types=1);
|
|||||||
namespace Nsq;
|
namespace Nsq;
|
||||||
|
|
||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use Amp\Success;
|
|
||||||
use Nsq\Config\ClientConfig;
|
use Nsq\Config\ClientConfig;
|
||||||
use Nsq\Exception\NsqException;
|
use Nsq\Exception\NsqException;
|
||||||
|
use Nsq\Stream\NullStream;
|
||||||
use Psr\Log\LoggerInterface;
|
use Psr\Log\LoggerInterface;
|
||||||
use Psr\Log\NullLogger;
|
use Psr\Log\NullLogger;
|
||||||
|
|
||||||
use function Amp\asyncCall;
|
use function Amp\asyncCall;
|
||||||
use function Amp\call;
|
use function Amp\call;
|
||||||
|
|
||||||
final class Producer extends Connection
|
final class Producer extends Connection
|
||||||
{
|
{
|
||||||
public function __construct(
|
|
||||||
string $address,
|
|
||||||
ClientConfig $clientConfig,
|
|
||||||
LoggerInterface $logger,
|
|
||||||
) {
|
|
||||||
parent::__construct(
|
|
||||||
$address,
|
|
||||||
$clientConfig,
|
|
||||||
$logger,
|
|
||||||
);
|
|
||||||
|
|
||||||
$context = compact('address');
|
|
||||||
$this->onConnect(function () use ($context): void {
|
|
||||||
$this->logger->debug('Producer connected.', $context);
|
|
||||||
});
|
|
||||||
$this->onClose(function () use ($context): void {
|
|
||||||
$this->logger->debug('Producer disconnected.', $context);
|
|
||||||
});
|
|
||||||
$this->logger->debug('Producer created.', $context);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static function create(
|
public static function create(
|
||||||
string $address,
|
string $address,
|
||||||
ClientConfig $clientConfig = null,
|
ClientConfig $clientConfig = null,
|
||||||
@ -51,7 +29,7 @@ final class Producer extends Connection
|
|||||||
|
|
||||||
public function connect(): Promise
|
public function connect(): Promise
|
||||||
{
|
{
|
||||||
if ($this->isConnected()) {
|
if (!$this->stream instanceof NullStream) {
|
||||||
return call(static function (): void {
|
return call(static function (): void {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -59,72 +37,63 @@ final class Producer extends Connection
|
|||||||
return call(function (): \Generator {
|
return call(function (): \Generator {
|
||||||
yield parent::connect();
|
yield parent::connect();
|
||||||
|
|
||||||
$buffer = new Buffer();
|
$this->run();
|
||||||
|
|
||||||
asyncCall(function () use ($buffer): \Generator {
|
|
||||||
while (null !== $chunk = yield $this->read()) {
|
|
||||||
$buffer->append($chunk);
|
|
||||||
|
|
||||||
while ($frame = Parser::parse($buffer)) {
|
|
||||||
switch (true) {
|
|
||||||
case $frame instanceof Frame\Response:
|
|
||||||
if ($frame->isHeartBeat()) {
|
|
||||||
yield $this->write(Command::nop());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ok received
|
|
||||||
break;
|
|
||||||
case $frame instanceof Frame\Error:
|
|
||||||
$this->handleError($frame);
|
|
||||||
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new NsqException('Unreachable statement.');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->close(false);
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param array<int, string>|string $body
|
* @param array<int, string>|string $body
|
||||||
*
|
*
|
||||||
* @psalm-param positive-int|0 $delay
|
* @return Promise<void>
|
||||||
*
|
|
||||||
* @psalm-return Promise<bool>
|
|
||||||
*/
|
*/
|
||||||
public function publish(string $topic, string | array $body, int $delay = null): Promise
|
public function publish(string $topic, string | array $body, int $delay = 0): Promise
|
||||||
{
|
{
|
||||||
if (!$this->isConnected()) {
|
if (0 < $delay) {
|
||||||
return new Success(false);
|
return call(
|
||||||
|
function (array $bodies) use ($topic, $delay): \Generator {
|
||||||
|
foreach ($bodies as $body) {
|
||||||
|
yield $this->stream->write(Command::dpub($topic, $body, $delay));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
(array) $body,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
return call(
|
$command = \is_array($body)
|
||||||
function (iterable $commands): \Generator {
|
? Command::mpub($topic, $body)
|
||||||
try {
|
: Command::pub($topic, $body);
|
||||||
foreach ($commands as $command) {
|
|
||||||
yield $this->write($command);
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
return $this->stream->write($command);
|
||||||
} catch (\Throwable) {
|
}
|
||||||
return false;
|
|
||||||
}
|
private function run(): void
|
||||||
},
|
{
|
||||||
(static function () use ($topic, $body, $delay): \Generator {
|
$buffer = new Buffer();
|
||||||
if (\is_array($body) && null === $delay) {
|
|
||||||
yield Command::mpub($topic, $body);
|
asyncCall(function () use ($buffer): \Generator {
|
||||||
} elseif (null !== $delay) {
|
while (null !== $chunk = yield $this->stream->read()) {
|
||||||
foreach ((array) $body as $content) {
|
$buffer->append($chunk);
|
||||||
yield Command::dpub($topic, $content, $delay);
|
|
||||||
|
while ($frame = Parser::parse($buffer)) {
|
||||||
|
switch (true) {
|
||||||
|
case $frame instanceof Frame\Response:
|
||||||
|
if ($frame->isHeartBeat()) {
|
||||||
|
yield $this->stream->write(Command::nop());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ok received
|
||||||
|
break;
|
||||||
|
case $frame instanceof Frame\Error:
|
||||||
|
$this->handleError($frame);
|
||||||
|
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new NsqException('Unreachable statement.');
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
yield Command::pub($topic, $body);
|
|
||||||
}
|
}
|
||||||
})(),
|
}
|
||||||
);
|
|
||||||
|
$this->stream = new NullStream();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,23 +4,19 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Nsq;
|
namespace Nsq;
|
||||||
|
|
||||||
use Amp\ByteStream\ClosedException;
|
|
||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
|
|
||||||
interface Stream
|
interface Stream
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* @psalm-return Promise<null|string>
|
* @return Promise<null|string>
|
||||||
*/
|
*/
|
||||||
public function read(): Promise;
|
public function read(): Promise;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @psalm-return Promise<void>
|
* @return Promise<void>
|
||||||
*/
|
*/
|
||||||
public function write(string $data): Promise;
|
public function write(string $data): Promise;
|
||||||
|
|
||||||
/**
|
|
||||||
* @throws ClosedException
|
|
||||||
*/
|
|
||||||
public function close(): void;
|
public function close(): void;
|
||||||
}
|
}
|
||||||
|
@ -5,38 +5,14 @@ declare(strict_types=1);
|
|||||||
namespace Nsq\Stream;
|
namespace Nsq\Stream;
|
||||||
|
|
||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use Nsq\Buffer;
|
use Nsq\Exception\NsqException;
|
||||||
use Nsq\Exception\StreamException;
|
|
||||||
use Nsq\Stream;
|
use Nsq\Stream;
|
||||||
|
|
||||||
use function Amp\call;
|
|
||||||
|
|
||||||
class GzipStream implements Stream
|
class GzipStream implements Stream
|
||||||
{
|
{
|
||||||
private ?\InflateContext $inflate = null;
|
public function __construct(private Stream $stream)
|
||||||
|
|
||||||
private ?\DeflateContext $deflate = null;
|
|
||||||
|
|
||||||
private Buffer $buffer;
|
|
||||||
|
|
||||||
public function __construct(private Stream $stream, private int $level, string $bytes = '')
|
|
||||||
{
|
{
|
||||||
/** @var false|\InflateContext $inflate */
|
throw new NsqException('GzipStream not implemented yet.');
|
||||||
$inflate = @inflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
|
|
||||||
/** @var \DeflateContext|false $deflate */
|
|
||||||
$deflate = @deflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
|
|
||||||
|
|
||||||
if (false === $inflate) {
|
|
||||||
throw new StreamException('Failed initializing inflate context');
|
|
||||||
}
|
|
||||||
|
|
||||||
if (false === $deflate) {
|
|
||||||
throw new StreamException('Failed initializing deflate context');
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->inflate = $inflate;
|
|
||||||
$this->deflate = $deflate;
|
|
||||||
$this->buffer = new Buffer($bytes);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -44,33 +20,7 @@ class GzipStream implements Stream
|
|||||||
*/
|
*/
|
||||||
public function read(): Promise
|
public function read(): Promise
|
||||||
{
|
{
|
||||||
return call(function (): \Generator {
|
return $this->stream->read();
|
||||||
if (null === $this->inflate) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($this->buffer->empty()) {
|
|
||||||
$chunk = yield $this->stream->read();
|
|
||||||
|
|
||||||
if (null !== $chunk) {
|
|
||||||
$this->buffer->append($chunk);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
$data = $this->buffer->flush();
|
|
||||||
|
|
||||||
if ('' === $data) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
/** @psalm-suppress UndefinedFunction,InvalidArgument */
|
|
||||||
$decompressed = inflate_add($this->inflate, $data, ZLIB_SYNC_FLUSH);
|
|
||||||
|
|
||||||
if (false === $decompressed) {
|
|
||||||
throw new StreamException('Failed adding data to deflate context');
|
|
||||||
}
|
|
||||||
|
|
||||||
return $decompressed;
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -78,27 +28,11 @@ class GzipStream implements Stream
|
|||||||
*/
|
*/
|
||||||
public function write(string $data): Promise
|
public function write(string $data): Promise
|
||||||
{
|
{
|
||||||
if (null === $this->deflate) {
|
return $this->stream->write($data);
|
||||||
throw new StreamException('The stream has already been closed');
|
|
||||||
}
|
|
||||||
|
|
||||||
/** @psalm-suppress UndefinedFunction,InvalidArgument */
|
|
||||||
$compressed = deflate_add($this->deflate, $data, ZLIB_SYNC_FLUSH);
|
|
||||||
|
|
||||||
if (false === $compressed) {
|
|
||||||
throw new StreamException('Failed adding data to deflate context');
|
|
||||||
}
|
|
||||||
|
|
||||||
return $this->stream->write($compressed);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
public function close(): void
|
public function close(): void
|
||||||
{
|
{
|
||||||
$this->stream->close();
|
$this->stream->close();
|
||||||
$this->inflate = null;
|
|
||||||
$this->deflate = null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,11 +4,10 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Nsq\Stream;
|
namespace Nsq\Stream;
|
||||||
|
|
||||||
use Amp\Failure;
|
|
||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use Amp\Success;
|
use Amp\Success;
|
||||||
use Nsq\Exception\NsqException;
|
|
||||||
use Nsq\Stream;
|
use Nsq\Stream;
|
||||||
|
use function Amp\call;
|
||||||
|
|
||||||
final class NullStream implements Stream
|
final class NullStream implements Stream
|
||||||
{
|
{
|
||||||
@ -25,7 +24,8 @@ final class NullStream implements Stream
|
|||||||
*/
|
*/
|
||||||
public function write(string $data): Promise
|
public function write(string $data): Promise
|
||||||
{
|
{
|
||||||
return new Failure(new NsqException('Connection closed.'));
|
return call(static function (): void {
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -8,25 +8,24 @@ use Amp\Promise;
|
|||||||
use Nsq\Buffer;
|
use Nsq\Buffer;
|
||||||
use Nsq\Exception\SnappyException;
|
use Nsq\Exception\SnappyException;
|
||||||
use Nsq\Stream;
|
use Nsq\Stream;
|
||||||
|
|
||||||
use function Amp\call;
|
use function Amp\call;
|
||||||
|
|
||||||
class SnappyStream implements Stream
|
class SnappyStream implements Stream
|
||||||
{
|
{
|
||||||
private const IDENTIFIER = [0xFF, 0x06, 0x00, 0x00, 0x73, 0x4E, 0x61, 0x50, 0x70, 0x59];
|
private const IDENTIFIER = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
|
||||||
private const SIZE_HEADER = 4;
|
private const SIZE_HEADER = 4;
|
||||||
private const SIZE_CHECKSUM = 4;
|
private const SIZE_CHECKSUM = 4;
|
||||||
private const SIZE_CHUNK = 65536;
|
private const SIZE_CHUNK = 65536;
|
||||||
private const TYPE_IDENTIFIER = 0xFF;
|
private const TYPE_IDENTIFIER = 0xff;
|
||||||
private const TYPE_COMPRESSED = 0x00;
|
private const TYPE_COMPRESSED = 0x00;
|
||||||
private const TYPE_UNCOMPRESSED = 0x01;
|
private const TYPE_UNCOMPRESSED = 0x01;
|
||||||
private const TYPE_PADDING = 0xFE;
|
private const TYPE_PADDING = 0xfe;
|
||||||
|
|
||||||
private Buffer $buffer;
|
private Buffer $buffer;
|
||||||
|
|
||||||
public function __construct(private Stream $stream, string $bytes = '')
|
public function __construct(private Stream $stream, string $bytes = '')
|
||||||
{
|
{
|
||||||
if (!\function_exists('snappy_uncompress') || !\function_exists('snappy_compress')) {
|
if (!\function_exists('snappy_uncompress')) {
|
||||||
throw SnappyException::notInstalled();
|
throw SnappyException::notInstalled();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -46,7 +45,7 @@ class SnappyStream implements Stream
|
|||||||
$type = $this->buffer->readUInt32LE();
|
$type = $this->buffer->readUInt32LE();
|
||||||
|
|
||||||
$size = $type >> 8;
|
$size = $type >> 8;
|
||||||
$type &= 0xFF;
|
$type &= 0xff;
|
||||||
|
|
||||||
while ($this->buffer->size() < $size && null !== ($chunk = yield $this->stream->read())) {
|
while ($this->buffer->size() < $size && null !== ($chunk = yield $this->stream->read())) {
|
||||||
$this->buffer->append($chunk);
|
$this->buffer->append($chunk);
|
||||||
@ -60,7 +59,6 @@ class SnappyStream implements Stream
|
|||||||
case self::TYPE_COMPRESSED:
|
case self::TYPE_COMPRESSED:
|
||||||
$this->buffer->discard(self::SIZE_CHECKSUM);
|
$this->buffer->discard(self::SIZE_CHECKSUM);
|
||||||
|
|
||||||
/** @psalm-suppress UndefinedFunction */
|
|
||||||
return snappy_uncompress($this->buffer->consume($size - self::SIZE_HEADER));
|
return snappy_uncompress($this->buffer->consume($size - self::SIZE_HEADER));
|
||||||
case self::TYPE_UNCOMPRESSED:
|
case self::TYPE_UNCOMPRESSED:
|
||||||
$this->buffer->discard(self::SIZE_CHECKSUM);
|
$this->buffer->discard(self::SIZE_CHECKSUM);
|
||||||
@ -80,7 +78,6 @@ class SnappyStream implements Stream
|
|||||||
public function write(string $data): Promise
|
public function write(string $data): Promise
|
||||||
{
|
{
|
||||||
return call(function () use ($data): Promise {
|
return call(function () use ($data): Promise {
|
||||||
/** @var string $result */
|
|
||||||
$result = pack('CCCCCCCCCC', ...self::IDENTIFIER);
|
$result = pack('CCCCCCCCCC', ...self::IDENTIFIER);
|
||||||
|
|
||||||
foreach (str_split($data, self::SIZE_CHUNK) as $chunk) {
|
foreach (str_split($data, self::SIZE_CHUNK) as $chunk) {
|
||||||
@ -96,27 +93,23 @@ class SnappyStream implements Stream
|
|||||||
$this->stream->close();
|
$this->stream->close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @psalm-suppress PossiblyFalseArgument
|
||||||
|
*/
|
||||||
private function compress(string $uncompressed): string
|
private function compress(string $uncompressed): string
|
||||||
{
|
{
|
||||||
/** @psalm-suppress UndefinedFunction */
|
|
||||||
$compressed = snappy_compress($uncompressed);
|
$compressed = snappy_compress($uncompressed);
|
||||||
|
|
||||||
\assert(\is_string($compressed));
|
|
||||||
|
|
||||||
[$type, $data] = \strlen($compressed) <= 0.875 * \strlen($uncompressed)
|
[$type, $data] = \strlen($compressed) <= 0.875 * \strlen($uncompressed)
|
||||||
? [self::TYPE_COMPRESSED, $compressed]
|
? [self::TYPE_COMPRESSED, $compressed]
|
||||||
: [self::TYPE_UNCOMPRESSED, $uncompressed];
|
: [self::TYPE_UNCOMPRESSED, $uncompressed];
|
||||||
|
|
||||||
/** @psalm-suppress PossiblyFalseArgument */
|
/** @phpstan-ignore-next-line */
|
||||||
$unpacked = unpack('N', hash('crc32c', $uncompressed, true));
|
$checksum = unpack('N', hash('crc32c', $uncompressed, true))[1];
|
||||||
\assert(\is_array($unpacked));
|
$checksum = (($checksum >> 15) | ($checksum << 17)) + 0xa282ead8 & 0xffffffff;
|
||||||
|
|
||||||
$checksum = $unpacked[1];
|
|
||||||
$checksum = (($checksum >> 15) | ($checksum << 17)) + 0xA282EAD8 & 0xFFFFFFFF;
|
|
||||||
|
|
||||||
$size = (\strlen($data) + 4) << 8;
|
$size = (\strlen($data) + 4) << 8;
|
||||||
|
|
||||||
/** @psalm-suppress PossiblyFalseOperand */
|
|
||||||
return pack('VV', $type + $size, $checksum).$data;
|
return pack('VV', $type + $size, $checksum).$data;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,22 +5,20 @@ declare(strict_types=1);
|
|||||||
namespace Nsq\Stream;
|
namespace Nsq\Stream;
|
||||||
|
|
||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use Amp\Socket\ClientTlsContext;
|
|
||||||
use Amp\Socket\ConnectContext;
|
use Amp\Socket\ConnectContext;
|
||||||
use Amp\Socket\EncryptableSocket;
|
use Amp\Socket\Socket;
|
||||||
use Nsq\Stream;
|
use Nsq\Stream;
|
||||||
|
|
||||||
use function Amp\call;
|
use function Amp\call;
|
||||||
use function Amp\Socket\connect;
|
use function Amp\Socket\connect;
|
||||||
|
|
||||||
class SocketStream implements Stream
|
class SocketStream implements Stream
|
||||||
{
|
{
|
||||||
public function __construct(private EncryptableSocket $socket)
|
public function __construct(private Socket $socket)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @psalm-return Promise<self>
|
* @return Promise<self>
|
||||||
*/
|
*/
|
||||||
public static function connect(string $uri, int $timeout = 0, int $attempts = 0, bool $noDelay = false): Promise
|
public static function connect(string $uri, int $timeout = 0, int $attempts = 0, bool $noDelay = false): Promise
|
||||||
{
|
{
|
||||||
@ -39,17 +37,12 @@ class SocketStream implements Stream
|
|||||||
$context = $context->withTcpNoDelay();
|
$context = $context->withTcpNoDelay();
|
||||||
}
|
}
|
||||||
|
|
||||||
$context = $context->withTlsContext(
|
|
||||||
(new ClientTlsContext(''))
|
|
||||||
->withoutPeerVerification(),
|
|
||||||
);
|
|
||||||
|
|
||||||
return new self(yield connect($uri, $context));
|
return new self(yield connect($uri, $context));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @psalm-return Promise<null|string>
|
* @return Promise<null|string>
|
||||||
*/
|
*/
|
||||||
public function read(): Promise
|
public function read(): Promise
|
||||||
{
|
{
|
||||||
@ -57,26 +50,15 @@ class SocketStream implements Stream
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @psalm-return Promise<void>
|
* @return Promise<void>
|
||||||
*/
|
*/
|
||||||
public function write(string $data): Promise
|
public function write(string $data): Promise
|
||||||
{
|
{
|
||||||
return $this->socket->write($data);
|
return $this->socket->write($data);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
public function close(): void
|
public function close(): void
|
||||||
{
|
{
|
||||||
$this->socket->close();
|
$this->socket->close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @psalm-return Promise<void>
|
|
||||||
*/
|
|
||||||
public function setupTls(): Promise
|
|
||||||
{
|
|
||||||
return $this->socket->setupTls();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -7,11 +7,6 @@ use PHPUnit\Framework\TestCase;
|
|||||||
|
|
||||||
final class ClientConfigTest extends TestCase
|
final class ClientConfigTest extends TestCase
|
||||||
{
|
{
|
||||||
public function testNegotiationPayload(): void
|
|
||||||
{
|
|
||||||
self::assertJson((new ClientConfig())->asNegotiationPayload());
|
|
||||||
}
|
|
||||||
|
|
||||||
public function testInvalidCompression(): void
|
public function testInvalidCompression(): void
|
||||||
{
|
{
|
||||||
$this->expectException(InvalidArgumentException::class);
|
$this->expectException(InvalidArgumentException::class);
|
||||||
@ -19,42 +14,4 @@ final class ClientConfigTest extends TestCase
|
|||||||
|
|
||||||
new ClientConfig(deflate: true, snappy: true);
|
new ClientConfig(deflate: true, snappy: true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @dataProvider array
|
|
||||||
*/
|
|
||||||
public function testFromArray(array $data, array $expected): void
|
|
||||||
{
|
|
||||||
self::assertSame($expected, get_object_vars(ClientConfig::fromArray($data)));
|
|
||||||
}
|
|
||||||
|
|
||||||
public function array(): Generator
|
|
||||||
{
|
|
||||||
$default = get_object_vars(new ClientConfig());
|
|
||||||
|
|
||||||
yield 'Empty array' => [[], $default];
|
|
||||||
|
|
||||||
yield 'With wrong keys' => [['bla' => 'bla'], $default];
|
|
||||||
|
|
||||||
$custom = [
|
|
||||||
'authSecret' => 'SomeSecret',
|
|
||||||
'connectTimeout' => 100,
|
|
||||||
'maxAttempts' => 10,
|
|
||||||
'tcpNoDelay' => true,
|
|
||||||
'rdyCount' => 1,
|
|
||||||
'featureNegotiation' => true,
|
|
||||||
'clientId' => 'SomeGorgeousClientId',
|
|
||||||
'deflate' => true,
|
|
||||||
'deflateLevel' => 1,
|
|
||||||
'heartbeatInterval' => 31111,
|
|
||||||
'hostname' => gethostname(),
|
|
||||||
'msgTimeout' => 59999,
|
|
||||||
'sampleRate' => 25,
|
|
||||||
'tls' => true,
|
|
||||||
'snappy' => false,
|
|
||||||
'userAgent' => 'nsqphp/test',
|
|
||||||
];
|
|
||||||
|
|
||||||
yield 'Full filled' => [$custom, $custom];
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -55,7 +55,6 @@ final class MessageTest extends TestCase
|
|||||||
*/
|
*/
|
||||||
public function messages(): Generator
|
public function messages(): Generator
|
||||||
{
|
{
|
||||||
/** @phpstan-ignore-next-line */
|
|
||||||
$consumer = $this->createMock(Consumer::class);
|
$consumer = $this->createMock(Consumer::class);
|
||||||
$consumer->method('fin')->willReturn(new Success());
|
$consumer->method('fin')->willReturn(new Success());
|
||||||
$consumer->method('touch')->willReturn(new Success());
|
$consumer->method('touch')->willReturn(new Success());
|
||||||
|
@ -1,37 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
declare(strict_types=1);
|
|
||||||
|
|
||||||
use Nsq\Config\ClientConfig;
|
|
||||||
use PHPUnit\Framework\TestCase;
|
|
||||||
|
|
||||||
final class NsqTest extends TestCase
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* @dataProvider configs
|
|
||||||
*/
|
|
||||||
public function test(ClientConfig $clientConfig): void
|
|
||||||
{
|
|
||||||
self::markTestSkipped('');
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return Generator<string, array<int, ClientConfig>>
|
|
||||||
*/
|
|
||||||
public function configs(): Generator
|
|
||||||
{
|
|
||||||
yield 'default' => [
|
|
||||||
new ClientConfig(
|
|
||||||
heartbeatInterval: 3000,
|
|
||||||
snappy: false,
|
|
||||||
),
|
|
||||||
];
|
|
||||||
|
|
||||||
yield 'snappy' => [
|
|
||||||
new ClientConfig(
|
|
||||||
heartbeatInterval: 3000,
|
|
||||||
snappy: true,
|
|
||||||
),
|
|
||||||
];
|
|
||||||
}
|
|
||||||
}
|
|
@ -3,12 +3,44 @@
|
|||||||
declare(strict_types=1);
|
declare(strict_types=1);
|
||||||
|
|
||||||
use Amp\Loop;
|
use Amp\Loop;
|
||||||
|
use Amp\Process\Process;
|
||||||
use Nsq\Exception\ServerException;
|
use Nsq\Exception\ServerException;
|
||||||
use Nsq\Producer;
|
use Nsq\Producer;
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use function Amp\ByteStream\buffer;
|
||||||
|
use function Amp\Promise\wait;
|
||||||
|
|
||||||
final class ProducerTest extends TestCase
|
final class ProducerTest extends TestCase
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* @param array<int, string>|string $body
|
||||||
|
*
|
||||||
|
* @dataProvider data
|
||||||
|
*/
|
||||||
|
public function testPublish(array | string $body, string $expected): void
|
||||||
|
{
|
||||||
|
$process = new Process(
|
||||||
|
sprintf('bin/nsq_tail -topic %s -channel default -nsqd-tcp-address localhost:4150 -n 1', __FUNCTION__),
|
||||||
|
);
|
||||||
|
wait($process->start());
|
||||||
|
|
||||||
|
$producer = Producer::create('tcp://localhost:4150');
|
||||||
|
wait($producer->connect());
|
||||||
|
wait($producer->publish(__FUNCTION__, $body));
|
||||||
|
|
||||||
|
wait($process->join());
|
||||||
|
|
||||||
|
self::assertSame($expected, wait(buffer($process->getStdout())));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Generator<int, array{0: string|array, 1: string}>
|
||||||
|
*/
|
||||||
|
public function data(): Generator
|
||||||
|
{
|
||||||
|
yield ['Test Message One!', 'Test Message One!'.PHP_EOL];
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @dataProvider pubFails
|
* @dataProvider pubFails
|
||||||
*/
|
*/
|
||||||
@ -26,6 +58,9 @@ final class ProducerTest extends TestCase
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Generator<string, array>
|
||||||
|
*/
|
||||||
public function pubFails(): Generator
|
public function pubFails(): Generator
|
||||||
{
|
{
|
||||||
yield 'Empty body' => ['test', '', 'E_BAD_MESSAGE PUB invalid message body size 0'];
|
yield 'Empty body' => ['test', '', 'E_BAD_MESSAGE PUB invalid message body size 0'];
|
||||||
|
Reference in New Issue
Block a user