Compare commits
1 Commits
renovate/c
...
snappy
Author | SHA1 | Date | |
---|---|---|---|
87fe939018 |
21
.gitattributes
vendored
21
.gitattributes
vendored
@@ -1,21 +0,0 @@
|
||||
# Exclude build/test files from archive
|
||||
/.editorconfig export-ignore
|
||||
/.gitattributes export-ignore
|
||||
/.github export-ignore
|
||||
/.gitignore export-ignore
|
||||
/.php_cs export-ignore
|
||||
/.php_cs.dist export-ignore
|
||||
/.psalm export-ignore
|
||||
/docs export-ignore
|
||||
/examples export-ignore
|
||||
/infection.json export-ignore
|
||||
/infection.json.dist export-ignore
|
||||
/phpstan.neon export-ignore
|
||||
/phpunit.xml export-ignore
|
||||
/phpunit.xml.dist export-ignore
|
||||
/psalm.xml export-ignore
|
||||
/tests export-ignore
|
||||
|
||||
# Configure diff output for .php and .phar files.
|
||||
*.php diff=php
|
||||
*.phar -diff
|
202
.github/workflows/ci.yaml
vendored
Normal file
202
.github/workflows/ci.yaml
vendored
Normal file
@@ -0,0 +1,202 @@
|
||||
name: CI
|
||||
|
||||
on:
|
||||
- pull_request
|
||||
- push
|
||||
|
||||
jobs:
|
||||
tests:
|
||||
name: Tests
|
||||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
os:
|
||||
- ubuntu-latest
|
||||
php:
|
||||
- '8.0'
|
||||
dependencies:
|
||||
- lowest
|
||||
- highest
|
||||
services:
|
||||
nsqd:
|
||||
image: nsqio/nsq:v1.2.0
|
||||
options: --entrypoint /nsqd
|
||||
ports:
|
||||
- 4150:4150
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: ${{ matrix.php }}
|
||||
coverage: pcov
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Setup Problem Matchers for PHPUnit
|
||||
run: echo "::add-matcher::${{ runner.tool_cache }}/phpunit.json"
|
||||
|
||||
- name: Determine Composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=directory::$(composer config cache-dir)"
|
||||
|
||||
- name: Cache Composer dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.directory }}
|
||||
key: ${{ runner.os }}-${{ matrix.php }}-composer-${{ matrix.dependencies }}-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-${{ matrix.php }}-${{ matrix.dependencies }}-composer-
|
||||
|
||||
- name: Install highest dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
if: ${{ matrix.dependencies == 'highest' }}
|
||||
|
||||
- name: Install lowest dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist --prefer-lowest
|
||||
if: ${{ matrix.dependencies == 'lowest' }}
|
||||
|
||||
- name: Run tests
|
||||
run: vendor/bin/phpunit --coverage-clover=build/coverage-report.xml
|
||||
|
||||
- name: Upload code coverage
|
||||
uses: codecov/codecov-action@v1
|
||||
with:
|
||||
file: build/coverage-report.xml
|
||||
|
||||
php-cs-fixer:
|
||||
name: PHP-CS-Fixer
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.0'
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Get composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-composer-
|
||||
|
||||
- name: Install dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
|
||||
- name: Run script
|
||||
run: composer cs-check
|
||||
|
||||
phpstan:
|
||||
name: PHPStan
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.0'
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Get composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-composer-
|
||||
|
||||
- name: Install dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
|
||||
- name: Run script
|
||||
run: composer phpstan
|
||||
|
||||
psalm:
|
||||
name: Psalm
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.0'
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Get composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-composer-
|
||||
|
||||
- name: Install dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
|
||||
- name: Run script
|
||||
run: vendor/bin/psalm --output-format=github
|
||||
|
||||
infection:
|
||||
name: Infection
|
||||
runs-on: ubuntu-latest
|
||||
services:
|
||||
nsqd:
|
||||
image: nsqio/nsq:v1.2.0
|
||||
options: --entrypoint /nsqd
|
||||
ports:
|
||||
- 4150:4150
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.0'
|
||||
coverage: pcov
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Get composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-composer-
|
||||
|
||||
- name: Install dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
|
||||
- name: Run script
|
||||
env:
|
||||
STRYKER_DASHBOARD_API_KEY: ${{ secrets.STRYKER_DASHBOARD_API_KEY }}
|
||||
run: |
|
||||
git fetch --depth=1 origin $GITHUB_BASE_REF
|
||||
php vendor/bin/infection -j2 --git-diff-filter=A --git-diff-base=origin/$GITHUB_BASE_REF --logger-github --ignore-msi-with-no-mutations --only-covered
|
37
.github/workflows/code_style.yaml
vendored
37
.github/workflows/code_style.yaml
vendored
@@ -1,37 +0,0 @@
|
||||
name: Code Style
|
||||
|
||||
on:
|
||||
- pull_request
|
||||
- push
|
||||
|
||||
jobs:
|
||||
php-cs-fixer:
|
||||
name: PHP-CS-Fixer
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.1'
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Get composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-composer-
|
||||
|
||||
- name: Install dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
|
||||
- name: Run script
|
||||
run: composer cs-check
|
71
.github/workflows/phpunit.yaml
vendored
71
.github/workflows/phpunit.yaml
vendored
@@ -1,71 +0,0 @@
|
||||
name: phpunit
|
||||
|
||||
on:
|
||||
- pull_request
|
||||
- push
|
||||
|
||||
jobs:
|
||||
phpunit:
|
||||
name: phpunit
|
||||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
os:
|
||||
- ubuntu-latest
|
||||
php:
|
||||
- '8.1'
|
||||
nsq:
|
||||
- nsq-1.2.0.linux-amd64.go1.12.9
|
||||
- nsq-1.2.1.linux-amd64.go1.16.6
|
||||
dependencies:
|
||||
- lowest
|
||||
- highest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: ${{ matrix.php }}
|
||||
coverage: pcov
|
||||
extensions: kjdev/php-ext-snappy@0.2.1
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Download NSQ
|
||||
run: |
|
||||
curl -sSL "http://bitly-downloads.s3.amazonaws.com/nsq/${{ matrix.nsq }}.tar.gz" \
|
||||
| tar -xzv --strip-components=1
|
||||
./bin/nsqd --version
|
||||
|
||||
- name: Setup Problem Matchers for PHPUnit
|
||||
run: echo "::add-matcher::${{ runner.tool_cache }}/phpunit.json"
|
||||
|
||||
- name: Determine Composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=directory::$(composer config cache-dir)"
|
||||
|
||||
- name: Cache Composer dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.directory }}
|
||||
key: ${{ runner.os }}-${{ matrix.php }}-composer-${{ matrix.dependencies }}-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-${{ matrix.php }}-${{ matrix.dependencies }}-composer-
|
||||
|
||||
- name: Install highest dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
if: ${{ matrix.dependencies == 'highest' }}
|
||||
|
||||
- name: Install lowest dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist --prefer-lowest
|
||||
if: ${{ matrix.dependencies == 'lowest' }}
|
||||
|
||||
- name: Run tests
|
||||
run: vendor/bin/phpunit --coverage-clover=build/coverage-report.xml
|
||||
|
||||
- name: Upload code coverage
|
||||
uses: codecov/codecov-action@v1
|
||||
with:
|
||||
file: build/coverage-report.xml
|
70
.github/workflows/static_analyze.yaml
vendored
70
.github/workflows/static_analyze.yaml
vendored
@@ -1,70 +0,0 @@
|
||||
name: Static Analyze
|
||||
|
||||
on:
|
||||
- pull_request
|
||||
- push
|
||||
|
||||
jobs:
|
||||
phpstan:
|
||||
name: PHPStan
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.1'
|
||||
extensions: snappy-kjdev/php-ext-snappy@0.2.1
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Get composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-composer-
|
||||
|
||||
- name: Install dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
|
||||
- name: Run script
|
||||
run: composer phpstan
|
||||
|
||||
psalm:
|
||||
name: Psalm
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.1'
|
||||
extensions: snappy-kjdev/php-ext-snappy@0.2.1
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Get composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-composer-
|
||||
|
||||
- name: Install dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
|
||||
- name: Run script
|
||||
run: vendor/bin/psalm --output-format=github
|
13
.gitignore
vendored
13
.gitignore
vendored
@@ -1,17 +1,6 @@
|
||||
/vendor/
|
||||
/composer.lock
|
||||
|
||||
/.php-cs-fixer.cache
|
||||
/.php_cs.cache
|
||||
/.phpunit.result.cache
|
||||
/infection.log
|
||||
|
||||
# Nsq
|
||||
bin/nsq_stat
|
||||
bin/nsq_tail
|
||||
bin/nsq_to_file
|
||||
bin/nsq_to_http
|
||||
bin/nsq_to_nsq
|
||||
bin/nsqadmin
|
||||
bin/nsqd
|
||||
bin/nsqlookupd
|
||||
bin/to_nsq
|
||||
|
@@ -1,45 +0,0 @@
|
||||
<?php
|
||||
|
||||
$finder = PhpCsFixer\Finder::create()
|
||||
->in(__DIR__)
|
||||
->exclude('vendor');
|
||||
|
||||
return (new PhpCsFixer\Config())
|
||||
->setRiskyAllowed(true)
|
||||
->setRules([
|
||||
'@PhpCsFixer' => true,
|
||||
'@PhpCsFixer:risky' => true,
|
||||
'@PSR12' => true,
|
||||
'@PSR12:risky' => true,
|
||||
'blank_line_before_statement' => [
|
||||
'statements' => [
|
||||
'continue',
|
||||
'do',
|
||||
'exit',
|
||||
'goto',
|
||||
'if',
|
||||
'return',
|
||||
'switch',
|
||||
'throw',
|
||||
'try',
|
||||
],
|
||||
],
|
||||
'declare_strict_types' => true,
|
||||
'global_namespace_import' => [
|
||||
'import_classes' => false,
|
||||
'import_constants' => false,
|
||||
'import_functions' => false,
|
||||
],
|
||||
'php_unit_internal_class' => false,
|
||||
'php_unit_test_case_static_method_calls' => ['call_type' => 'self'],
|
||||
'php_unit_test_class_requires_covers' => false,
|
||||
'phpdoc_to_comment' => false,
|
||||
'yoda_style' => true,
|
||||
'trailing_comma_in_multiline' => [
|
||||
'after_heredoc' => true,
|
||||
'elements' => ['arrays', 'arguments', 'parameters'],
|
||||
],
|
||||
'types_spaces' => ['space' => 'single'],
|
||||
])
|
||||
->setFinder($finder)
|
||||
->setCacheFile(__DIR__.'/.php-cs-fixer.cache');
|
23
.php_cs.dist
Normal file
23
.php_cs.dist
Normal file
@@ -0,0 +1,23 @@
|
||||
#!/usr/bin/env php
|
||||
<?php
|
||||
|
||||
return (new PhpCsFixer\Config())
|
||||
->setRiskyAllowed(true)
|
||||
->setRules([
|
||||
'@PhpCsFixer' => true,
|
||||
'@PhpCsFixer:risky' => true,
|
||||
'@PSR12' => true,
|
||||
'@PSR12:risky' => true,
|
||||
'declare_strict_types' => true,
|
||||
'php_unit_internal_class' => false,
|
||||
'php_unit_test_class_requires_covers' => false,
|
||||
'yoda_style' => true,
|
||||
'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'],
|
||||
'blank_line_before_statement' => [
|
||||
'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try']
|
||||
],
|
||||
])
|
||||
->setFinder(
|
||||
PhpCsFixer\Finder::create()
|
||||
->in(__DIR__)
|
||||
);
|
20
Makefile
20
Makefile
@@ -1,20 +0,0 @@
|
||||
|
||||
all: install composer-validate php-cs-fixer psalm phpstan phpunit
|
||||
|
||||
install:
|
||||
composer install
|
||||
|
||||
psalm:
|
||||
php vendor/bin/psalm
|
||||
|
||||
phpstan:
|
||||
php vendor/bin/phpstan analyse
|
||||
|
||||
phpunit:
|
||||
php vendor/bin/phpunit
|
||||
|
||||
php-cs-fixer:
|
||||
php vendor/bin/php-cs-fixer fix
|
||||
|
||||
composer-validate:
|
||||
composer validate
|
73
README.md
73
README.md
@@ -1,12 +1,11 @@
|
||||
# Nsq PHP
|
||||
|
||||
<img src="https://github.com/nsqphp/nsqphp/raw/master/docs/logo.png" alt="" align="left" width="150">
|
||||
<img src="https://github.com/nsqphp/nsqphp/raw/main/logo.png" alt="" align="left" width="150">
|
||||
|
||||
PHP Client for [NSQ](https://nsq.io/).
|
||||
|
||||
[](//packagist.org/packages/nsq/nsq) [](//packagist.org/packages/nsq/nsq) [](//packagist.org/packages/nsq/nsq)
|
||||
[](https://codecov.io/gh/nsqphp/nsqphp) [](https://dashboard.stryker-mutator.io/reports/github.com/nsqphp/nsqphp/master) [](http://t.me/grachevko)
|
||||
|
||||
[](https://codecov.io/gh/nsqphp/nsqphp) [](https://dashboard.stryker-mutator.io/reports/github.com/nsqphp/nsqphp/main)
|
||||
|
||||
This library follow [SemVer](https://semver.org/). Until version 1.0 will be released anything MAY change at any time, public API SHOULD NOT be considered stable. If you want use it before stable version was released install strict version without range.
|
||||
|
||||
@@ -32,75 +31,67 @@ Features
|
||||
- [x] PUB
|
||||
- [x] SUB
|
||||
- [X] Feature Negotiation
|
||||
- [X] Discovery
|
||||
- [ ] Discovery
|
||||
- [ ] Backoff
|
||||
- [X] TLS
|
||||
- [X] Deflate
|
||||
- [X] Snappy
|
||||
- [ ] TLS
|
||||
- [ ] Snappy
|
||||
- [X] Sampling
|
||||
- [X] AUTH
|
||||
|
||||
Usage
|
||||
-----
|
||||
|
||||
### Producer
|
||||
### Publish
|
||||
|
||||
```php
|
||||
use Nsq\Producer;
|
||||
|
||||
$producer = Producer::create(address: 'tcp://nsqd:4150');
|
||||
$producer = new Producer(address: 'tcp://nsqd:4150');
|
||||
|
||||
// Publish a message to a topic
|
||||
$producer->publish('topic', 'Simple message');
|
||||
$producer->pub('topic', 'Simple message');
|
||||
|
||||
// Publish multiple messages to a topic (atomically)
|
||||
$producer->publish('topic', [
|
||||
$producer->mpub('topic', [
|
||||
'Message one',
|
||||
'Message two',
|
||||
]);
|
||||
|
||||
// Publish a deferred message to a topic
|
||||
$producer->publish('topic', 'Deferred message', delay: 5000);
|
||||
$producer->dpub('topic', 'Deferred message', delay: 5000);
|
||||
```
|
||||
|
||||
### Consumer
|
||||
### Subscription
|
||||
|
||||
```php
|
||||
use Nsq\Consumer;
|
||||
use Nsq\Message;
|
||||
use Nsq\Subscriber;
|
||||
|
||||
$consumer = Consumer::create(
|
||||
address: 'tcp://nsqd:4150',
|
||||
topic: 'topic',
|
||||
channel: 'channel',
|
||||
onMessage: static function (Message $message): Generator {
|
||||
yield $message->touch(); // Reset the timeout for an in-flight message
|
||||
yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
|
||||
yield $message->finish(); // Finish a message (indicate successful processing)
|
||||
},
|
||||
);
|
||||
```
|
||||
$consumer = new Consumer('tcp://nsqd:4150');
|
||||
$subscriber = new Subscriber($consumer);
|
||||
|
||||
### Lookup
|
||||
$generator = $subscriber->subscribe('topic', 'channel');
|
||||
foreach ($generator as $message) {
|
||||
if ($message instanceof Message) {
|
||||
$payload = $message->body;
|
||||
|
||||
```php
|
||||
use Nsq\Lookup;
|
||||
use Nsq\Message;
|
||||
// handle message
|
||||
|
||||
$lookup = new Lookup('http://nsqlookupd0:4161');
|
||||
$lookup = new Lookup(['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161']);
|
||||
$message->touch(); // Reset the timeout for an in-flight message
|
||||
$message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
|
||||
$message->finish(); // Finish a message (indicate successful processing)
|
||||
}
|
||||
|
||||
// In case of nothing received during timeout generator will return NULL
|
||||
// Here we can do something between messages, like pcntl_signal_dispatch()
|
||||
|
||||
// We can also communicate with generator through send
|
||||
// for example:
|
||||
|
||||
$callable = static function (Message $message): Generator {
|
||||
yield $message->touch(); // Reset the timeout for an in-flight message
|
||||
yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
|
||||
yield $message->finish(); // Finish a message (indicate successful processing)
|
||||
};
|
||||
|
||||
$lookup->subscribe(topic: 'topic', channel: 'channel', onMessage: $callable);
|
||||
$lookup->subscribe(topic: 'anotherTopic', channel: 'channel', onMessage: $callable);
|
||||
|
||||
$lookup->unsubscribe(topic: 'local', channel: 'channel');
|
||||
$lookup->stop(); // unsubscribe all
|
||||
// Gracefully close connection (loop will be ended)
|
||||
$generator->send(Subscriber::STOP);
|
||||
}
|
||||
```
|
||||
|
||||
### Integrations
|
||||
|
@@ -11,54 +11,38 @@
|
||||
}
|
||||
],
|
||||
"require": {
|
||||
"php": "^8.1",
|
||||
"php": "^8.0.1",
|
||||
"ext-json": "*",
|
||||
"amphp/http-client": "^4.6",
|
||||
"amphp/socket": "^1.1",
|
||||
"clue/socket-raw": "^1.5",
|
||||
"composer/semver": "^3.2",
|
||||
"phpinnacle/buffer": "^1.2",
|
||||
"psr/log": "^3.0"
|
||||
"psr/log": "^1.1"
|
||||
},
|
||||
"require-dev": {
|
||||
"amphp/log": "^1.1",
|
||||
"dg/bypass-finals": "^1.3",
|
||||
"ergebnis/composer-normalize": "^2.15",
|
||||
"friendsofphp/php-cs-fixer": "^3.4",
|
||||
"ergebnis/composer-normalize": "9999999-dev",
|
||||
"friendsofphp/php-cs-fixer": "^2.18",
|
||||
"infection/infection": "^0.20.2",
|
||||
"nyholm/nsa": "^1.2",
|
||||
"phpstan/phpstan": "^1.8",
|
||||
"phpstan/phpstan-phpunit": "^1.1",
|
||||
"phpstan/phpstan-strict-rules": "^1.3",
|
||||
"phpstan/phpstan": "^0.12.68",
|
||||
"phpstan/phpstan-phpunit": "^0.12.17",
|
||||
"phpstan/phpstan-strict-rules": "^0.12.9",
|
||||
"phpunit/phpunit": "^9.5",
|
||||
"symfony/filesystem": "^6.1",
|
||||
"symfony/process": "^6.1",
|
||||
"symfony/var-dumper": "^6.1",
|
||||
"vimeo/psalm": "^4.4"
|
||||
},
|
||||
"config": {
|
||||
"sort-packages": true,
|
||||
"allow-plugins": {
|
||||
"ergebnis/composer-normalize": true,
|
||||
"infection/extension-installer": true
|
||||
}
|
||||
"sort-packages": true
|
||||
},
|
||||
"autoload": {
|
||||
"psr-4": {
|
||||
"Nsq\\": "src/"
|
||||
}
|
||||
},
|
||||
"autoload-dev": {
|
||||
"classmap": [
|
||||
"tests/"
|
||||
],
|
||||
"files": [
|
||||
"vendor/symfony/var-dumper/Resources/functions/dump.php"
|
||||
]
|
||||
},
|
||||
"minimum-stability": "dev",
|
||||
"prefer-stable": true,
|
||||
"scripts": {
|
||||
"cs": [
|
||||
"vendor/bin/php-cs-fixer fix --using-cache=no"
|
||||
"vendor/bin/php-cs-fixer fix"
|
||||
],
|
||||
"cs-check": [
|
||||
"vendor/bin/php-cs-fixer fix --verbose --diff --dry-run"
|
||||
@@ -75,7 +59,7 @@
|
||||
"vendor/bin/psalm"
|
||||
],
|
||||
"test": [
|
||||
"@norm",
|
||||
"@norm-check",
|
||||
"@cs",
|
||||
"@phpstan",
|
||||
"@psalm",
|
||||
|
@@ -2,56 +2,14 @@ version: '3.7'
|
||||
|
||||
services:
|
||||
nsqd:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
labels:
|
||||
ru.grachevko.dhu: 'nsqd'
|
||||
command: >-
|
||||
nsqd
|
||||
--log-level debug
|
||||
--lookupd-tcp-address nsqlookupd0:4160
|
||||
--lookupd-tcp-address nsqlookupd1:4160
|
||||
--lookupd-tcp-address nsqlookupd2:4160
|
||||
|
||||
nsqlookupd0:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
labels:
|
||||
ru.grachevko.dhu: 'nsqlookupd0'
|
||||
command: /nsqlookupd -log-level debug
|
||||
|
||||
nsqlookupd1:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
labels:
|
||||
ru.grachevko.dhu: 'nsqlookupd1'
|
||||
command: /nsqlookupd -log-level debug
|
||||
|
||||
nsqlookupd2:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
labels:
|
||||
ru.grachevko.dhu: 'nsqlookupd2'
|
||||
command: /nsqlookupd -log-level debug
|
||||
image: nsqio/nsq:v1.2.0
|
||||
command: /nsqd
|
||||
ports:
|
||||
- 4150:4150
|
||||
- 4151:4151
|
||||
|
||||
nsqadmin:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
labels:
|
||||
ru.grachevko.dhu: 'nsqadmin'
|
||||
command:
|
||||
- nsqadmin
|
||||
- --http-address=0.0.0.0:4171
|
||||
- --lookupd-http-address=nsqlookupd0:4161
|
||||
- --lookupd-http-address=nsqlookupd1:4161
|
||||
- --lookupd-http-address=nsqlookupd2:4161
|
||||
depends_on:
|
||||
- nsqlookupd0
|
||||
- nsqlookupd1
|
||||
- nsqlookupd2
|
||||
|
||||
tail:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
command: >-
|
||||
nsq_tail
|
||||
--channel nsq_tail
|
||||
--topic local
|
||||
--lookupd-http-address nsqlookupd1:4161
|
||||
depends_on:
|
||||
- nsqd
|
||||
- nsqlookupd1
|
||||
image: nsqio/nsq:v1.2.0
|
||||
command: /nsqadmin --nsqd-http-address=nsqd:4151 --http-address=0.0.0.0:4171
|
||||
ports:
|
||||
- 4171:4171
|
||||
|
@@ -1,47 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
require dirname(__DIR__).'/../vendor/autoload.php';
|
||||
|
||||
use Amp\ByteStream;
|
||||
use Amp\Log\ConsoleFormatter;
|
||||
use Amp\Log\StreamHandler;
|
||||
use Amp\Loop;
|
||||
use Monolog\Logger;
|
||||
use Monolog\Processor\PsrLogMessageProcessor;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Config\LookupConfig;
|
||||
use Nsq\Lookup;
|
||||
use Nsq\Message;
|
||||
|
||||
Loop::run(static function () {
|
||||
$handler = new StreamHandler(ByteStream\getStdout());
|
||||
$handler->setFormatter(new ConsoleFormatter());
|
||||
$logger = new Logger('consumer', [$handler], [new PsrLogMessageProcessor()]);
|
||||
|
||||
$callable = static function (Message $message) {
|
||||
yield $message->finish();
|
||||
};
|
||||
|
||||
$clientConfig = new ClientConfig();
|
||||
|
||||
$lookupConfig = new LookupConfig();
|
||||
|
||||
$watcherId = Loop::repeat(5000, function () {
|
||||
yield Amp\Dns\resolver()->reloadConfig();
|
||||
});
|
||||
|
||||
$lookup = Lookup::create(
|
||||
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
|
||||
$lookupConfig,
|
||||
$logger,
|
||||
);
|
||||
|
||||
$lookup->subscribe('local', 'local', $callable, $clientConfig);
|
||||
|
||||
Loop::delay(10000, function () use ($lookup, $watcherId) {
|
||||
$lookup->stop();
|
||||
Loop::cancel($watcherId);
|
||||
});
|
||||
});
|
@@ -1,102 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
require dirname(__DIR__).'/../vendor/autoload.php';
|
||||
|
||||
use Amp\ByteStream;
|
||||
use Amp\Log\ConsoleFormatter;
|
||||
use Amp\Log\StreamHandler;
|
||||
use Amp\Loop;
|
||||
use Monolog\Logger;
|
||||
use Monolog\Processor\PsrLogMessageProcessor;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Config\LookupConfig;
|
||||
use Nsq\Lookup;
|
||||
use Nsq\Producer;
|
||||
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\delay;
|
||||
|
||||
Loop::run(static function () {
|
||||
$handler = new StreamHandler(ByteStream\getStdout());
|
||||
$handler->setFormatter(new ConsoleFormatter());
|
||||
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
|
||||
|
||||
$clientConfig = new ClientConfig();
|
||||
|
||||
/** @var Producer[] $producers */
|
||||
$producers = [];
|
||||
|
||||
$lookupConfig = new LookupConfig();
|
||||
|
||||
$lookup = Lookup::create(
|
||||
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
|
||||
$lookupConfig,
|
||||
$logger,
|
||||
);
|
||||
|
||||
$isRunning = true;
|
||||
|
||||
asyncCall(static function () use ($lookup, $clientConfig, $logger, &$producers, &$isRunning) {
|
||||
while ($isRunning) {
|
||||
/** @var Lookup\Producer[] $nodes */
|
||||
$nodes = yield $lookup->nodes();
|
||||
|
||||
foreach ($nodes as $node) {
|
||||
$address = $node->toTcpUri();
|
||||
|
||||
if (array_key_exists($address, $producers)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
asyncCall(function () use ($address, $clientConfig, $logger, &$producers) {
|
||||
yield ($producers[$address] = Producer::create($address, $clientConfig, $logger))
|
||||
->onClose(function () use (&$producers, $address) {
|
||||
unset($producers[$address]);
|
||||
})
|
||||
->connect()
|
||||
;
|
||||
});
|
||||
}
|
||||
|
||||
yield delay(5000);
|
||||
|
||||
yield Amp\Dns\resolver()->reloadConfig(); // for reload /etc/hosts
|
||||
}
|
||||
});
|
||||
|
||||
Loop::delay(5000, function () use (&$isRunning, $logger) {
|
||||
$logger->info('Stopping producer.');
|
||||
|
||||
$isRunning = false;
|
||||
});
|
||||
|
||||
$counter = 0;
|
||||
while (true) {
|
||||
if (!$isRunning) {
|
||||
foreach ($producers as $producer) {
|
||||
$producer->close();
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
if ([] === $producers) {
|
||||
yield delay(200);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
$index = array_rand($producers);
|
||||
$producer = $producers[$index];
|
||||
|
||||
if (!$producer->isConnected()) {
|
||||
yield delay(100);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
yield $producer->publish('local', 'This is message of count '.$counter++);
|
||||
}
|
||||
});
|
@@ -1,44 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
require __DIR__.'/../vendor/autoload.php';
|
||||
|
||||
use Amp\ByteStream;
|
||||
use Amp\Log\ConsoleFormatter;
|
||||
use Amp\Log\StreamHandler;
|
||||
use Amp\Loop;
|
||||
use Amp\Promise;
|
||||
use Monolog\Logger;
|
||||
use Monolog\Processor\PsrLogMessageProcessor;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Consumer;
|
||||
use Nsq\Message;
|
||||
|
||||
use function Amp\call;
|
||||
|
||||
Loop::run(static function () {
|
||||
$handler = new StreamHandler(ByteStream\getStdout());
|
||||
$handler->setFormatter(new ConsoleFormatter());
|
||||
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
|
||||
|
||||
$consumer = new Consumer(
|
||||
'tcp://localhost:4150',
|
||||
topic: 'local',
|
||||
channel: 'local',
|
||||
onMessage: static function (Message $message) use ($logger): Promise {
|
||||
return call(function () use ($message, $logger): Generator {
|
||||
$logger->info('Received: {body}', ['body' => $message->body]);
|
||||
|
||||
yield $message->finish();
|
||||
});
|
||||
},
|
||||
clientConfig: new ClientConfig(
|
||||
deflate: false,
|
||||
snappy: true,
|
||||
),
|
||||
logger: $logger,
|
||||
);
|
||||
|
||||
yield $consumer->connect();
|
||||
});
|
@@ -1,36 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
require __DIR__.'/../vendor/autoload.php';
|
||||
|
||||
use Amp\ByteStream;
|
||||
use Amp\Log\ConsoleFormatter;
|
||||
use Amp\Log\StreamHandler;
|
||||
use Amp\Loop;
|
||||
use Monolog\Logger;
|
||||
use Monolog\Processor\PsrLogMessageProcessor;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Producer;
|
||||
|
||||
Loop::run(static function () {
|
||||
$handler = new StreamHandler(ByteStream\getStdout());
|
||||
$handler->setFormatter(new ConsoleFormatter());
|
||||
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
|
||||
|
||||
$producer = new Producer(
|
||||
'tcp://localhost:4150',
|
||||
clientConfig: new ClientConfig(
|
||||
deflate: false,
|
||||
heartbeatInterval: 5000,
|
||||
snappy: true,
|
||||
),
|
||||
logger: $logger,
|
||||
);
|
||||
|
||||
yield $producer->connect();
|
||||
|
||||
while (true) {
|
||||
yield $producer->publish(topic: 'local', body: array_fill(0, 200, 'Message body!'));
|
||||
}
|
||||
});
|
Before Width: | Height: | Size: 98 KiB After Width: | Height: | Size: 98 KiB |
@@ -8,9 +8,3 @@ parameters:
|
||||
paths:
|
||||
- src
|
||||
- tests
|
||||
ignoreErrors:
|
||||
-
|
||||
message: '#no value type specified in iterable type array#'
|
||||
paths:
|
||||
- %currentWorkingDirectory%/src
|
||||
- %currentWorkingDirectory%/tests
|
||||
|
@@ -1,5 +1,6 @@
|
||||
<?xml version="1.0"?>
|
||||
<psalm
|
||||
allowPhpStormGenerics="true"
|
||||
ignoreInternalFunctionFalseReturn="false"
|
||||
ignoreInternalFunctionNullReturn="false"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
|
@@ -1,3 +0,0 @@
|
||||
{
|
||||
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
|
||||
}
|
@@ -1,37 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use PHPinnacle\Buffer\ByteBuffer;
|
||||
|
||||
/**
|
||||
* @psalm-suppress
|
||||
*/
|
||||
final class Buffer extends ByteBuffer
|
||||
{
|
||||
public function readUInt32LE(): int
|
||||
{
|
||||
$unpacked = unpack('V', $this->consume(4));
|
||||
|
||||
\assert(\is_array($unpacked) && \array_key_exists(1, $unpacked));
|
||||
|
||||
return $unpacked[1];
|
||||
}
|
||||
|
||||
public function consumeTimestamp(): int
|
||||
{
|
||||
return $this->consumeUint64();
|
||||
}
|
||||
|
||||
public function consumeAttempts(): int
|
||||
{
|
||||
return $this->consumeUint16();
|
||||
}
|
||||
|
||||
public function consumeMessageID(): string
|
||||
{
|
||||
return $this->consume(16);
|
||||
}
|
||||
}
|
111
src/Command.php
111
src/Command.php
@@ -1,111 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use PHPinnacle\Buffer\ByteBuffer;
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
final class Command
|
||||
{
|
||||
public static function magic(): string
|
||||
{
|
||||
return ' V2';
|
||||
}
|
||||
|
||||
public static function identify(string $data): string
|
||||
{
|
||||
return self::pack('IDENTIFY', data: $data);
|
||||
}
|
||||
|
||||
public static function auth(?string $authSecret): string
|
||||
{
|
||||
return self::pack('AUTH', data: $authSecret);
|
||||
}
|
||||
|
||||
public static function nop(): string
|
||||
{
|
||||
return self::pack('NOP');
|
||||
}
|
||||
|
||||
public static function cls(): string
|
||||
{
|
||||
return self::pack('CLS');
|
||||
}
|
||||
|
||||
public static function rdy(int $count): string
|
||||
{
|
||||
return self::pack('RDY', (string) $count);
|
||||
}
|
||||
|
||||
public static function fin(string $id): string
|
||||
{
|
||||
return self::pack('FIN', $id);
|
||||
}
|
||||
|
||||
public static function req(string $id, int $timeout): string
|
||||
{
|
||||
return self::pack('REQ', [$id, $timeout]);
|
||||
}
|
||||
|
||||
public static function touch(string $id): string
|
||||
{
|
||||
return self::pack('TOUCH', $id);
|
||||
}
|
||||
|
||||
public static function pub(string $topic, string $body): string
|
||||
{
|
||||
return self::pack('PUB', $topic, $body);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<int, string> $bodies
|
||||
*/
|
||||
public static function mpub(string $topic, array $bodies): string
|
||||
{
|
||||
static $buffer;
|
||||
$buffer ??= new ByteBuffer();
|
||||
|
||||
$buffer->appendUint32(\count($bodies));
|
||||
|
||||
foreach ($bodies as $body) {
|
||||
$buffer->appendUint32(\strlen($body));
|
||||
$buffer->append($body);
|
||||
}
|
||||
|
||||
return self::pack('MPUB', $topic, $buffer->flush());
|
||||
}
|
||||
|
||||
public static function dpub(string $topic, string $body, int $delay): string
|
||||
{
|
||||
return self::pack('DPUB', [$topic, $delay], $body);
|
||||
}
|
||||
|
||||
public static function sub(string $topic, string $channel): string
|
||||
{
|
||||
return self::pack('SUB', [$topic, $channel]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<int, scalar>|string $params
|
||||
*/
|
||||
private static function pack(string $command, array | string $params = [], string $data = null): string
|
||||
{
|
||||
static $buffer;
|
||||
$buffer ??= new Buffer();
|
||||
|
||||
$command = implode(' ', [$command, ...((array) $params)]);
|
||||
|
||||
$buffer->append($command.PHP_EOL);
|
||||
|
||||
if (null !== $data) {
|
||||
$buffer->appendUint32(\strlen($data));
|
||||
$buffer->append($data);
|
||||
}
|
||||
|
||||
return $buffer->flush();
|
||||
}
|
||||
}
|
@@ -5,6 +5,9 @@ declare(strict_types=1);
|
||||
namespace Nsq\Config;
|
||||
|
||||
use Composer\InstalledVersions;
|
||||
use InvalidArgumentException;
|
||||
use JsonSerializable;
|
||||
use function gethostname;
|
||||
|
||||
/**
|
||||
* This class is used for configuring the clients for nsq. Immutable properties must be set when creating the object and
|
||||
@@ -13,51 +16,25 @@ use Composer\InstalledVersions;
|
||||
*
|
||||
* @psalm-immutable
|
||||
*/
|
||||
final class ClientConfig
|
||||
final class ClientConfig implements JsonSerializable
|
||||
{
|
||||
/**
|
||||
* @psalm-suppress ImpureFunctionCall
|
||||
*/
|
||||
/** @psalm-suppress ImpureFunctionCall */
|
||||
public function __construct(
|
||||
/**
|
||||
/*
|
||||
* The secret used for authorization, if the server requires it. This value will be ignored if the server
|
||||
* does not require authorization.
|
||||
*/
|
||||
public ?string $authSecret = null,
|
||||
|
||||
/**
|
||||
* The timeout for establishing a connection in milliseconds.
|
||||
*/
|
||||
public int $connectTimeout = 10000,
|
||||
// The timeout for establishing a connection in seconds.
|
||||
public int $connectTimeout = 10,
|
||||
|
||||
/**
|
||||
* The max attempts for establishing a connection.
|
||||
*/
|
||||
public int $maxAttempts = 0,
|
||||
|
||||
/**
|
||||
* Use tcp_nodelay for establishing a connection.
|
||||
*/
|
||||
public bool $tcpNoDelay = false,
|
||||
public int $rdyCount = 100,
|
||||
|
||||
/**
|
||||
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
|
||||
* it will send back a JSON payload of supported features and metadata.
|
||||
*/
|
||||
public bool $featureNegotiation = true,
|
||||
|
||||
/**
|
||||
* An identifier used to disambiguate this client (i.e. something specific to the consumer).
|
||||
*/
|
||||
// An identifier used to disambiguate this client (i.e. something specific to the consumer)
|
||||
public string $clientId = '',
|
||||
|
||||
/**
|
||||
* Enable deflate compression for this connection. A client cannot enable both [snappy] and [deflate].
|
||||
*/
|
||||
// Enable deflate compression for this connection. A client cannot enable both [snappy] and [deflate].
|
||||
public bool $deflate = false,
|
||||
|
||||
/**
|
||||
/*
|
||||
* Configure the deflate compression level for this connection.
|
||||
*
|
||||
* Valid range: `1 <= deflate_level <= configured_max`
|
||||
@@ -66,43 +43,42 @@ final class ClientConfig
|
||||
*/
|
||||
public int $deflateLevel = 6,
|
||||
|
||||
/**
|
||||
/*
|
||||
* Milliseconds between heartbeats.
|
||||
*
|
||||
* Valid range: `1000 <= heartbeat_interval <= configured_max` (`-1` disables heartbeats)
|
||||
*/
|
||||
public int $heartbeatInterval = 30000,
|
||||
|
||||
/**
|
||||
* The hostname where the client is deployed.
|
||||
*/
|
||||
// The hostname where the client is deployed
|
||||
public string $hostname = '',
|
||||
|
||||
/**
|
||||
* Configure the server-side message timeout in milliseconds for messages delivered to this client.
|
||||
*/
|
||||
// Configure the server-side message timeout in milliseconds for messages delivered to this client.
|
||||
public int $msgTimeout = 60000,
|
||||
|
||||
/**
|
||||
/*
|
||||
* The sample rate for incoming data to deliver a percentage of all messages received to this connection.
|
||||
* This only applies to subscribing connections. The valid range is between 0 and 99, where 0 means that all
|
||||
* data is sent (this is the default). 1 means that 1% of the data is sent.
|
||||
*/
|
||||
public int $sampleRate = 0,
|
||||
|
||||
/**
|
||||
* Enable TLS for this connection.
|
||||
/*
|
||||
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
|
||||
* it will send back a JSON payload of supported features and metadata.
|
||||
*/
|
||||
public bool $featureNegotiation = true,
|
||||
|
||||
// Enable TLS for this connection
|
||||
public bool $tls = false,
|
||||
|
||||
/**
|
||||
* Enable snappy compression for this connection. A client cannot enable both [snappy] and [deflate].
|
||||
*/
|
||||
// Enable snappy compression for this connection. A client cannot enable both [snappy] and [deflate].
|
||||
public bool $snappy = false,
|
||||
|
||||
/**
|
||||
* A string identifying the agent for this client in the spirit of HTTP.
|
||||
*/
|
||||
// The read timeout for connection sockets and for awaiting responses from nsq.
|
||||
public int $readTimeout = 5,
|
||||
|
||||
// A string identifying the agent for this client in the spirit of HTTP.
|
||||
public string $userAgent = '',
|
||||
) {
|
||||
$this->featureNegotiation = true; // Always enabled
|
||||
@@ -116,18 +92,16 @@ final class ClientConfig
|
||||
}
|
||||
|
||||
if ($this->snappy && $this->deflate) {
|
||||
throw new \InvalidArgumentException('Client cannot enable both [snappy] and [deflate]');
|
||||
throw new InvalidArgumentException('Client cannot enable both [snappy] and [deflate]');
|
||||
}
|
||||
}
|
||||
|
||||
public static function fromArray(array $array): self
|
||||
/**
|
||||
* @phpstan-ignore-next-line
|
||||
*/
|
||||
public function jsonSerialize(): array
|
||||
{
|
||||
return new self(...array_intersect_key($array, get_class_vars(self::class)));
|
||||
}
|
||||
|
||||
public function asNegotiationPayload(): string
|
||||
{
|
||||
$data = [
|
||||
return [
|
||||
'client_id' => $this->clientId,
|
||||
'deflate' => $this->deflate,
|
||||
'deflate_level' => $this->deflateLevel,
|
||||
@@ -140,7 +114,5 @@ final class ClientConfig
|
||||
'tls_v1' => $this->tls,
|
||||
'user_agent' => $this->userAgent,
|
||||
];
|
||||
|
||||
return json_encode($data, JSON_THROW_ON_ERROR);
|
||||
}
|
||||
}
|
||||
|
@@ -8,82 +8,58 @@ namespace Nsq\Config;
|
||||
* The configuration object that holds the config status for a single Connection.
|
||||
*
|
||||
* @psalm-immutable
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
final class ServerConfig
|
||||
final class ConnectionConfig
|
||||
{
|
||||
public function __construct(
|
||||
/**
|
||||
* Whether or not authorization is required by nsqd.
|
||||
*/
|
||||
// Whether or not authorization is required by nsqd.
|
||||
public bool $authRequired,
|
||||
|
||||
/**
|
||||
* Whether deflate compression is enabled for this connection or not.
|
||||
*/
|
||||
// Whether deflate compression is enabled for this connection or not.
|
||||
public bool $deflate,
|
||||
|
||||
/**
|
||||
* The deflate level. This value can be ignored if [deflate] is `false`.
|
||||
*/
|
||||
// The deflate level. This value can be ignored if [deflate] is `false`.
|
||||
public int $deflateLevel,
|
||||
|
||||
/**
|
||||
* The maximum deflate level supported by the server.
|
||||
*/
|
||||
// The maximum deflate level supported by the server.
|
||||
public int $maxDeflateLevel,
|
||||
|
||||
/**
|
||||
* The maximum value for message timeout.
|
||||
*/
|
||||
// The maximum value for message timeout.
|
||||
public int $maxMsgTimeout,
|
||||
|
||||
/**
|
||||
/*
|
||||
* Each nsqd is configurable with a max-rdy-count. If the consumer sends a RDY count that is outside
|
||||
* of the acceptable range its connection will be forcefully closed.
|
||||
*/
|
||||
public int $maxRdyCount,
|
||||
|
||||
/**
|
||||
* The effective message timeout.
|
||||
*/
|
||||
// The effective message timeout.
|
||||
public int $msgTimeout,
|
||||
|
||||
/**
|
||||
* The size in bytes of the buffer nsqd will use when writing to this client.
|
||||
*/
|
||||
// The size in bytes of the buffer nsqd will use when writing to this client.
|
||||
public int $outputBufferSize,
|
||||
|
||||
/**
|
||||
* The timeout after which any data that nsqd has buffered will be flushed to this client.
|
||||
*/
|
||||
// The timeout after which any data that nsqd has buffered will be flushed to this client.
|
||||
public int $outputBufferTimeout,
|
||||
|
||||
/**
|
||||
/*
|
||||
* The sample rate for incoming data to deliver a percentage of all messages received to this connection.
|
||||
* This only applies to subscribing connections. The valid range is between 0 and 99, where 0 means that all
|
||||
* data is sent (this is the default). 1 means that 1% of the data is sent.
|
||||
*/
|
||||
public int $sampleRate,
|
||||
|
||||
/**
|
||||
* Whether snappy compression is enabled for this connection or not.
|
||||
*/
|
||||
// Whether snappy compression is enabled for this connection or not.
|
||||
public bool $snappy,
|
||||
|
||||
/**
|
||||
* Whether TLS is enabled for this connection or not.
|
||||
*/
|
||||
// Whether TLS is enabled for this connection or not.
|
||||
public bool $tls,
|
||||
|
||||
/**
|
||||
* The nsqd version.
|
||||
*/
|
||||
// The nsqd version.
|
||||
public string $version,
|
||||
) {
|
||||
}
|
||||
|
||||
/**
|
||||
* @phpstan-ignore-next-line
|
||||
*/
|
||||
public static function fromArray(array $array): self
|
||||
{
|
||||
return new self(
|
@@ -1,13 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Config;
|
||||
|
||||
final class LookupConfig
|
||||
{
|
||||
public function __construct(
|
||||
public int $pollingInterval = 10000,
|
||||
) {
|
||||
}
|
||||
}
|
@@ -4,266 +4,340 @@ declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\ByteStream\ClosedException;
|
||||
use Amp\Failure;
|
||||
use Amp\Promise;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Config\ServerConfig;
|
||||
use Nsq\Config\ConnectionConfig;
|
||||
use Nsq\Exception\AuthenticationRequired;
|
||||
use Nsq\Exception\NsqException;
|
||||
use Nsq\Frame\Response;
|
||||
use Nsq\Stream\GzipStream;
|
||||
use Nsq\Stream\NullStream;
|
||||
use Nsq\Stream\SnappyStream;
|
||||
use Nsq\Stream\SocketStream;
|
||||
use Nsq\Exception\ConnectionFail;
|
||||
use Nsq\Exception\UnexpectedResponse;
|
||||
use Nsq\Reconnect\ExponentialStrategy;
|
||||
use Nsq\Reconnect\ReconnectStrategy;
|
||||
use PHPinnacle\Buffer\ByteBuffer;
|
||||
use Psr\Log\LoggerAwareTrait;
|
||||
use Psr\Log\LoggerInterface;
|
||||
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\call;
|
||||
use Psr\Log\NullLogger;
|
||||
use Socket\Raw\Exception;
|
||||
use Socket\Raw\Factory;
|
||||
use Socket\Raw\Socket;
|
||||
use function addcslashes;
|
||||
use function hash;
|
||||
use function http_build_query;
|
||||
use function implode;
|
||||
use function json_encode;
|
||||
use function pack;
|
||||
use function snappy_compress;
|
||||
use function unpack;
|
||||
use const JSON_FORCE_OBJECT;
|
||||
use const JSON_THROW_ON_ERROR;
|
||||
use const PHP_EOL;
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*
|
||||
* @property ConnectionConfig $connectionConfig
|
||||
*/
|
||||
abstract class Connection
|
||||
{
|
||||
private Stream $stream;
|
||||
use LoggerAwareTrait;
|
||||
|
||||
/**
|
||||
* @var callable
|
||||
*/
|
||||
private $onConnectCallback;
|
||||
private string $address;
|
||||
|
||||
/**
|
||||
* @var callable
|
||||
*/
|
||||
private $onCloseCallback;
|
||||
private ?Socket $socket = null;
|
||||
|
||||
private ReconnectStrategy $reconnect;
|
||||
|
||||
private ClientConfig $clientConfig;
|
||||
|
||||
private ?ConnectionConfig $connectionConfig = null;
|
||||
|
||||
public function __construct(
|
||||
/**
|
||||
* @readonly
|
||||
*/
|
||||
public string $address,
|
||||
protected ClientConfig $clientConfig,
|
||||
protected LoggerInterface $logger,
|
||||
string $address,
|
||||
ClientConfig $clientConfig = null,
|
||||
ReconnectStrategy $reconnectStrategy = null,
|
||||
LoggerInterface $logger = null,
|
||||
) {
|
||||
$this->stream = new NullStream();
|
||||
$this->onConnectCallback = static function (): void {
|
||||
};
|
||||
$this->onCloseCallback = static function (): void {
|
||||
};
|
||||
$this->address = $address;
|
||||
|
||||
$this->logger = $logger ?? new NullLogger();
|
||||
$this->reconnect = $reconnectStrategy ?? new ExponentialStrategy(logger: $this->logger);
|
||||
$this->clientConfig = $clientConfig ?? new ClientConfig();
|
||||
}
|
||||
|
||||
public function __destruct()
|
||||
public function connect(): void
|
||||
{
|
||||
$this->close(false);
|
||||
}
|
||||
$this->reconnect->connect(function (): void {
|
||||
try {
|
||||
$this->socket = (new Factory())->createClient($this->address);
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->logger->error('Connecting to {address} failed.', ['address' => $this->address]);
|
||||
|
||||
public function isConnected(): bool
|
||||
{
|
||||
return !$this->stream instanceof NullStream;
|
||||
}
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<void>
|
||||
*/
|
||||
public function connect(): Promise
|
||||
{
|
||||
return call(function (): \Generator {
|
||||
$buffer = new Buffer();
|
||||
$this->socket->write(' V2');
|
||||
|
||||
/** @var SocketStream $stream */
|
||||
$stream = yield SocketStream::connect(
|
||||
$this->address,
|
||||
$this->clientConfig->connectTimeout,
|
||||
$this->clientConfig->maxAttempts,
|
||||
$this->clientConfig->tcpNoDelay,
|
||||
);
|
||||
$body = json_encode($this->clientConfig, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
|
||||
|
||||
yield $stream->write(Command::magic());
|
||||
yield $stream->write(Command::identify($this->clientConfig->asNegotiationPayload()));
|
||||
$response = $this->command('IDENTIFY', data: $body)->response();
|
||||
|
||||
/** @var Response $response */
|
||||
$response = yield $this->response($stream, $buffer);
|
||||
$serverConfig = ServerConfig::fromArray($response->toArray());
|
||||
$this->connectionConfig = ConnectionConfig::fromArray($response->toArray());
|
||||
|
||||
if ($serverConfig->tls) {
|
||||
yield $stream->setupTls();
|
||||
|
||||
/** @var Response $response */
|
||||
$response = yield $this->response($stream, $buffer);
|
||||
|
||||
if (!$response->isOk()) {
|
||||
throw new NsqException();
|
||||
}
|
||||
if ($this->connectionConfig->snappy || $this->connectionConfig->deflate) {
|
||||
$this->response()->okOrFail();
|
||||
}
|
||||
|
||||
if ($serverConfig->snappy) {
|
||||
$stream = new SnappyStream($stream, $buffer->flush());
|
||||
|
||||
/** @var Response $response */
|
||||
$response = yield $this->response($stream, $buffer);
|
||||
|
||||
if (!$response->isOk()) {
|
||||
throw new NsqException();
|
||||
}
|
||||
}
|
||||
|
||||
if ($serverConfig->deflate) {
|
||||
$stream = new GzipStream($stream, $serverConfig->deflateLevel, $buffer->flush());
|
||||
|
||||
/** @var Response $response */
|
||||
$response = yield $this->response($stream, $buffer);
|
||||
|
||||
if (!$response->isOk()) {
|
||||
throw new NsqException();
|
||||
}
|
||||
}
|
||||
|
||||
if ($serverConfig->authRequired) {
|
||||
if ($this->connectionConfig->authRequired) {
|
||||
if (null === $this->clientConfig->authSecret) {
|
||||
throw new AuthenticationRequired();
|
||||
throw new AuthenticationRequired('NSQ requires authorization, set ClientConfig::$authSecret before connecting');
|
||||
}
|
||||
|
||||
yield $stream->write(Command::auth($this->clientConfig->authSecret));
|
||||
$authResponse = $this->command('AUTH', data: $this->clientConfig->authSecret)->response()->toArray();
|
||||
|
||||
/** @var Response $response */
|
||||
$response = yield $this->response($stream, $buffer);
|
||||
|
||||
$this->logger->info('Authorization response: '.http_build_query($response->toArray()));
|
||||
$this->logger->info('Authorization response: '.http_build_query($authResponse));
|
||||
}
|
||||
|
||||
$this->stream = $stream;
|
||||
|
||||
($this->onConnectCallback)();
|
||||
});
|
||||
}
|
||||
|
||||
public function close(bool $graceful = true): void
|
||||
/**
|
||||
* Cleanly close your connection (no more messages are sent).
|
||||
*/
|
||||
public function disconnect(): void
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return;
|
||||
}
|
||||
|
||||
$logger = $this->logger;
|
||||
[$stream, $this->stream] = [$this->stream, new NullStream()];
|
||||
|
||||
if ($graceful) {
|
||||
$this->logger->debug('Graceful disconnect.', [
|
||||
'class' => static::class,
|
||||
'address' => $this->address,
|
||||
]);
|
||||
|
||||
asyncCall(static function () use ($stream, $logger): \Generator {
|
||||
try {
|
||||
yield $stream->write(Command::cls());
|
||||
} catch (\Throwable $e) {
|
||||
$logger->warning($e->getMessage(), ['exception' => $e]);
|
||||
}
|
||||
|
||||
$stream->close();
|
||||
});
|
||||
|
||||
if (null === $this->socket) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
$stream->close();
|
||||
} catch (ClosedException) {
|
||||
$this->socket->write('CLS'.PHP_EOL);
|
||||
$this->socket->close();
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->logger->debug($e->getMessage(), ['exception' => $e]);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
|
||||
($this->onCloseCallback)();
|
||||
$this->socket = null;
|
||||
$this->connectionConfig = null;
|
||||
}
|
||||
|
||||
public function onConnect(callable $callback): static
|
||||
public function isReady(): bool
|
||||
{
|
||||
$previous = $this->onConnectCallback;
|
||||
$this->onConnectCallback = static function () use ($previous, $callback): void {
|
||||
$previous();
|
||||
$callback();
|
||||
};
|
||||
return null !== $this->socket;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<int, int|string>|string $params
|
||||
*/
|
||||
protected function command(string $command, array | string $params = [], string $data = null): self
|
||||
{
|
||||
$socket = $this->socket();
|
||||
|
||||
$buffer = [] === $params ? $command : implode(' ', [$command, ...((array) $params)]);
|
||||
$buffer .= PHP_EOL;
|
||||
|
||||
if (null !== $data) {
|
||||
$buffer .= pack('N', \strlen($data));
|
||||
$buffer .= $data;
|
||||
}
|
||||
|
||||
$this->logger->debug('Prepare send uncompressed buffer: {bytes}', ['bytes' => addcslashes($buffer, PHP_EOL)]);
|
||||
|
||||
if ($this->connectionConfig?->snappy) {
|
||||
$identifierFrame = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
|
||||
$compressedFrame = 0x00;
|
||||
$uncompressedFrame = 0x01;
|
||||
|
||||
$chunk = snappy_compress($buffer);
|
||||
[$chunk, $compressFrame] = match (\strlen($chunk) < \strlen($buffer)) {
|
||||
true => [$chunk, $compressedFrame],
|
||||
false => [$buffer, $uncompressedFrame],
|
||||
};
|
||||
|
||||
$size = \strlen($chunk) + 4;
|
||||
|
||||
$buffer = new ByteBuffer();
|
||||
foreach ([...$identifierFrame, $compressFrame, $size, $size >> 8, $size >> 16] as $byte) {
|
||||
$buffer->appendUint8($byte);
|
||||
}
|
||||
|
||||
$crc32c = hash('crc32c', $data, true);
|
||||
$crc32c = unpack('V', $crc32c)[1];
|
||||
|
||||
$unsignedRightShift = static function ($a, $b) {
|
||||
if ($b >= 32 || $b < -32) {
|
||||
$m = (int) ($b / 32);
|
||||
$b -= ($m * 32);
|
||||
}
|
||||
|
||||
if ($b < 0) {
|
||||
$b = 32 + $b;
|
||||
}
|
||||
|
||||
if (0 === $b) {
|
||||
return (($a >> 1) & 0x7fffffff) * 2 + (($a >> $b) & 1);
|
||||
}
|
||||
|
||||
if ($a < 0) {
|
||||
$a >>= 1;
|
||||
$a &= 2147483647;
|
||||
$a |= 0x40000000;
|
||||
$a >>= ($b - 1);
|
||||
} else {
|
||||
$a >>= $b;
|
||||
}
|
||||
|
||||
return $a;
|
||||
};
|
||||
$checksum = $unsignedRightShift((($crc32c >> 15) | ($crc32c << 17)) + 0xa282ead8, 0);
|
||||
|
||||
$buffer->appendUint32($checksum);
|
||||
$buffer->append($chunk);
|
||||
|
||||
$buffer = $buffer->bytes();
|
||||
}
|
||||
|
||||
$this->logger->debug('Prepare send compressed buffer: {bytes}', ['bytes' => addcslashes($buffer, PHP_EOL)]);
|
||||
|
||||
try {
|
||||
$socket->write($buffer);
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->disconnect();
|
||||
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function onClose(callable $callback): static
|
||||
public function hasMessage(float $timeout = 0): bool
|
||||
{
|
||||
$previous = $this->onCloseCallback;
|
||||
$this->onCloseCallback = static function () use ($previous, $callback): void {
|
||||
$previous();
|
||||
$callback();
|
||||
};
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<null|string>
|
||||
*/
|
||||
protected function read(): Promise
|
||||
{
|
||||
return call(function (): \Generator {
|
||||
try {
|
||||
return yield $this->stream->read();
|
||||
} catch (\Throwable $e) {
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
$this->close(false);
|
||||
|
||||
return new Failure($e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<void>
|
||||
*/
|
||||
protected function write(string $data): Promise
|
||||
{
|
||||
return call(function () use ($data): \Generator {
|
||||
try {
|
||||
return yield $this->stream->write($data);
|
||||
} catch (\Throwable $e) {
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
$this->close(false);
|
||||
|
||||
return new Failure($e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected function handleError(Frame\Error $error): void
|
||||
{
|
||||
$this->logger->error($error->data);
|
||||
|
||||
if (ErrorType::terminable($error)) {
|
||||
$this->close();
|
||||
|
||||
throw $error->toException();
|
||||
try {
|
||||
return false !== $this->socket()->selectRead($timeout);
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->disconnect();
|
||||
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<Frame\Response>
|
||||
*/
|
||||
private function response(Stream $stream, Buffer $buffer): Promise
|
||||
public function receive(float $timeout = null): ?Response
|
||||
{
|
||||
return call(function () use ($stream, $buffer): \Generator {
|
||||
while (true) {
|
||||
$response = Parser::parse($buffer);
|
||||
$socket = $this->socket();
|
||||
|
||||
$timeout ??= $this->clientConfig->readTimeout;
|
||||
$deadline = microtime(true) + $timeout;
|
||||
|
||||
if (!$this->hasMessage($timeout)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
$size = $socket->read(Bytes::BYTES_SIZE);
|
||||
|
||||
if ('' === $size) {
|
||||
$this->disconnect();
|
||||
|
||||
throw new ConnectionFail('Probably connection lost');
|
||||
}
|
||||
|
||||
if ($this->connectionConfig?->snappy) {
|
||||
$buffer = new ByteBuffer();
|
||||
$snappyBuffer = new ByteBuffer($size);
|
||||
while (true) {
|
||||
$typeByte = \ord($snappyBuffer->consume(1));
|
||||
|
||||
$size = \ord($snappyBuffer->consume(1)) + (\ord($snappyBuffer->consume(1)) << 8) + (\ord($snappyBuffer->consume(1)) << 16);
|
||||
$type = match ($typeByte) {
|
||||
0xff => 'identifier',
|
||||
0x00 => 'compressed',
|
||||
0x01 => 'uncompressed',
|
||||
0xfe => 'padding',
|
||||
};
|
||||
|
||||
$this->logger->debug('Received snappy chunk: {type}, size: {size}', [
|
||||
'type' => $type,
|
||||
'size' => $size,
|
||||
]);
|
||||
|
||||
switch ($typeByte) {
|
||||
case 0xff: // 'identifier',
|
||||
$socket->read($size);
|
||||
$snappyBuffer->append($socket->read(4));
|
||||
|
||||
continue 2;
|
||||
case 0x00: // 'compressed',
|
||||
case 0x01: // 'uncompressed',
|
||||
$uncompressed = $socket->read($size);
|
||||
|
||||
$this->logger->debug('Received uncompressed bytes: {bytes}', ['bytes' => $uncompressed]);
|
||||
$buffer->append($uncompressed);
|
||||
$buffer->consume(4); // slice snappy prefix
|
||||
$buffer->consumeUint32(); // slice size
|
||||
|
||||
break 2;
|
||||
case 0xfe:// 'padding',
|
||||
}
|
||||
}
|
||||
} else {
|
||||
$this->logger->debug('Size bytes received: "{bytes}"', ['bytes' => $size]);
|
||||
|
||||
$buffer = new ByteBuffer($size);
|
||||
|
||||
$size = $buffer->consumeUint32();
|
||||
|
||||
do {
|
||||
$chunk = $socket->read($size);
|
||||
|
||||
if (null === $response && null !== ($chunk = yield $stream->read())) {
|
||||
$buffer->append($chunk);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!$response instanceof Frame\Response) {
|
||||
throw new NsqException();
|
||||
}
|
||||
|
||||
return $response;
|
||||
$size -= \strlen($chunk);
|
||||
} while (0 < $size);
|
||||
}
|
||||
});
|
||||
|
||||
$this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL));
|
||||
|
||||
$response = new Response($buffer);
|
||||
|
||||
if ($response->isHeartBeat()) {
|
||||
$this->command('NOP');
|
||||
|
||||
return $this->receive(
|
||||
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
|
||||
);
|
||||
}
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->disconnect();
|
||||
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
|
||||
return $response;
|
||||
}
|
||||
|
||||
protected function response(): Response
|
||||
{
|
||||
return $this->receive() ?? throw UnexpectedResponse::null();
|
||||
}
|
||||
|
||||
private function socket(): Socket
|
||||
{
|
||||
if (null === $this->socket) {
|
||||
$this->connect();
|
||||
}
|
||||
|
||||
return $this->socket ?? throw new ConnectionFail('This connection is closed, create new one.');
|
||||
}
|
||||
}
|
||||
|
223
src/Consumer.php
223
src/Consumer.php
@@ -4,245 +4,60 @@ declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\Failure;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Exception\ConsumerException;
|
||||
use Nsq\Frame\Response;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\NullLogger;
|
||||
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\call;
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
final class Consumer extends Connection
|
||||
{
|
||||
private int $rdy = 0;
|
||||
|
||||
/**
|
||||
* @var callable
|
||||
* Subscribe to a topic/channel.
|
||||
*/
|
||||
private $onMessage;
|
||||
|
||||
public function __construct(
|
||||
string $address,
|
||||
/**
|
||||
* @readonly
|
||||
*/
|
||||
public string $topic,
|
||||
/**
|
||||
* @readonly
|
||||
*/
|
||||
public string $channel,
|
||||
callable $onMessage,
|
||||
ClientConfig $clientConfig,
|
||||
LoggerInterface $logger,
|
||||
) {
|
||||
parent::__construct(
|
||||
$address,
|
||||
$clientConfig,
|
||||
$logger,
|
||||
);
|
||||
|
||||
$this->onMessage = $onMessage;
|
||||
|
||||
$context = compact('address', 'topic', 'channel');
|
||||
$this->onConnect(function () use ($context): void {
|
||||
$this->logger->debug('Consumer connected.', $context);
|
||||
});
|
||||
$this->onClose(function () use ($context): void {
|
||||
$this->logger->debug('Consumer disconnected.', $context);
|
||||
});
|
||||
$this->logger->debug('Consumer created.', $context);
|
||||
}
|
||||
|
||||
public static function create(
|
||||
string $address,
|
||||
string $topic,
|
||||
string $channel,
|
||||
callable $onMessage,
|
||||
?ClientConfig $clientConfig = null,
|
||||
?LoggerInterface $logger = null,
|
||||
): self {
|
||||
return new self(
|
||||
$address,
|
||||
$topic,
|
||||
$channel,
|
||||
$onMessage,
|
||||
$clientConfig ?? new ClientConfig(),
|
||||
$logger ?? new NullLogger(),
|
||||
);
|
||||
}
|
||||
|
||||
public function connect(): Promise
|
||||
public function sub(string $topic, string $channel): void
|
||||
{
|
||||
if ($this->isConnected()) {
|
||||
return call(static function (): void {
|
||||
});
|
||||
}
|
||||
|
||||
return call(function (): \Generator {
|
||||
yield parent::connect();
|
||||
|
||||
$buffer = new Buffer();
|
||||
|
||||
asyncCall(function () use ($buffer): \Generator {
|
||||
yield $this->write(Command::sub($this->topic, $this->channel));
|
||||
|
||||
if (null !== ($chunk = yield $this->read())) {
|
||||
$buffer->append($chunk);
|
||||
}
|
||||
|
||||
/** @var Response $response */
|
||||
$response = Parser::parse($buffer);
|
||||
|
||||
if (!$response->isOk()) {
|
||||
return new Failure(new ConsumerException('Fail subscription.'));
|
||||
}
|
||||
|
||||
yield $this->rdy(1);
|
||||
|
||||
/** @phpstan-ignore-next-line */
|
||||
asyncCall(function () use ($buffer): \Generator {
|
||||
while (null !== $chunk = yield $this->read()) {
|
||||
$buffer->append($chunk);
|
||||
|
||||
while ($frame = Parser::parse($buffer)) {
|
||||
switch (true) {
|
||||
case $frame instanceof Frame\Response:
|
||||
if ($frame->isHeartBeat()) {
|
||||
yield $this->write(Command::nop());
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
throw ConsumerException::response($frame);
|
||||
case $frame instanceof Frame\Error:
|
||||
$this->handleError($frame);
|
||||
|
||||
break;
|
||||
case $frame instanceof Frame\Message:
|
||||
asyncCall($this->onMessage, Message::compose($frame, $this));
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
if ($this->rdy !== $this->clientConfig->rdyCount) {
|
||||
yield $this->rdy($this->clientConfig->rdyCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$this->close(false);
|
||||
});
|
||||
});
|
||||
});
|
||||
$this->command('SUB', [$topic, $channel])->response()->okOrFail();
|
||||
}
|
||||
|
||||
/**
|
||||
* Update RDY state (indicate you are ready to receive N messages).
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
*/
|
||||
public function rdy(int $count): Promise
|
||||
public function rdy(int $count): void
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return new Success(false);
|
||||
if ($this->rdy === $count) {
|
||||
return;
|
||||
}
|
||||
|
||||
if ($this->rdy === $count) {
|
||||
return new Success(true);
|
||||
}
|
||||
$this->command('RDY', (string) $count);
|
||||
|
||||
$this->rdy = $count;
|
||||
|
||||
return call(function () use ($count): \Generator {
|
||||
try {
|
||||
yield $this->write(Command::rdy($count));
|
||||
|
||||
return true;
|
||||
} catch (\Throwable) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Finish a message (indicate successful processing).
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
public function fin(string $id): Promise
|
||||
public function fin(string $id): void
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return new Success(false);
|
||||
}
|
||||
$this->command('FIN', $id);
|
||||
|
||||
return call(function () use ($id): \Generator {
|
||||
try {
|
||||
yield $this->write(Command::fin($id));
|
||||
|
||||
return true;
|
||||
} catch (\Throwable) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
--$this->rdy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Re-queue a message (indicate failure to process) The re-queued message is placed at the tail of the queue,
|
||||
* equivalent to having just published it, but for various implementation specific reasons that behavior should not
|
||||
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
|
||||
* behaves identically to an explicit REQ.
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
*
|
||||
* @internal
|
||||
* Re-queue a message (indicate failure to process)
|
||||
* The re-queued message is placed at the tail of the queue, equivalent to having just published it,
|
||||
* but for various implementation specific reasons that behavior should not be explicitly relied upon and may change in the future.
|
||||
* Similarly, a message that is in-flight and times out behaves identically to an explicit REQ.
|
||||
*/
|
||||
public function req(string $id, int $timeout): Promise
|
||||
public function req(string $id, int $timeout): void
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return new Success(false);
|
||||
}
|
||||
$this->command('REQ', [$id, $timeout]);
|
||||
|
||||
return call(function () use ($id, $timeout): \Generator {
|
||||
try {
|
||||
yield $this->write(Command::req($id, $timeout));
|
||||
|
||||
return true;
|
||||
} catch (\Throwable) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
--$this->rdy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the timeout for an in-flight message.
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
public function touch(string $id): Promise
|
||||
public function touch(string $id): void
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return new Success(false);
|
||||
}
|
||||
|
||||
return call(function () use ($id): \Generator {
|
||||
try {
|
||||
yield $this->write(Command::touch($id));
|
||||
|
||||
return true;
|
||||
} catch (\Throwable) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
$this->command('TOUCH', $id);
|
||||
}
|
||||
}
|
||||
|
@@ -1,99 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
/**
|
||||
* @psalm-immutable
|
||||
*/
|
||||
final class ErrorType
|
||||
{
|
||||
/**
|
||||
* A generic error type without any more hints.
|
||||
*/
|
||||
public const E_INVALID = true;
|
||||
/**
|
||||
* This error might be returned during multiple occasions. It can be returned for IDENTIFY, AUTH or MPUB messages.
|
||||
* It is caused for payloads that do not meet certain requirements. For IDENTIFY and AUTH, this is usually a bug in
|
||||
* the library and should be reported. For MPUB, this error can occur if the payload is larger than the maximum
|
||||
* payload size specified in the nsqd config.
|
||||
*/
|
||||
public const E_BAD_BODY = true;
|
||||
/**
|
||||
* This error indicates that the topic sent to nsqd is not valid.
|
||||
*/
|
||||
public const E_BAD_TOPIC = true;
|
||||
/**
|
||||
* This error indicates that the channel sent to nsqd is not valid.
|
||||
*/
|
||||
public const E_BAD_CHANNEL = true;
|
||||
/**
|
||||
* This error is returned by nsqd if the message in the payload of a publishing operation does not meet the
|
||||
* requirements of the server. This might be caused by too big payloads being sent to nsqd. You should consider
|
||||
* adding a limit to the payload size or increasing it in the nsqd config.
|
||||
*/
|
||||
public const E_BAD_MESSAGE = true;
|
||||
/**
|
||||
* This error may happen if a error condition is met after validating the input on the nsqd side. This is usually a
|
||||
* temporary error and can be caused by topics being added, deleted or cleared.
|
||||
*/
|
||||
public const E_PUB_FAILED = true;
|
||||
/**
|
||||
* This error may happen if a error condition is met after validating the input on the nsqd side. This is usually a
|
||||
* temporary error and can be caused by topics being added, deleted or cleared.
|
||||
*/
|
||||
public const E_MPUB_FAILED = true;
|
||||
/**
|
||||
* This error may happen if a error condition is met after validating the input on the nsqd side. This is usually a
|
||||
* temporary error and can be caused by topics being added, deleted or cleared.
|
||||
*/
|
||||
public const E_DPUB_FAILED = true;
|
||||
/**
|
||||
* This error may happen if a error condition is met after validating the input on the nsqd side. This can
|
||||
* happen in particular for messages that are no longer queued on the server side.
|
||||
*/
|
||||
public const E_FIN_FAILED = false;
|
||||
/**
|
||||
* This error may happen if a error condition is met after validating the input on the nsqd side. This can
|
||||
* happen in particular for messages that are no longer queued on the server side.
|
||||
*/
|
||||
public const E_REQ_FAILED = false;
|
||||
/**
|
||||
* This error may happen if a error condition is met after validating the input on the nsqd side. This can
|
||||
* happen in particular for messages that are no longer queued on the server side.
|
||||
*/
|
||||
public const E_TOUCH_FAILED = false;
|
||||
/**
|
||||
* This error indicates that the authorization of the client failed on the server side. This might be related
|
||||
* to connection issues to the authorization server. Depending on the authorization server implementation, this
|
||||
* might also indicate that the given auth secret in the [ClientConfig] is not known on the server or the server
|
||||
* denied authentication with the current connection properties (i.e. TLS status and IP).
|
||||
*/
|
||||
public const E_AUTH_FAILED = true;
|
||||
/**
|
||||
* This error happens if something breaks on the nsqd side while performing the authorization. This might be
|
||||
* caused by bugs in nsqd, the authorization server or network issues.
|
||||
*/
|
||||
public const E_AUTH_ERROR = true;
|
||||
/**
|
||||
* This error is sent by nsqd if the client attempts an authentication, but the server does not support it. This
|
||||
* should never happen using this library as authorization requests are only sent if the server supports it.
|
||||
* It is safe to expect that this error is never thrown.
|
||||
*/
|
||||
public const E_AUTH_DISABLED = true;
|
||||
/**
|
||||
* This error indicates that the client related to the authorization secret set in the [ClientConfig] is not
|
||||
* allowed to do the operation it tried to do.
|
||||
*/
|
||||
public const E_UNAUTHORIZED = true;
|
||||
|
||||
public static function terminable(Frame\Error $error): bool
|
||||
{
|
||||
$type = explode(' ', $error->data)[0];
|
||||
|
||||
$constant = 'self::'.$type;
|
||||
|
||||
return \defined($constant) ? \constant($constant) : self::E_INVALID;
|
||||
}
|
||||
}
|
@@ -4,10 +4,8 @@ declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
final class AuthenticationRequired extends NsqException
|
||||
use RuntimeException;
|
||||
|
||||
final class AuthenticationRequired extends RuntimeException implements NsqException
|
||||
{
|
||||
public function __construct()
|
||||
{
|
||||
parent::__construct('NSQ requires authorization, set ClientConfig::$authSecret before connecting');
|
||||
}
|
||||
}
|
||||
|
19
src/Exception/ConnectionFail.php
Normal file
19
src/Exception/ConnectionFail.php
Normal file
@@ -0,0 +1,19 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
use RuntimeException;
|
||||
use Throwable;
|
||||
|
||||
final class ConnectionFail extends RuntimeException implements NsqException
|
||||
{
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
public static function fromThrowable(Throwable $throwable): self
|
||||
{
|
||||
return new self($throwable->getMessage(), (int) $throwable->getCode(), $throwable);
|
||||
}
|
||||
}
|
@@ -1,15 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
use Nsq\Frame\Response;
|
||||
|
||||
final class ConsumerException extends NsqException
|
||||
{
|
||||
public static function response(Response $response): self
|
||||
{
|
||||
return new self(sprintf('Consumer receive response "%s" from nsq, which not expected. ', $response->data));
|
||||
}
|
||||
}
|
@@ -1,18 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
use Psr\Log\LogLevel;
|
||||
|
||||
final class LookupException extends NsqException
|
||||
{
|
||||
public function level(): string
|
||||
{
|
||||
return match ($this->getMessage()) {
|
||||
'TOPIC_NOT_FOUND' => LogLevel::DEBUG,
|
||||
default => LogLevel::WARNING,
|
||||
};
|
||||
}
|
||||
}
|
26
src/Exception/MessageAlreadyFinished.php
Normal file
26
src/Exception/MessageAlreadyFinished.php
Normal file
@@ -0,0 +1,26 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
use Nsq\Message;
|
||||
use RuntimeException;
|
||||
|
||||
final class MessageAlreadyFinished extends RuntimeException implements NsqException
|
||||
{
|
||||
public static function finish(Message $message): self
|
||||
{
|
||||
return new self('Can\'t finish message as it already finished.');
|
||||
}
|
||||
|
||||
public static function requeue(Message $message): self
|
||||
{
|
||||
return new self('Can\'t requeue message as it already finished.');
|
||||
}
|
||||
|
||||
public static function touch(Message $message): self
|
||||
{
|
||||
return new self('Can\'t touch message as it already finished.');
|
||||
}
|
||||
}
|
@@ -1,15 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
use Nsq\Message;
|
||||
|
||||
final class MessageException extends NsqException
|
||||
{
|
||||
public static function processed(Message $message): self
|
||||
{
|
||||
return new self(sprintf('Message "%s" already processed.', $message->id));
|
||||
}
|
||||
}
|
11
src/Exception/NsqError.php
Normal file
11
src/Exception/NsqError.php
Normal file
@@ -0,0 +1,11 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
use RuntimeException;
|
||||
|
||||
final class NsqError extends RuntimeException implements NsqException
|
||||
{
|
||||
}
|
@@ -4,6 +4,8 @@ declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
class NsqException extends \RuntimeException
|
||||
use Throwable;
|
||||
|
||||
interface NsqException extends Throwable
|
||||
{
|
||||
}
|
||||
|
@@ -1,9 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
final class ServerException extends NsqException
|
||||
{
|
||||
}
|
@@ -1,18 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
final class SnappyException extends NsqException
|
||||
{
|
||||
public static function notInstalled(): self
|
||||
{
|
||||
return new self('Snappy extension not installed.');
|
||||
}
|
||||
|
||||
public static function invalidHeader(): self
|
||||
{
|
||||
return new self('Invalid snappy protocol header.');
|
||||
}
|
||||
}
|
@@ -1,9 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
final class StreamException extends NsqException
|
||||
{
|
||||
}
|
18
src/Exception/UnexpectedResponse.php
Normal file
18
src/Exception/UnexpectedResponse.php
Normal file
@@ -0,0 +1,18 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
use RuntimeException;
|
||||
|
||||
final class UnexpectedResponse extends RuntimeException implements NsqException
|
||||
{
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
public static function null(): self
|
||||
{
|
||||
return new self('Response was expected, but null received.');
|
||||
}
|
||||
}
|
@@ -1,31 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
abstract class Frame
|
||||
{
|
||||
public const TYPE_RESPONSE = 0;
|
||||
public const TYPE_ERROR = 1;
|
||||
public const TYPE_MESSAGE = 2;
|
||||
|
||||
public function __construct(public int $type)
|
||||
{
|
||||
}
|
||||
|
||||
public function response(): bool
|
||||
{
|
||||
return self::TYPE_RESPONSE === $this->type;
|
||||
}
|
||||
|
||||
public function error(): bool
|
||||
{
|
||||
return self::TYPE_ERROR === $this->type;
|
||||
}
|
||||
|
||||
public function message(): bool
|
||||
{
|
||||
return self::TYPE_MESSAGE === $this->type;
|
||||
}
|
||||
}
|
@@ -1,24 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Frame;
|
||||
|
||||
use Nsq\Exception\ServerException;
|
||||
use Nsq\Frame;
|
||||
|
||||
/**
|
||||
* @psalm-immutable
|
||||
*/
|
||||
final class Error extends Frame
|
||||
{
|
||||
public function __construct(public string $data)
|
||||
{
|
||||
parent::__construct(self::TYPE_ERROR);
|
||||
}
|
||||
|
||||
public function toException(): ServerException
|
||||
{
|
||||
return new ServerException($this->data);
|
||||
}
|
||||
}
|
@@ -1,19 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Frame;
|
||||
|
||||
use Nsq\Frame;
|
||||
|
||||
final class Message extends Frame
|
||||
{
|
||||
public function __construct(
|
||||
public int $timestamp,
|
||||
public int $attempts,
|
||||
public string $id,
|
||||
public string $body,
|
||||
) {
|
||||
parent::__construct(self::TYPE_MESSAGE);
|
||||
}
|
||||
}
|
@@ -1,39 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Frame;
|
||||
|
||||
use Nsq\Frame;
|
||||
|
||||
/**
|
||||
* @psalm-immutable
|
||||
*/
|
||||
final class Response extends Frame
|
||||
{
|
||||
public const OK = 'OK';
|
||||
public const HEARTBEAT = '_heartbeat_';
|
||||
|
||||
public function __construct(public string $data)
|
||||
{
|
||||
parent::__construct(self::TYPE_RESPONSE);
|
||||
}
|
||||
|
||||
public function isOk(): bool
|
||||
{
|
||||
return self::OK === $this->data;
|
||||
}
|
||||
|
||||
public function isHeartBeat(): bool
|
||||
{
|
||||
return self::HEARTBEAT === $this->data;
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return array<mixed, mixed>
|
||||
*/
|
||||
public function toArray(): array
|
||||
{
|
||||
return json_decode($this->data, true, flags: JSON_THROW_ON_ERROR);
|
||||
}
|
||||
}
|
274
src/Lookup.php
274
src/Lookup.php
@@ -1,274 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\Deferred;
|
||||
use Amp\Dns\DnsException;
|
||||
use Amp\Http\Client\DelegateHttpClient;
|
||||
use Amp\Http\Client\HttpClientBuilder;
|
||||
use Amp\Http\Client\Request;
|
||||
use Amp\Http\Client\Response;
|
||||
use Amp\NullCancellationToken;
|
||||
use Amp\Promise;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Config\LookupConfig;
|
||||
use Nsq\Exception\LookupException;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\NullLogger;
|
||||
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\call;
|
||||
use function Amp\delay;
|
||||
|
||||
final class Lookup
|
||||
{
|
||||
/**
|
||||
* @psalm-var array<string, array<string, Lookup\Producer>>
|
||||
*/
|
||||
private array $producers = [];
|
||||
|
||||
private array $consumers = [];
|
||||
|
||||
private array $running = [];
|
||||
|
||||
private array $topicWatchers = [];
|
||||
|
||||
public function __construct(
|
||||
private array $addresses,
|
||||
private LookupConfig $config,
|
||||
private LoggerInterface $logger,
|
||||
private DelegateHttpClient $httpClient,
|
||||
) {
|
||||
}
|
||||
|
||||
public static function create(
|
||||
string | array $address,
|
||||
LookupConfig $config = null,
|
||||
LoggerInterface $logger = null,
|
||||
DelegateHttpClient $httpClient = null,
|
||||
): self {
|
||||
return new self(
|
||||
(array) $address,
|
||||
$config ?? new LookupConfig(),
|
||||
$logger ?? new NullLogger(),
|
||||
$httpClient ?? HttpClientBuilder::buildDefault(),
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<Lookup\Producer[]>
|
||||
*/
|
||||
public function nodes(): Promise
|
||||
{
|
||||
return call(function (): \Generator {
|
||||
$requestHandler = function (string $uri): \Generator {
|
||||
/** @var Response $response */
|
||||
$response = yield $this->httpClient->request(new Request($uri.'/nodes'), new NullCancellationToken());
|
||||
|
||||
try {
|
||||
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
|
||||
} catch (LookupException $e) {
|
||||
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
|
||||
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
$promises = [];
|
||||
foreach ($this->addresses as $address) {
|
||||
$promises[$address] = call($requestHandler, $address);
|
||||
}
|
||||
|
||||
$nodes = [];
|
||||
/** @var Lookup\Response $response */
|
||||
foreach (yield $promises as $response) {
|
||||
foreach ($response->producers as $producer) {
|
||||
$nodes[$producer->toTcpUri()] = $producer;
|
||||
}
|
||||
}
|
||||
|
||||
return array_values($nodes);
|
||||
});
|
||||
}
|
||||
|
||||
public function stop(): void
|
||||
{
|
||||
$this->producers = [];
|
||||
$this->consumers = [];
|
||||
$this->running = [];
|
||||
$this->topicWatchers = [];
|
||||
|
||||
$this->logger->info('Lookup stopped.');
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-suppress InvalidPropertyAssignmentValue
|
||||
*/
|
||||
public function subscribe(string $topic, string $channel, callable $onMessage, ClientConfig $config = null): void
|
||||
{
|
||||
if (null !== ($this->running[$topic][$channel] ?? null)) {
|
||||
throw new \InvalidArgumentException('Subscription already exists.');
|
||||
}
|
||||
|
||||
$this->running[$topic][$channel] = true;
|
||||
|
||||
asyncCall(function () use ($topic, $channel, $onMessage, $config): \Generator {
|
||||
while (true) {
|
||||
if (null === ($this->running[$topic][$channel] ?? null)) {
|
||||
return;
|
||||
}
|
||||
|
||||
/** @phpstan-ignore-next-line */
|
||||
$producers = $this->producers[$topic] ??= new Deferred();
|
||||
|
||||
if ($producers instanceof Deferred) {
|
||||
/** @var array<string, Lookup\Producer> $producers */
|
||||
$producers = yield $producers->promise();
|
||||
}
|
||||
|
||||
foreach (array_diff_key($this->consumers, $producers) as $address => $producer) {
|
||||
unset($this->consumers[$address]);
|
||||
}
|
||||
|
||||
foreach ($producers as $address => $producer) {
|
||||
if (null !== ($this->consumers[$address][$topic][$channel] ?? null)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$this->keepConnection(
|
||||
Consumer::create(
|
||||
$address,
|
||||
$topic,
|
||||
$channel,
|
||||
$onMessage,
|
||||
$config,
|
||||
$this->logger,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
yield delay($this->config->pollingInterval);
|
||||
}
|
||||
});
|
||||
|
||||
$this->watch($topic);
|
||||
|
||||
$this->logger->info('Subscribed.', compact('topic', 'channel'));
|
||||
}
|
||||
|
||||
public function unsubscribe(string $topic, string $channel): void
|
||||
{
|
||||
if (null === ($this->running[$topic][$channel] ?? null)) {
|
||||
$this->logger->debug('Not subscribed.', compact('topic', 'channel'));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
unset($this->running[$topic][$channel]);
|
||||
|
||||
if ([] === $this->running[$topic]) {
|
||||
unset($this->running[$topic]);
|
||||
}
|
||||
|
||||
$this->logger->info('Unsubscribed.', compact('topic', 'channel'));
|
||||
}
|
||||
|
||||
private function keepConnection(Consumer $consumer): void
|
||||
{
|
||||
$this->consumers[$consumer->address][$consumer->topic][$consumer->channel] = $consumer;
|
||||
|
||||
asyncCall(function () use ($consumer): \Generator {
|
||||
while (null !== ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
|
||||
try {
|
||||
yield $consumer->connect();
|
||||
} catch (DnsException $e) {
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
unset(
|
||||
$this->consumers[$consumer->address],
|
||||
$this->producers[$consumer->topic][$consumer->address],
|
||||
);
|
||||
|
||||
return;
|
||||
} catch (\Throwable $e) {
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
yield delay($this->config->pollingInterval);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
/** @phpstan-ignore-next-line */
|
||||
if (null === ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
|
||||
$consumer->close();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (!$consumer->isConnected()) {
|
||||
break;
|
||||
}
|
||||
|
||||
yield delay(500);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private function watch(string $topic): void
|
||||
{
|
||||
if (\array_key_exists($topic, $this->topicWatchers)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->topicWatchers[$topic] = true;
|
||||
|
||||
asyncCall(function () use ($topic): \Generator {
|
||||
$cancellationToken = new NullCancellationToken();
|
||||
$requestHandler = function (string $uri) use ($topic, $cancellationToken): \Generator {
|
||||
$this->logger->debug('Lookup', compact('topic'));
|
||||
|
||||
/** @var Response $response */
|
||||
$response = yield $this->httpClient->request(new Request($uri.'/lookup?topic='.$topic), $cancellationToken);
|
||||
|
||||
try {
|
||||
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
|
||||
} catch (LookupException $e) {
|
||||
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
|
||||
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
while (\array_key_exists($topic, $this->running)) {
|
||||
$promises = [];
|
||||
foreach ($this->addresses as $address) {
|
||||
$promises[$address] = call($requestHandler, $address);
|
||||
}
|
||||
|
||||
/** @var Lookup\Response[] $responses */
|
||||
$responses = yield $promises;
|
||||
|
||||
$producers = [];
|
||||
foreach ($responses as $response) {
|
||||
foreach ($response->producers as $producer) {
|
||||
$producers[$producer->toTcpUri()] = $producer;
|
||||
}
|
||||
}
|
||||
|
||||
/** @phpstan-ignore-next-line */
|
||||
if (($deferred = ($this->producers[$topic] ?? null)) instanceof Deferred) {
|
||||
$deferred->resolve($producers);
|
||||
}
|
||||
$this->producers[$topic] = $producers;
|
||||
|
||||
yield delay($this->config->pollingInterval);
|
||||
}
|
||||
|
||||
unset($this->topicWatchers[$topic]);
|
||||
});
|
||||
}
|
||||
}
|
@@ -1,39 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Lookup;
|
||||
|
||||
final class Producer
|
||||
{
|
||||
public function __construct(
|
||||
public string $broadcastAddress,
|
||||
public string $hostname,
|
||||
public string $remoteAddress,
|
||||
public int $tcpPort,
|
||||
public int $httpPort,
|
||||
public string $version,
|
||||
public array $tombstones,
|
||||
public array $topics,
|
||||
) {
|
||||
}
|
||||
|
||||
public static function fromArray(array $array): self
|
||||
{
|
||||
return new self(
|
||||
$array['broadcast_address'],
|
||||
$array['hostname'],
|
||||
$array['remote_address'],
|
||||
$array['tcp_port'],
|
||||
$array['http_port'],
|
||||
$array['version'],
|
||||
$array['tombstones'] ?? [],
|
||||
$array['topics'] ?? [],
|
||||
);
|
||||
}
|
||||
|
||||
public function toTcpUri(): string
|
||||
{
|
||||
return sprintf('%s:%s', $this->broadcastAddress, $this->tcpPort);
|
||||
}
|
||||
}
|
@@ -1,34 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Lookup;
|
||||
|
||||
use Nsq\Exception\LookupException;
|
||||
|
||||
final class Response
|
||||
{
|
||||
/**
|
||||
* @param string[] $channels
|
||||
* @param Producer[] $producers
|
||||
*/
|
||||
public function __construct(
|
||||
public array $channels,
|
||||
public array $producers,
|
||||
) {
|
||||
}
|
||||
|
||||
public static function fromJson(string $json): self
|
||||
{
|
||||
$array = json_decode($json, true, flags: JSON_THROW_ON_ERROR);
|
||||
|
||||
if (\array_key_exists('message', $array)) {
|
||||
throw new LookupException($array['message']);
|
||||
}
|
||||
|
||||
return new self(
|
||||
$array['channels'] ?? [],
|
||||
array_map([Producer::class, 'fromArray'], $array['producers']),
|
||||
);
|
||||
}
|
||||
}
|
103
src/Message.php
103
src/Message.php
@@ -4,78 +4,75 @@ declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\Promise;
|
||||
use Nsq\Exception\MessageException;
|
||||
use Nsq\Exception\MessageAlreadyFinished;
|
||||
|
||||
final class Message
|
||||
{
|
||||
private bool $processed = false;
|
||||
|
||||
public function __construct(
|
||||
public string $id,
|
||||
public string $body,
|
||||
public int $timestamp,
|
||||
public int $attempts,
|
||||
private Consumer $consumer,
|
||||
) {
|
||||
}
|
||||
|
||||
public static function compose(Frame\Message $message, Consumer $consumer): self
|
||||
{
|
||||
return new self(
|
||||
$message->id,
|
||||
$message->body,
|
||||
$message->timestamp,
|
||||
$message->attempts,
|
||||
$consumer,
|
||||
);
|
||||
}
|
||||
|
||||
public function isProcessed(): bool
|
||||
{
|
||||
return $this->processed;
|
||||
}
|
||||
/**
|
||||
* @psalm-readonly
|
||||
*/
|
||||
public int $timestamp;
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<bool>
|
||||
* @psalm-readonly
|
||||
*/
|
||||
public function finish(): Promise
|
||||
{
|
||||
$this->markAsProcessedOrFail();
|
||||
|
||||
return $this->consumer->fin($this->id);
|
||||
}
|
||||
public int $attempts;
|
||||
|
||||
/**
|
||||
* @psalm-param positive-int|0 $timeout
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
* @psalm-readonly
|
||||
*/
|
||||
public function requeue(int $timeout): Promise
|
||||
{
|
||||
$this->markAsProcessedOrFail();
|
||||
|
||||
return $this->consumer->req($this->id, $timeout);
|
||||
}
|
||||
public string $id;
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<bool>
|
||||
* @psalm-readonly
|
||||
*/
|
||||
public function touch(): Promise
|
||||
public string $body;
|
||||
|
||||
private bool $finished = false;
|
||||
|
||||
private Consumer $consumer;
|
||||
|
||||
public function __construct(int $timestamp, int $attempts, string $id, string $body, Consumer $consumer)
|
||||
{
|
||||
if ($this->processed) {
|
||||
throw MessageException::processed($this);
|
||||
$this->timestamp = $timestamp;
|
||||
$this->attempts = $attempts;
|
||||
$this->id = $id;
|
||||
$this->body = $body;
|
||||
|
||||
$this->consumer = $consumer;
|
||||
}
|
||||
|
||||
public function isFinished(): bool
|
||||
{
|
||||
return $this->finished;
|
||||
}
|
||||
|
||||
public function finish(): void
|
||||
{
|
||||
if ($this->finished) {
|
||||
throw MessageAlreadyFinished::finish($this);
|
||||
}
|
||||
|
||||
return $this->consumer->touch($this->id);
|
||||
$this->consumer->fin($this->id);
|
||||
$this->finished = true;
|
||||
}
|
||||
|
||||
private function markAsProcessedOrFail(): void
|
||||
public function requeue(int $timeout): void
|
||||
{
|
||||
if ($this->processed) {
|
||||
throw MessageException::processed($this);
|
||||
if ($this->finished) {
|
||||
throw MessageAlreadyFinished::requeue($this);
|
||||
}
|
||||
|
||||
$this->processed = true;
|
||||
$this->consumer->req($this->id, $timeout);
|
||||
$this->finished = true;
|
||||
}
|
||||
|
||||
public function touch(): void
|
||||
{
|
||||
if ($this->finished) {
|
||||
throw MessageAlreadyFinished::touch($this);
|
||||
}
|
||||
|
||||
$this->consumer->touch($this->id);
|
||||
}
|
||||
}
|
||||
|
@@ -1,47 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Nsq\Exception\NsqException;
|
||||
|
||||
class Parser
|
||||
{
|
||||
private const SIZE = 4;
|
||||
private const TYPE = 4;
|
||||
private const MESSAGE_HEADER_SIZE =
|
||||
8 + // timestamp
|
||||
2 + // attempts
|
||||
16 + // ID
|
||||
4; // Frame type
|
||||
|
||||
public static function parse(Buffer $buffer): ?Frame
|
||||
{
|
||||
if ($buffer->size() < self::SIZE) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$size = $buffer->readInt32();
|
||||
|
||||
if ($buffer->size() < $size + self::SIZE) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$buffer->discard(self::SIZE);
|
||||
|
||||
$type = $buffer->consumeInt32();
|
||||
|
||||
return match ($type) {
|
||||
Frame::TYPE_RESPONSE => new Frame\Response($buffer->consume($size - self::TYPE)),
|
||||
Frame::TYPE_ERROR => new Frame\Error($buffer->consume($size - self::TYPE)),
|
||||
Frame::TYPE_MESSAGE => new Frame\Message(
|
||||
timestamp: $buffer->consumeTimestamp(),
|
||||
attempts: $buffer->consumeAttempts(),
|
||||
id: $buffer->consumeMessageID(),
|
||||
body: $buffer->consume($size - self::MESSAGE_HEADER_SIZE),
|
||||
),
|
||||
default => throw new NsqException(sprintf('Unexpected frame type: "%s"', $type)),
|
||||
};
|
||||
}
|
||||
}
|
134
src/Producer.php
134
src/Producer.php
@@ -4,127 +4,41 @@ declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Exception\NsqException;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\NullLogger;
|
||||
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\call;
|
||||
use function array_map;
|
||||
use function implode;
|
||||
use function pack;
|
||||
|
||||
final class Producer extends Connection
|
||||
{
|
||||
public function __construct(
|
||||
string $address,
|
||||
ClientConfig $clientConfig,
|
||||
LoggerInterface $logger,
|
||||
) {
|
||||
parent::__construct(
|
||||
$address,
|
||||
$clientConfig,
|
||||
$logger,
|
||||
);
|
||||
|
||||
$context = compact('address');
|
||||
$this->onConnect(function () use ($context): void {
|
||||
$this->logger->debug('Producer connected.', $context);
|
||||
});
|
||||
$this->onClose(function () use ($context): void {
|
||||
$this->logger->debug('Producer disconnected.', $context);
|
||||
});
|
||||
$this->logger->debug('Producer created.', $context);
|
||||
}
|
||||
|
||||
public static function create(
|
||||
string $address,
|
||||
ClientConfig $clientConfig = null,
|
||||
LoggerInterface $logger = null,
|
||||
): self {
|
||||
return new self(
|
||||
$address,
|
||||
$clientConfig ?? new ClientConfig(),
|
||||
$logger ?? new NullLogger(),
|
||||
);
|
||||
}
|
||||
|
||||
public function connect(): Promise
|
||||
/**
|
||||
* @psalm-suppress PossiblyFalseOperand
|
||||
*/
|
||||
public function pub(string $topic, string $body): void
|
||||
{
|
||||
if ($this->isConnected()) {
|
||||
return call(static function (): void {
|
||||
});
|
||||
}
|
||||
|
||||
return call(function (): \Generator {
|
||||
yield parent::connect();
|
||||
|
||||
$buffer = new Buffer();
|
||||
|
||||
asyncCall(function () use ($buffer): \Generator {
|
||||
while (null !== $chunk = yield $this->read()) {
|
||||
$buffer->append($chunk);
|
||||
|
||||
while ($frame = Parser::parse($buffer)) {
|
||||
switch (true) {
|
||||
case $frame instanceof Frame\Response:
|
||||
if ($frame->isHeartBeat()) {
|
||||
yield $this->write(Command::nop());
|
||||
}
|
||||
|
||||
// Ok received
|
||||
break;
|
||||
case $frame instanceof Frame\Error:
|
||||
$this->handleError($frame);
|
||||
|
||||
break;
|
||||
default:
|
||||
throw new NsqException('Unreachable statement.');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$this->close(false);
|
||||
});
|
||||
});
|
||||
$this->command('PUB', $topic, $body)->response()->okOrFail();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<int, string>|string $body
|
||||
* @psalm-param array<mixed, mixed> $bodies
|
||||
*
|
||||
* @psalm-param positive-int|0 $delay
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
* @psalm-suppress PossiblyFalseOperand
|
||||
*/
|
||||
public function publish(string $topic, string | array $body, int $delay = null): Promise
|
||||
public function mpub(string $topic, array $bodies): void
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return new Success(false);
|
||||
}
|
||||
$num = pack('N', \count($bodies));
|
||||
|
||||
return call(
|
||||
function (iterable $commands): \Generator {
|
||||
try {
|
||||
foreach ($commands as $command) {
|
||||
yield $this->write($command);
|
||||
}
|
||||
$mb = implode('', array_map(static function ($body): string {
|
||||
return pack('N', \strlen($body)).$body;
|
||||
}, $bodies));
|
||||
|
||||
return true;
|
||||
} catch (\Throwable) {
|
||||
return false;
|
||||
}
|
||||
},
|
||||
(static function () use ($topic, $body, $delay): \Generator {
|
||||
if (\is_array($body) && null === $delay) {
|
||||
yield Command::mpub($topic, $body);
|
||||
} elseif (null !== $delay) {
|
||||
foreach ((array) $body as $content) {
|
||||
yield Command::dpub($topic, $content, $delay);
|
||||
}
|
||||
} else {
|
||||
yield Command::pub($topic, $body);
|
||||
}
|
||||
})(),
|
||||
);
|
||||
$this->command('MPUB', $topic, $num.$mb)->response()->okOrFail();
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-suppress PossiblyFalseOperand
|
||||
*/
|
||||
public function dpub(string $topic, string $body, int $delay): void
|
||||
{
|
||||
$this->command('DPUB', [$topic, $delay], $body)->response()->okOrFail();
|
||||
}
|
||||
}
|
||||
|
63
src/Reconnect/ExponentialStrategy.php
Normal file
63
src/Reconnect/ExponentialStrategy.php
Normal file
@@ -0,0 +1,63 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Reconnect;
|
||||
|
||||
use Nsq\Exception\ConnectionFail;
|
||||
use Psr\Log\LoggerAwareTrait;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\NullLogger;
|
||||
use Throwable;
|
||||
|
||||
final class ExponentialStrategy implements ReconnectStrategy
|
||||
{
|
||||
use LoggerAwareTrait;
|
||||
|
||||
private int $delay;
|
||||
|
||||
private int $nextTryAfter;
|
||||
|
||||
private int $attempt = 0;
|
||||
|
||||
private TimeProvider $timeProvider;
|
||||
|
||||
public function __construct(
|
||||
private int $minDelay = 8,
|
||||
private int $maxDelay = 32,
|
||||
TimeProvider $timeProvider = null,
|
||||
LoggerInterface $logger = null,
|
||||
) {
|
||||
$this->delay = 0;
|
||||
$this->timeProvider = $timeProvider ?? new RealTimeProvider();
|
||||
$this->nextTryAfter = $this->timeProvider->time();
|
||||
$this->logger = $logger ?? new NullLogger();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public function connect(callable $callable): void
|
||||
{
|
||||
$currentTime = $this->timeProvider->time();
|
||||
|
||||
if ($currentTime < $this->nextTryAfter) {
|
||||
throw new ConnectionFail('Time to reconnect has not yet come');
|
||||
}
|
||||
|
||||
try {
|
||||
$callable();
|
||||
} catch (Throwable $e) {
|
||||
$nextDelay = 0 === $this->delay ? $this->minDelay : $this->delay * 2;
|
||||
$this->delay = $nextDelay > $this->maxDelay ? $this->maxDelay : $nextDelay;
|
||||
$this->nextTryAfter = $currentTime + $this->delay;
|
||||
|
||||
$this->logger->warning('Reconnect #{attempt} after {delay}s', ['attempt' => ++$this->attempt, 'delay' => $this->delay]);
|
||||
|
||||
throw $e;
|
||||
}
|
||||
|
||||
$this->delay = 0;
|
||||
$this->attempt = 0;
|
||||
}
|
||||
}
|
13
src/Reconnect/RealTimeProvider.php
Normal file
13
src/Reconnect/RealTimeProvider.php
Normal file
@@ -0,0 +1,13 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Reconnect;
|
||||
|
||||
final class RealTimeProvider implements TimeProvider
|
||||
{
|
||||
public function time(): int
|
||||
{
|
||||
return time();
|
||||
}
|
||||
}
|
15
src/Reconnect/ReconnectStrategy.php
Normal file
15
src/Reconnect/ReconnectStrategy.php
Normal file
@@ -0,0 +1,15 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Reconnect;
|
||||
|
||||
use Nsq\Exception\ConnectionFail;
|
||||
|
||||
interface ReconnectStrategy
|
||||
{
|
||||
/**
|
||||
* @throws ConnectionFail
|
||||
*/
|
||||
public function connect(callable $callable): void;
|
||||
}
|
10
src/Reconnect/TimeProvider.php
Normal file
10
src/Reconnect/TimeProvider.php
Normal file
@@ -0,0 +1,10 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Reconnect;
|
||||
|
||||
interface TimeProvider
|
||||
{
|
||||
public function time(): int;
|
||||
}
|
87
src/Response.php
Normal file
87
src/Response.php
Normal file
@@ -0,0 +1,87 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Nsq\Exception\NsqError;
|
||||
use Nsq\Exception\UnexpectedResponse;
|
||||
use PHPinnacle\Buffer\ByteBuffer;
|
||||
use function json_decode;
|
||||
use function sprintf;
|
||||
use const JSON_THROW_ON_ERROR;
|
||||
|
||||
final class Response
|
||||
{
|
||||
private const OK = 'OK';
|
||||
private const HEARTBEAT = '_heartbeat_';
|
||||
private const TYPE_RESPONSE = 0;
|
||||
private const TYPE_ERROR = 1;
|
||||
private const TYPE_MESSAGE = 2;
|
||||
|
||||
private int $type;
|
||||
|
||||
private ByteBuffer $buffer;
|
||||
|
||||
public function __construct(ByteBuffer $buffer)
|
||||
{
|
||||
$this->type = $buffer->consumeUint32();
|
||||
$this->buffer = $buffer;
|
||||
}
|
||||
|
||||
public function okOrFail(): void
|
||||
{
|
||||
if (self::TYPE_ERROR === $this->type) {
|
||||
throw new NsqError($this->buffer->bytes());
|
||||
}
|
||||
|
||||
if (self::TYPE_RESPONSE !== $this->type) {
|
||||
// @codeCoverageIgnoreStart
|
||||
throw new UnexpectedResponse(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type));
|
||||
// @codeCoverageIgnoreEnd
|
||||
}
|
||||
|
||||
if (self::OK !== $this->buffer->bytes()) {
|
||||
// @codeCoverageIgnoreStart
|
||||
throw new UnexpectedResponse(sprintf('OK response expected, but "%s" received.', $this->buffer->bytes()));
|
||||
// @codeCoverageIgnoreEnd
|
||||
}
|
||||
}
|
||||
|
||||
public function isHeartBeat(): bool
|
||||
{
|
||||
return self::TYPE_RESPONSE === $this->type && self::HEARTBEAT === $this->buffer->bytes();
|
||||
}
|
||||
|
||||
/**
|
||||
* @phpstan-ignore-next-line
|
||||
*/
|
||||
public function toArray(): array
|
||||
{
|
||||
if (self::TYPE_RESPONSE !== $this->type) {
|
||||
// @codeCoverageIgnoreStart
|
||||
throw new UnexpectedResponse(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type));
|
||||
// @codeCoverageIgnoreEnd
|
||||
}
|
||||
|
||||
return json_decode($this->buffer->bytes(), true, flags: JSON_THROW_ON_ERROR);
|
||||
}
|
||||
|
||||
public function toMessage(Consumer $reader): Message
|
||||
{
|
||||
if (self::TYPE_MESSAGE !== $this->type) {
|
||||
// @codeCoverageIgnoreStart
|
||||
throw new UnexpectedResponse(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type));
|
||||
// @codeCoverageIgnoreEnd
|
||||
}
|
||||
|
||||
$buffer = new ByteBuffer($this->buffer->bytes());
|
||||
|
||||
$timestamp = $buffer->consumeInt64();
|
||||
$attempts = $buffer->consumeUint16();
|
||||
$id = $buffer->consume(Bytes::BYTES_ID);
|
||||
$body = $buffer->flush();
|
||||
|
||||
return new Message($timestamp, $attempts, $id, $body, $reader);
|
||||
}
|
||||
}
|
@@ -1,26 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\ByteStream\ClosedException;
|
||||
use Amp\Promise;
|
||||
|
||||
interface Stream
|
||||
{
|
||||
/**
|
||||
* @psalm-return Promise<null|string>
|
||||
*/
|
||||
public function read(): Promise;
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<void>
|
||||
*/
|
||||
public function write(string $data): Promise;
|
||||
|
||||
/**
|
||||
* @throws ClosedException
|
||||
*/
|
||||
public function close(): void;
|
||||
}
|
@@ -1,104 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Stream;
|
||||
|
||||
use Amp\Promise;
|
||||
use Nsq\Buffer;
|
||||
use Nsq\Exception\StreamException;
|
||||
use Nsq\Stream;
|
||||
|
||||
use function Amp\call;
|
||||
|
||||
class GzipStream implements Stream
|
||||
{
|
||||
private ?\InflateContext $inflate = null;
|
||||
|
||||
private ?\DeflateContext $deflate = null;
|
||||
|
||||
private Buffer $buffer;
|
||||
|
||||
public function __construct(private Stream $stream, private int $level, string $bytes = '')
|
||||
{
|
||||
/** @var false|\InflateContext $inflate */
|
||||
$inflate = @inflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
|
||||
/** @var \DeflateContext|false $deflate */
|
||||
$deflate = @deflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
|
||||
|
||||
if (false === $inflate) {
|
||||
throw new StreamException('Failed initializing inflate context');
|
||||
}
|
||||
|
||||
if (false === $deflate) {
|
||||
throw new StreamException('Failed initializing deflate context');
|
||||
}
|
||||
|
||||
$this->inflate = $inflate;
|
||||
$this->deflate = $deflate;
|
||||
$this->buffer = new Buffer($bytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function read(): Promise
|
||||
{
|
||||
return call(function (): \Generator {
|
||||
if (null === $this->inflate) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if ($this->buffer->empty()) {
|
||||
$chunk = yield $this->stream->read();
|
||||
|
||||
if (null !== $chunk) {
|
||||
$this->buffer->append($chunk);
|
||||
}
|
||||
}
|
||||
|
||||
$data = $this->buffer->flush();
|
||||
|
||||
if ('' === $data) {
|
||||
return null;
|
||||
}
|
||||
/** @psalm-suppress UndefinedFunction,InvalidArgument */
|
||||
$decompressed = inflate_add($this->inflate, $data, ZLIB_SYNC_FLUSH);
|
||||
|
||||
if (false === $decompressed) {
|
||||
throw new StreamException('Failed adding data to deflate context');
|
||||
}
|
||||
|
||||
return $decompressed;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function write(string $data): Promise
|
||||
{
|
||||
if (null === $this->deflate) {
|
||||
throw new StreamException('The stream has already been closed');
|
||||
}
|
||||
|
||||
/** @psalm-suppress UndefinedFunction,InvalidArgument */
|
||||
$compressed = deflate_add($this->deflate, $data, ZLIB_SYNC_FLUSH);
|
||||
|
||||
if (false === $compressed) {
|
||||
throw new StreamException('Failed adding data to deflate context');
|
||||
}
|
||||
|
||||
return $this->stream->write($compressed);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public function close(): void
|
||||
{
|
||||
$this->stream->close();
|
||||
$this->inflate = null;
|
||||
$this->deflate = null;
|
||||
}
|
||||
}
|
@@ -1,37 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Stream;
|
||||
|
||||
use Amp\Failure;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
use Nsq\Exception\NsqException;
|
||||
use Nsq\Stream;
|
||||
|
||||
final class NullStream implements Stream
|
||||
{
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function read(): Promise
|
||||
{
|
||||
return new Success(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function write(string $data): Promise
|
||||
{
|
||||
return new Failure(new NsqException('Connection closed.'));
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function close(): void
|
||||
{
|
||||
}
|
||||
}
|
@@ -1,122 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Stream;
|
||||
|
||||
use Amp\Promise;
|
||||
use Nsq\Buffer;
|
||||
use Nsq\Exception\SnappyException;
|
||||
use Nsq\Stream;
|
||||
|
||||
use function Amp\call;
|
||||
|
||||
class SnappyStream implements Stream
|
||||
{
|
||||
private const IDENTIFIER = [0xFF, 0x06, 0x00, 0x00, 0x73, 0x4E, 0x61, 0x50, 0x70, 0x59];
|
||||
private const SIZE_HEADER = 4;
|
||||
private const SIZE_CHECKSUM = 4;
|
||||
private const SIZE_CHUNK = 65536;
|
||||
private const TYPE_IDENTIFIER = 0xFF;
|
||||
private const TYPE_COMPRESSED = 0x00;
|
||||
private const TYPE_UNCOMPRESSED = 0x01;
|
||||
private const TYPE_PADDING = 0xFE;
|
||||
|
||||
private Buffer $buffer;
|
||||
|
||||
public function __construct(private Stream $stream, string $bytes = '')
|
||||
{
|
||||
if (!\function_exists('snappy_uncompress') || !\function_exists('snappy_compress')) {
|
||||
throw SnappyException::notInstalled();
|
||||
}
|
||||
|
||||
$this->buffer = new Buffer($bytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function read(): Promise
|
||||
{
|
||||
return call(function (): \Generator {
|
||||
if ($this->buffer->size() < self::SIZE_HEADER && null !== ($chunk = yield $this->stream->read())) {
|
||||
$this->buffer->append($chunk);
|
||||
}
|
||||
|
||||
$type = $this->buffer->readUInt32LE();
|
||||
|
||||
$size = $type >> 8;
|
||||
$type &= 0xFF;
|
||||
|
||||
while ($this->buffer->size() < $size && null !== ($chunk = yield $this->stream->read())) {
|
||||
$this->buffer->append($chunk);
|
||||
}
|
||||
|
||||
switch ($type) {
|
||||
case self::TYPE_IDENTIFIER:
|
||||
$this->buffer->discard($size);
|
||||
|
||||
return $this->read();
|
||||
case self::TYPE_COMPRESSED:
|
||||
$this->buffer->discard(self::SIZE_CHECKSUM);
|
||||
|
||||
/** @psalm-suppress UndefinedFunction */
|
||||
return snappy_uncompress($this->buffer->consume($size - self::SIZE_HEADER));
|
||||
case self::TYPE_UNCOMPRESSED:
|
||||
$this->buffer->discard(self::SIZE_CHECKSUM);
|
||||
|
||||
return $this->buffer->consume($size - self::SIZE_HEADER);
|
||||
case self::TYPE_PADDING:
|
||||
return $this->read();
|
||||
default:
|
||||
throw SnappyException::invalidHeader();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function write(string $data): Promise
|
||||
{
|
||||
return call(function () use ($data): Promise {
|
||||
/** @var string $result */
|
||||
$result = pack('CCCCCCCCCC', ...self::IDENTIFIER);
|
||||
|
||||
foreach (str_split($data, self::SIZE_CHUNK) as $chunk) {
|
||||
$result .= $this->compress($chunk);
|
||||
}
|
||||
|
||||
return $this->stream->write($result);
|
||||
});
|
||||
}
|
||||
|
||||
public function close(): void
|
||||
{
|
||||
$this->stream->close();
|
||||
}
|
||||
|
||||
private function compress(string $uncompressed): string
|
||||
{
|
||||
/** @psalm-suppress UndefinedFunction */
|
||||
$compressed = snappy_compress($uncompressed);
|
||||
|
||||
\assert(\is_string($compressed));
|
||||
|
||||
[$type, $data] = \strlen($compressed) <= 0.875 * \strlen($uncompressed)
|
||||
? [self::TYPE_COMPRESSED, $compressed]
|
||||
: [self::TYPE_UNCOMPRESSED, $uncompressed];
|
||||
|
||||
/** @psalm-suppress PossiblyFalseArgument */
|
||||
$unpacked = unpack('N', hash('crc32c', $uncompressed, true));
|
||||
\assert(\is_array($unpacked));
|
||||
|
||||
$checksum = $unpacked[1];
|
||||
$checksum = (($checksum >> 15) | ($checksum << 17)) + 0xA282EAD8 & 0xFFFFFFFF;
|
||||
|
||||
$size = (\strlen($data) + 4) << 8;
|
||||
|
||||
/** @psalm-suppress PossiblyFalseOperand */
|
||||
return pack('VV', $type + $size, $checksum).$data;
|
||||
}
|
||||
}
|
@@ -1,82 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Stream;
|
||||
|
||||
use Amp\Promise;
|
||||
use Amp\Socket\ClientTlsContext;
|
||||
use Amp\Socket\ConnectContext;
|
||||
use Amp\Socket\EncryptableSocket;
|
||||
use Nsq\Stream;
|
||||
|
||||
use function Amp\call;
|
||||
use function Amp\Socket\connect;
|
||||
|
||||
class SocketStream implements Stream
|
||||
{
|
||||
public function __construct(private EncryptableSocket $socket)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<self>
|
||||
*/
|
||||
public static function connect(string $uri, int $timeout = 0, int $attempts = 0, bool $noDelay = false): Promise
|
||||
{
|
||||
return call(function () use ($uri, $timeout, $attempts, $noDelay): \Generator {
|
||||
$context = new ConnectContext();
|
||||
|
||||
if ($timeout > 0) {
|
||||
$context = $context->withConnectTimeout($timeout);
|
||||
}
|
||||
|
||||
if ($attempts > 0) {
|
||||
$context = $context->withMaxAttempts($attempts);
|
||||
}
|
||||
|
||||
if ($noDelay) {
|
||||
$context = $context->withTcpNoDelay();
|
||||
}
|
||||
|
||||
$context = $context->withTlsContext(
|
||||
(new ClientTlsContext(''))
|
||||
->withoutPeerVerification(),
|
||||
);
|
||||
|
||||
return new self(yield connect($uri, $context));
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<null|string>
|
||||
*/
|
||||
public function read(): Promise
|
||||
{
|
||||
return $this->socket->read();
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<void>
|
||||
*/
|
||||
public function write(string $data): Promise
|
||||
{
|
||||
return $this->socket->write($data);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public function close(): void
|
||||
{
|
||||
$this->socket->close();
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<void>
|
||||
*/
|
||||
public function setupTls(): Promise
|
||||
{
|
||||
return $this->socket->setupTls();
|
||||
}
|
||||
}
|
39
src/Subscriber.php
Normal file
39
src/Subscriber.php
Normal file
@@ -0,0 +1,39 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Generator;
|
||||
|
||||
final class Subscriber
|
||||
{
|
||||
public const STOP = 0;
|
||||
|
||||
private Consumer $reader;
|
||||
|
||||
public function __construct(Consumer $reader)
|
||||
{
|
||||
$this->reader = $reader;
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Generator<int, Message|float|null, int|float|null, void>
|
||||
*/
|
||||
public function subscribe(string $topic, string $channel): Generator
|
||||
{
|
||||
$this->reader->sub($topic, $channel);
|
||||
|
||||
while (true) {
|
||||
$this->reader->rdy(1);
|
||||
|
||||
$command = yield $this->reader->receive()?->toMessage($this->reader);
|
||||
|
||||
if (self::STOP === $command) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
$this->reader->disconnect();
|
||||
}
|
||||
}
|
@@ -7,11 +7,6 @@ use PHPUnit\Framework\TestCase;
|
||||
|
||||
final class ClientConfigTest extends TestCase
|
||||
{
|
||||
public function testNegotiationPayload(): void
|
||||
{
|
||||
self::assertJson((new ClientConfig())->asNegotiationPayload());
|
||||
}
|
||||
|
||||
public function testInvalidCompression(): void
|
||||
{
|
||||
$this->expectException(InvalidArgumentException::class);
|
||||
@@ -19,42 +14,4 @@ final class ClientConfigTest extends TestCase
|
||||
|
||||
new ClientConfig(deflate: true, snappy: true);
|
||||
}
|
||||
|
||||
/**
|
||||
* @dataProvider array
|
||||
*/
|
||||
public function testFromArray(array $data, array $expected): void
|
||||
{
|
||||
self::assertSame($expected, get_object_vars(ClientConfig::fromArray($data)));
|
||||
}
|
||||
|
||||
public function array(): Generator
|
||||
{
|
||||
$default = get_object_vars(new ClientConfig());
|
||||
|
||||
yield 'Empty array' => [[], $default];
|
||||
|
||||
yield 'With wrong keys' => [['bla' => 'bla'], $default];
|
||||
|
||||
$custom = [
|
||||
'authSecret' => 'SomeSecret',
|
||||
'connectTimeout' => 100,
|
||||
'maxAttempts' => 10,
|
||||
'tcpNoDelay' => true,
|
||||
'rdyCount' => 1,
|
||||
'featureNegotiation' => true,
|
||||
'clientId' => 'SomeGorgeousClientId',
|
||||
'deflate' => true,
|
||||
'deflateLevel' => 1,
|
||||
'heartbeatInterval' => 31111,
|
||||
'hostname' => gethostname(),
|
||||
'msgTimeout' => 59999,
|
||||
'sampleRate' => 25,
|
||||
'tls' => true,
|
||||
'snappy' => false,
|
||||
'userAgent' => 'nsqphp/test',
|
||||
];
|
||||
|
||||
yield 'Full filled' => [$custom, $custom];
|
||||
}
|
||||
}
|
||||
|
@@ -1,28 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
use Nsq\ErrorType;
|
||||
use Nsq\Frame\Error;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
final class ErrorTypeTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @dataProvider data
|
||||
*/
|
||||
public function testConstructor(Error $frame, bool $isConnectionTerminated): void
|
||||
{
|
||||
self::assertSame($isConnectionTerminated, ErrorType::terminable($frame));
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \Generator<int, array{0: Error, 1: bool}>
|
||||
*/
|
||||
public function data(): Generator
|
||||
{
|
||||
yield [new Error('E_BAD_BODY'), true];
|
||||
yield [new Error('bla_bla'), true];
|
||||
yield [new Error('E_REQ_FAILED'), false];
|
||||
}
|
||||
}
|
90
tests/ExponentialStrategyTest.php
Normal file
90
tests/ExponentialStrategyTest.php
Normal file
@@ -0,0 +1,90 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
use Nsq\Exception\ConnectionFail;
|
||||
use Nsq\Reconnect\ExponentialStrategy;
|
||||
use Nsq\Reconnect\TimeProvider;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
final class ExponentialStrategyTest extends TestCase
|
||||
{
|
||||
public function testTimeNotYetCome(): void
|
||||
{
|
||||
$timeProvider = new FakeTimeProvider();
|
||||
$strategy = new ExponentialStrategy(
|
||||
minDelay: 8,
|
||||
maxDelay: 32,
|
||||
timeProvider: $timeProvider,
|
||||
);
|
||||
|
||||
$successConnect = static function (int $time = null) use ($strategy, $timeProvider): void {
|
||||
$timeProvider($time);
|
||||
|
||||
$strategy->connect(static function (): void {
|
||||
});
|
||||
};
|
||||
$failConnect = static function (int $time = null) use ($strategy, $timeProvider): void {
|
||||
$timeProvider($time);
|
||||
|
||||
try {
|
||||
$strategy->connect(function (): void {
|
||||
throw new ConnectionFail('Time come but failed');
|
||||
});
|
||||
} catch (ConnectionFail $e) {
|
||||
self::assertSame('Time come but failed', $e->getMessage());
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
self::fail('Expecting exception with message "Time come but failed"');
|
||||
};
|
||||
$timeNotCome = static function (int $time = null) use ($strategy, $timeProvider): void {
|
||||
$timeProvider($time);
|
||||
|
||||
try {
|
||||
$strategy->connect(function (): void {
|
||||
throw new ConnectionFail('');
|
||||
});
|
||||
} catch (ConnectionFail $e) {
|
||||
self::assertSame('Time to reconnect has not yet come', $e->getMessage());
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
self::fail('Was expecting exception with message "Time to reconnect has not yet come"');
|
||||
};
|
||||
|
||||
$failConnect(0);
|
||||
$timeNotCome(7);
|
||||
$failConnect(8);
|
||||
$timeNotCome(22);
|
||||
$timeNotCome(13);
|
||||
$failConnect(24);
|
||||
$successConnect(56);
|
||||
$failConnect();
|
||||
$timeNotCome();
|
||||
$timeNotCome(63);
|
||||
$failConnect(64);
|
||||
|
||||
$this->expectException(ConnectionFail::class);
|
||||
$this->expectExceptionMessage('Time to reconnect has not yet come');
|
||||
|
||||
$successConnect();
|
||||
}
|
||||
}
|
||||
|
||||
class FakeTimeProvider implements TimeProvider
|
||||
{
|
||||
public int $time = 0;
|
||||
|
||||
public function time(): int
|
||||
{
|
||||
return $this->time;
|
||||
}
|
||||
|
||||
public function __invoke(int $time = null): void
|
||||
{
|
||||
$this->time = $time ?? $this->time;
|
||||
}
|
||||
}
|
@@ -2,10 +2,8 @@
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
use Amp\Loop;
|
||||
use Amp\Success;
|
||||
use Nsq\Consumer;
|
||||
use Nsq\Exception\MessageException;
|
||||
use Nsq\Exception\MessageAlreadyFinished;
|
||||
use Nsq\Message;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
@@ -16,12 +14,16 @@ final class MessageTest extends TestCase
|
||||
*/
|
||||
public function testDoubleFinish(Message $message): void
|
||||
{
|
||||
$this->expectException(MessageException::class);
|
||||
self::assertFalse($message->isFinished());
|
||||
|
||||
Loop::run(function () use ($message): Generator {
|
||||
yield $message->finish();
|
||||
yield $message->finish();
|
||||
});
|
||||
$message->finish();
|
||||
|
||||
self::assertTrue($message->isFinished());
|
||||
|
||||
$this->expectException(MessageAlreadyFinished::class);
|
||||
$this->expectExceptionMessage('Can\'t finish message as it already finished.');
|
||||
|
||||
$message->finish();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -29,12 +31,16 @@ final class MessageTest extends TestCase
|
||||
*/
|
||||
public function testDoubleRequeue(Message $message): void
|
||||
{
|
||||
$this->expectException(MessageException::class);
|
||||
self::assertFalse($message->isFinished());
|
||||
|
||||
Loop::run(function () use ($message): Generator {
|
||||
yield $message->requeue(1);
|
||||
yield $message->requeue(5);
|
||||
});
|
||||
$message->requeue(1);
|
||||
|
||||
self::assertTrue($message->isFinished());
|
||||
|
||||
$this->expectException(MessageAlreadyFinished::class);
|
||||
$this->expectExceptionMessage('Can\'t requeue message as it already finished.');
|
||||
|
||||
$message->requeue(5);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -42,12 +48,14 @@ final class MessageTest extends TestCase
|
||||
*/
|
||||
public function testTouchAfterFinish(Message $message): void
|
||||
{
|
||||
$this->expectException(MessageException::class);
|
||||
self::assertFalse($message->isFinished());
|
||||
|
||||
Loop::run(function () use ($message): Generator {
|
||||
yield $message->finish();
|
||||
yield $message->touch();
|
||||
});
|
||||
$message->finish();
|
||||
|
||||
$this->expectException(MessageAlreadyFinished::class);
|
||||
$this->expectExceptionMessage('Can\'t touch message as it already finished.');
|
||||
|
||||
$message->touch();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -55,12 +63,6 @@ final class MessageTest extends TestCase
|
||||
*/
|
||||
public function messages(): Generator
|
||||
{
|
||||
/** @phpstan-ignore-next-line */
|
||||
$consumer = $this->createMock(Consumer::class);
|
||||
$consumer->method('fin')->willReturn(new Success());
|
||||
$consumer->method('touch')->willReturn(new Success());
|
||||
$consumer->method('req')->willReturn(new Success());
|
||||
|
||||
yield [new Message('id', 'body', 0, 0, $consumer)];
|
||||
yield [new Message(0, 0, 'id', 'body', $this->createStub(Consumer::class))];
|
||||
}
|
||||
}
|
||||
|
@@ -3,35 +3,90 @@
|
||||
declare(strict_types=1);
|
||||
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Consumer;
|
||||
use Nsq\Message;
|
||||
use Nsq\Producer;
|
||||
use Nsq\Subscriber;
|
||||
use Nyholm\NSA;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
final class NsqTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @dataProvider configs
|
||||
*/
|
||||
public function test(ClientConfig $clientConfig): void
|
||||
public function test(): void
|
||||
{
|
||||
self::markTestSkipped('');
|
||||
}
|
||||
$producer = new Producer('tcp://localhost:4150');
|
||||
$producer->pub(__FUNCTION__, __FUNCTION__);
|
||||
|
||||
/**
|
||||
* @return Generator<string, array<int, ClientConfig>>
|
||||
*/
|
||||
public function configs(): Generator
|
||||
{
|
||||
yield 'default' => [
|
||||
new ClientConfig(
|
||||
heartbeatInterval: 3000,
|
||||
snappy: false,
|
||||
$consumer = new Consumer(
|
||||
address: 'tcp://localhost:4150',
|
||||
clientConfig: new ClientConfig(
|
||||
heartbeatInterval: 1000,
|
||||
readTimeout: 1,
|
||||
),
|
||||
];
|
||||
);
|
||||
$subscriber = new Subscriber($consumer);
|
||||
$generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__);
|
||||
|
||||
yield 'snappy' => [
|
||||
new ClientConfig(
|
||||
heartbeatInterval: 3000,
|
||||
snappy: true,
|
||||
),
|
||||
];
|
||||
/** @var null|Message $message */
|
||||
$message = $generator->current();
|
||||
|
||||
self::assertInstanceOf(Message::class, $message);
|
||||
self::assertSame(__FUNCTION__, $message->body);
|
||||
$message->finish();
|
||||
|
||||
$generator->next();
|
||||
self::assertNull($generator->current());
|
||||
|
||||
$producer->mpub(__FUNCTION__, [
|
||||
'First mpub message.',
|
||||
'Second mpub message.',
|
||||
]);
|
||||
|
||||
$generator->next();
|
||||
/** @var null|Message $message */
|
||||
$message = $generator->current();
|
||||
self::assertInstanceOf(Message::class, $message);
|
||||
self::assertSame('First mpub message.', $message->body);
|
||||
$message->finish();
|
||||
|
||||
$generator->next();
|
||||
/** @var null|Message $message */
|
||||
$message = $generator->current();
|
||||
self::assertInstanceOf(Message::class, $message);
|
||||
self::assertSame('Second mpub message.', $message->body);
|
||||
$message->requeue(0);
|
||||
|
||||
$generator->next();
|
||||
/** @var null|Message $message */
|
||||
$message = $generator->current();
|
||||
self::assertInstanceOf(Message::class, $message);
|
||||
self::assertSame('Second mpub message.', $message->body);
|
||||
$message->finish();
|
||||
|
||||
$producer->dpub(__FUNCTION__, 'Deferred message.', 2000);
|
||||
|
||||
$generator->next();
|
||||
/** @var null|Message $message */
|
||||
$message = $generator->current();
|
||||
self::assertNull($message);
|
||||
|
||||
NSA::setProperty(
|
||||
NSA::getProperty($consumer, 'clientConfig'),
|
||||
'readTimeout',
|
||||
10,
|
||||
);
|
||||
|
||||
$generator->next();
|
||||
|
||||
/** @var null|Message $message */
|
||||
$message = $generator->current();
|
||||
self::assertInstanceOf(Message::class, $message);
|
||||
self::assertSame('Deferred message.', $message->body);
|
||||
$message->touch();
|
||||
$message->finish();
|
||||
|
||||
self::assertTrue($consumer->isReady());
|
||||
$generator->send(Subscriber::STOP);
|
||||
self::assertFalse($consumer->isReady());
|
||||
}
|
||||
}
|
||||
|
@@ -1,95 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
use Symfony\Component\Filesystem\Filesystem;
|
||||
use Symfony\Component\Process\Process;
|
||||
|
||||
final class Nsqd
|
||||
{
|
||||
public Process $process;
|
||||
|
||||
public string $address;
|
||||
|
||||
private static Filesystem $fs;
|
||||
|
||||
public function __construct(
|
||||
public readonly string $dataPath,
|
||||
public readonly int $httpPort,
|
||||
public readonly int $tcpPort,
|
||||
) {
|
||||
self::$fs ??= new Filesystem();
|
||||
self::$fs->mkdir($this->dataPath);
|
||||
|
||||
$nsqd = new Process([
|
||||
'./nsqd',
|
||||
sprintf('-data-path=%s', $this->dataPath),
|
||||
sprintf('-http-address=0.0.0.0:%s', $this->httpPort),
|
||||
sprintf('-tcp-address=0.0.0.0:%s', $this->tcpPort),
|
||||
'-log-level=debug',
|
||||
], dirname(__DIR__).'/bin');
|
||||
|
||||
$nsqd->start();
|
||||
|
||||
while (false === @fsockopen('localhost', $this->tcpPort)) {
|
||||
if (!$nsqd->isRunning()) {
|
||||
throw new RuntimeException($nsqd->getErrorOutput());
|
||||
}
|
||||
|
||||
usleep(10000);
|
||||
}
|
||||
|
||||
$this->process = $nsqd;
|
||||
$this->address = sprintf('tcp://localhost:%s', $this->tcpPort);
|
||||
}
|
||||
|
||||
public static function create(): self
|
||||
{
|
||||
do {
|
||||
$dir = sprintf('/tmp/%s', bin2hex(random_bytes(5)));
|
||||
} while (is_dir($dir));
|
||||
|
||||
return new self(
|
||||
$dir,
|
||||
findFreePort(),
|
||||
findFreePort(),
|
||||
);
|
||||
}
|
||||
|
||||
public function tail(string $topic, string $channel, int $messages): Process
|
||||
{
|
||||
$tail = new Process(
|
||||
[
|
||||
'./nsq_tail',
|
||||
sprintf('-nsqd-tcp-address=localhost:%s', $this->tcpPort),
|
||||
sprintf('-topic=%s', $topic),
|
||||
sprintf('-channel=%s', $channel),
|
||||
sprintf('-n=%s', $messages),
|
||||
'-print-topic',
|
||||
],
|
||||
dirname(__DIR__).'/bin',
|
||||
timeout: 10,
|
||||
);
|
||||
|
||||
$tail->start();
|
||||
|
||||
return $tail;
|
||||
}
|
||||
|
||||
public function __destruct()
|
||||
{
|
||||
$this->process->stop();
|
||||
self::$fs->remove($this->dataPath);
|
||||
}
|
||||
}
|
||||
|
||||
function findFreePort(): int
|
||||
{
|
||||
$sock = socket_create_listen(0);
|
||||
assert($sock instanceof \Socket);
|
||||
|
||||
socket_getsockname($sock, $addr, $port);
|
||||
socket_close($sock);
|
||||
|
||||
return $port;
|
||||
}
|
@@ -2,97 +2,30 @@
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
use Amp\Loop;
|
||||
use Nsq\Exception\ServerException;
|
||||
use Nsq\Exception\NsqError;
|
||||
use Nsq\Producer;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
use function Amp\Promise\wait;
|
||||
|
||||
final class ProducerTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @dataProvider bodies
|
||||
*/
|
||||
public function testPublish(string $body): void
|
||||
{
|
||||
$nsqd = Nsqd::create();
|
||||
$tail = $nsqd->tail('test', 'test', 1);
|
||||
|
||||
$producer = Producer::create($nsqd->address);
|
||||
|
||||
wait($producer->connect());
|
||||
self::assertTrue(wait($producer->publish('test', $body)));
|
||||
|
||||
self::assertSame(0, $tail->wait());
|
||||
|
||||
self::assertSame("test | {$body}", trim($tail->getOutput()));
|
||||
}
|
||||
|
||||
/**
|
||||
* @dataProvider bodies
|
||||
*/
|
||||
public function testPublishMultiple(string $body): void
|
||||
{
|
||||
$nsqd = Nsqd::create();
|
||||
$tail = $nsqd->tail('test', 'test', 2);
|
||||
|
||||
$producer = Producer::create($nsqd->address);
|
||||
|
||||
wait($producer->connect());
|
||||
self::assertTrue(wait($producer->publish('test', [$body, $body])));
|
||||
|
||||
self::assertSame(0, $tail->wait());
|
||||
|
||||
self::assertSame("test | {$body}\ntest | {$body}", trim($tail->getOutput()));
|
||||
}
|
||||
|
||||
/**
|
||||
* @dataProvider bodies
|
||||
*/
|
||||
public function testPublishDeferred(string $body): void
|
||||
{
|
||||
$nsqd = Nsqd::create();
|
||||
$tail = $nsqd->tail('test', 'test', 1);
|
||||
|
||||
$producer = Producer::create($nsqd->address);
|
||||
|
||||
wait($producer->connect());
|
||||
self::assertTrue(wait($producer->publish('test', $body, 1)));
|
||||
|
||||
self::assertSame(0, $tail->wait());
|
||||
|
||||
self::assertSame("test | {$body}", trim($tail->getOutput()));
|
||||
}
|
||||
|
||||
/**
|
||||
* @dataProvider pubFails
|
||||
*/
|
||||
public function testPubFail(string $topic, string $body, string $exceptionMessage): void
|
||||
{
|
||||
$nsqd = Nsqd::create();
|
||||
|
||||
$this->expectException(ServerException::class);
|
||||
$this->expectException(NsqError::class);
|
||||
$this->expectExceptionMessage($exceptionMessage);
|
||||
|
||||
$producer = Producer::create($nsqd->address);
|
||||
|
||||
Loop::run(static function () use ($producer, $topic, $body): Generator {
|
||||
yield $producer->connect();
|
||||
|
||||
yield $producer->publish($topic, $body);
|
||||
});
|
||||
$producer = new Producer('tcp://localhost:4150');
|
||||
$producer->pub($topic, $body);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Generator<string, array>
|
||||
*/
|
||||
public function pubFails(): Generator
|
||||
{
|
||||
yield 'Empty body' => ['test', '', 'E_BAD_MESSAGE PUB invalid message body size 0'];
|
||||
yield 'Invalid topic' => ['test$%^&', '', 'E_BAD_TOPIC PUB topic name "test$%^&" is not valid'];
|
||||
}
|
||||
|
||||
public function bodies(): Generator
|
||||
{
|
||||
yield 'Simple Body' => ['Simple Body'];
|
||||
yield 'Body with special chars' => ['test$%^&'];
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user