Compare commits
3 Commits
renovate/c
...
tests
Author | SHA1 | Date | |
---|---|---|---|
f5ab37c579 | |||
809f967fb1 | |||
1b5e1ffb95 |
211
.github/workflows/ci.yaml
vendored
Normal file
211
.github/workflows/ci.yaml
vendored
Normal file
@@ -0,0 +1,211 @@
|
||||
name: CI
|
||||
|
||||
on:
|
||||
- pull_request
|
||||
- push
|
||||
|
||||
jobs:
|
||||
tests:
|
||||
name: Tests
|
||||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
os:
|
||||
- ubuntu-latest
|
||||
php:
|
||||
- '8.0'
|
||||
dependencies:
|
||||
- lowest
|
||||
- highest
|
||||
services:
|
||||
nsqd:
|
||||
image: nsqio/nsq:v1.2.0
|
||||
options: --entrypoint /nsqd
|
||||
ports:
|
||||
- 4150:4150
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: ${{ matrix.php }}
|
||||
coverage: pcov
|
||||
extensions: kjdev/php-ext-snappy@0.2.1
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Setup Problem Matchers for PHPUnit
|
||||
run: echo "::add-matcher::${{ runner.tool_cache }}/phpunit.json"
|
||||
|
||||
- name: Determine Composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=directory::$(composer config cache-dir)"
|
||||
|
||||
- name: Cache Composer dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.directory }}
|
||||
key: ${{ runner.os }}-${{ matrix.php }}-composer-${{ matrix.dependencies }}-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-${{ matrix.php }}-${{ matrix.dependencies }}-composer-
|
||||
|
||||
- name: Install highest dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
if: ${{ matrix.dependencies == 'highest' }}
|
||||
|
||||
- name: Install lowest dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist --prefer-lowest
|
||||
if: ${{ matrix.dependencies == 'lowest' }}
|
||||
|
||||
- name: Install nsq bin
|
||||
run: curl -L https://github.com/nsqio/nsq/releases/download/v1.2.0/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz | tar xz --strip 1
|
||||
|
||||
- name: Run tests
|
||||
run: vendor/bin/phpunit --coverage-clover=build/coverage-report.xml
|
||||
|
||||
- name: Upload code coverage
|
||||
uses: codecov/codecov-action@v1
|
||||
with:
|
||||
file: build/coverage-report.xml
|
||||
|
||||
php-cs-fixer:
|
||||
name: PHP-CS-Fixer
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.0'
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Get composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-composer-
|
||||
|
||||
- name: Install dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
|
||||
- name: Run script
|
||||
run: composer cs-check
|
||||
|
||||
phpstan:
|
||||
name: PHPStan
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.0'
|
||||
extensions: snappy-kjdev/php-ext-snappy@0.2.1
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Get composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-composer-
|
||||
|
||||
- name: Install dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
|
||||
- name: Run script
|
||||
run: composer phpstan
|
||||
|
||||
psalm:
|
||||
name: Psalm
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.0'
|
||||
extensions: snappy-kjdev/php-ext-snappy@0.2.1
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Get composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-composer-
|
||||
|
||||
- name: Install dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
|
||||
- name: Run script
|
||||
run: vendor/bin/psalm --output-format=github
|
||||
|
||||
infection:
|
||||
name: Infection
|
||||
runs-on: ubuntu-latest
|
||||
services:
|
||||
nsqd:
|
||||
image: nsqio/nsq:v1.2.0
|
||||
options: --entrypoint /nsqd
|
||||
ports:
|
||||
- 4150:4150
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.0'
|
||||
coverage: pcov
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Get composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-composer-
|
||||
|
||||
- name: Install dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
|
||||
- name: Install nsq bin
|
||||
run: curl -L https://github.com/nsqio/nsq/releases/download/v1.2.0/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz | tar xz --strip 1
|
||||
|
||||
- name: Run script
|
||||
env:
|
||||
STRYKER_DASHBOARD_API_KEY: ${{ secrets.STRYKER_DASHBOARD_API_KEY }}
|
||||
run: |
|
||||
git fetch --depth=1 origin $GITHUB_BASE_REF
|
||||
php vendor/bin/infection -j2 --git-diff-filter=A --git-diff-base=origin/$GITHUB_BASE_REF --logger-github --ignore-msi-with-no-mutations --only-covered
|
37
.github/workflows/code_style.yaml
vendored
37
.github/workflows/code_style.yaml
vendored
@@ -1,37 +0,0 @@
|
||||
name: Code Style
|
||||
|
||||
on:
|
||||
- pull_request
|
||||
- push
|
||||
|
||||
jobs:
|
||||
php-cs-fixer:
|
||||
name: PHP-CS-Fixer
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.1'
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Get composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-composer-
|
||||
|
||||
- name: Install dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
|
||||
- name: Run script
|
||||
run: composer cs-check
|
71
.github/workflows/phpunit.yaml
vendored
71
.github/workflows/phpunit.yaml
vendored
@@ -1,71 +0,0 @@
|
||||
name: phpunit
|
||||
|
||||
on:
|
||||
- pull_request
|
||||
- push
|
||||
|
||||
jobs:
|
||||
phpunit:
|
||||
name: phpunit
|
||||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
os:
|
||||
- ubuntu-latest
|
||||
php:
|
||||
- '8.1'
|
||||
nsq:
|
||||
- nsq-1.2.0.linux-amd64.go1.12.9
|
||||
- nsq-1.2.1.linux-amd64.go1.16.6
|
||||
dependencies:
|
||||
- lowest
|
||||
- highest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: ${{ matrix.php }}
|
||||
coverage: pcov
|
||||
extensions: kjdev/php-ext-snappy@0.2.1
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Download NSQ
|
||||
run: |
|
||||
curl -sSL "http://bitly-downloads.s3.amazonaws.com/nsq/${{ matrix.nsq }}.tar.gz" \
|
||||
| tar -xzv --strip-components=1
|
||||
./bin/nsqd --version
|
||||
|
||||
- name: Setup Problem Matchers for PHPUnit
|
||||
run: echo "::add-matcher::${{ runner.tool_cache }}/phpunit.json"
|
||||
|
||||
- name: Determine Composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=directory::$(composer config cache-dir)"
|
||||
|
||||
- name: Cache Composer dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.directory }}
|
||||
key: ${{ runner.os }}-${{ matrix.php }}-composer-${{ matrix.dependencies }}-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-${{ matrix.php }}-${{ matrix.dependencies }}-composer-
|
||||
|
||||
- name: Install highest dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
if: ${{ matrix.dependencies == 'highest' }}
|
||||
|
||||
- name: Install lowest dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist --prefer-lowest
|
||||
if: ${{ matrix.dependencies == 'lowest' }}
|
||||
|
||||
- name: Run tests
|
||||
run: vendor/bin/phpunit --coverage-clover=build/coverage-report.xml
|
||||
|
||||
- name: Upload code coverage
|
||||
uses: codecov/codecov-action@v1
|
||||
with:
|
||||
file: build/coverage-report.xml
|
70
.github/workflows/static_analyze.yaml
vendored
70
.github/workflows/static_analyze.yaml
vendored
@@ -1,70 +0,0 @@
|
||||
name: Static Analyze
|
||||
|
||||
on:
|
||||
- pull_request
|
||||
- push
|
||||
|
||||
jobs:
|
||||
phpstan:
|
||||
name: PHPStan
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.1'
|
||||
extensions: snappy-kjdev/php-ext-snappy@0.2.1
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Get composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-composer-
|
||||
|
||||
- name: Install dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
|
||||
- name: Run script
|
||||
run: composer phpstan
|
||||
|
||||
psalm:
|
||||
name: Psalm
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.1'
|
||||
extensions: snappy-kjdev/php-ext-snappy@0.2.1
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Get composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-composer-
|
||||
|
||||
- name: Install dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
|
||||
- name: Run script
|
||||
run: vendor/bin/psalm --output-format=github
|
2
.gitignore
vendored
2
.gitignore
vendored
@@ -1,7 +1,7 @@
|
||||
/vendor/
|
||||
/composer.lock
|
||||
|
||||
/.php-cs-fixer.cache
|
||||
/.php_cs.cache
|
||||
/.phpunit.result.cache
|
||||
/infection.log
|
||||
|
||||
|
@@ -1,45 +0,0 @@
|
||||
<?php
|
||||
|
||||
$finder = PhpCsFixer\Finder::create()
|
||||
->in(__DIR__)
|
||||
->exclude('vendor');
|
||||
|
||||
return (new PhpCsFixer\Config())
|
||||
->setRiskyAllowed(true)
|
||||
->setRules([
|
||||
'@PhpCsFixer' => true,
|
||||
'@PhpCsFixer:risky' => true,
|
||||
'@PSR12' => true,
|
||||
'@PSR12:risky' => true,
|
||||
'blank_line_before_statement' => [
|
||||
'statements' => [
|
||||
'continue',
|
||||
'do',
|
||||
'exit',
|
||||
'goto',
|
||||
'if',
|
||||
'return',
|
||||
'switch',
|
||||
'throw',
|
||||
'try',
|
||||
],
|
||||
],
|
||||
'declare_strict_types' => true,
|
||||
'global_namespace_import' => [
|
||||
'import_classes' => false,
|
||||
'import_constants' => false,
|
||||
'import_functions' => false,
|
||||
],
|
||||
'php_unit_internal_class' => false,
|
||||
'php_unit_test_case_static_method_calls' => ['call_type' => 'self'],
|
||||
'php_unit_test_class_requires_covers' => false,
|
||||
'phpdoc_to_comment' => false,
|
||||
'yoda_style' => true,
|
||||
'trailing_comma_in_multiline' => [
|
||||
'after_heredoc' => true,
|
||||
'elements' => ['arrays', 'arguments', 'parameters'],
|
||||
],
|
||||
'types_spaces' => ['space' => 'single'],
|
||||
])
|
||||
->setFinder($finder)
|
||||
->setCacheFile(__DIR__.'/.php-cs-fixer.cache');
|
28
.php_cs.dist
Normal file
28
.php_cs.dist
Normal file
@@ -0,0 +1,28 @@
|
||||
#!/usr/bin/env php
|
||||
<?php
|
||||
|
||||
return (new PhpCsFixer\Config())
|
||||
->setRiskyAllowed(true)
|
||||
->setRules([
|
||||
'@PhpCsFixer' => true,
|
||||
'@PhpCsFixer:risky' => true,
|
||||
'@PSR12' => true,
|
||||
'@PSR12:risky' => true,
|
||||
'braces' => [
|
||||
'allow_single_line_closure' => true,
|
||||
],
|
||||
'blank_line_before_statement' => [
|
||||
'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try'],
|
||||
],
|
||||
'declare_strict_types' => true,
|
||||
'global_namespace_import' => ['import_classes' => false, 'import_constants' => false, 'import_functions' => false],
|
||||
'php_unit_internal_class' => false,
|
||||
'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'],
|
||||
'php_unit_test_class_requires_covers' => false,
|
||||
'phpdoc_to_comment' => false,
|
||||
'yoda_style' => true,
|
||||
])
|
||||
->setFinder(
|
||||
PhpCsFixer\Finder::create()
|
||||
->in(__DIR__)
|
||||
);
|
20
Makefile
20
Makefile
@@ -1,20 +0,0 @@
|
||||
|
||||
all: install composer-validate php-cs-fixer psalm phpstan phpunit
|
||||
|
||||
install:
|
||||
composer install
|
||||
|
||||
psalm:
|
||||
php vendor/bin/psalm
|
||||
|
||||
phpstan:
|
||||
php vendor/bin/phpstan analyse
|
||||
|
||||
phpunit:
|
||||
php vendor/bin/phpunit
|
||||
|
||||
php-cs-fixer:
|
||||
php vendor/bin/php-cs-fixer fix
|
||||
|
||||
composer-validate:
|
||||
composer validate
|
33
README.md
33
README.md
@@ -1,12 +1,11 @@
|
||||
# Nsq PHP
|
||||
|
||||
<img src="https://github.com/nsqphp/nsqphp/raw/master/docs/logo.png" alt="" align="left" width="150">
|
||||
<img src="https://github.com/nsqphp/nsqphp/raw/main/docs/logo.png" alt="" align="left" width="150">
|
||||
|
||||
PHP Client for [NSQ](https://nsq.io/).
|
||||
|
||||
[](//packagist.org/packages/nsq/nsq) [](//packagist.org/packages/nsq/nsq) [](//packagist.org/packages/nsq/nsq)
|
||||
[](https://codecov.io/gh/nsqphp/nsqphp) [](https://dashboard.stryker-mutator.io/reports/github.com/nsqphp/nsqphp/master) [](http://t.me/grachevko)
|
||||
|
||||
[](https://codecov.io/gh/nsqphp/nsqphp) [](https://dashboard.stryker-mutator.io/reports/github.com/nsqphp/nsqphp/main)
|
||||
|
||||
This library follow [SemVer](https://semver.org/). Until version 1.0 will be released anything MAY change at any time, public API SHOULD NOT be considered stable. If you want use it before stable version was released install strict version without range.
|
||||
|
||||
@@ -32,10 +31,10 @@ Features
|
||||
- [x] PUB
|
||||
- [x] SUB
|
||||
- [X] Feature Negotiation
|
||||
- [X] Discovery
|
||||
- [ ] Discovery
|
||||
- [ ] Backoff
|
||||
- [X] TLS
|
||||
- [X] Deflate
|
||||
- [ ] TLS
|
||||
- [ ] Deflate
|
||||
- [X] Snappy
|
||||
- [X] Sampling
|
||||
- [X] AUTH
|
||||
@@ -81,28 +80,6 @@ $consumer = Consumer::create(
|
||||
);
|
||||
```
|
||||
|
||||
### Lookup
|
||||
|
||||
```php
|
||||
use Nsq\Lookup;
|
||||
use Nsq\Message;
|
||||
|
||||
$lookup = new Lookup('http://nsqlookupd0:4161');
|
||||
$lookup = new Lookup(['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161']);
|
||||
|
||||
$callable = static function (Message $message): Generator {
|
||||
yield $message->touch(); // Reset the timeout for an in-flight message
|
||||
yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
|
||||
yield $message->finish(); // Finish a message (indicate successful processing)
|
||||
};
|
||||
|
||||
$lookup->subscribe(topic: 'topic', channel: 'channel', onMessage: $callable);
|
||||
$lookup->subscribe(topic: 'anotherTopic', channel: 'channel', onMessage: $callable);
|
||||
|
||||
$lookup->unsubscribe(topic: 'local', channel: 'channel');
|
||||
$lookup->stop(); // unsubscribe all
|
||||
```
|
||||
|
||||
### Integrations
|
||||
|
||||
- [Symfony](https://github.com/nsqphp/NsqBundle)
|
||||
|
0
bin/.gitkeep
Normal file
0
bin/.gitkeep
Normal file
@@ -11,49 +11,34 @@
|
||||
}
|
||||
],
|
||||
"require": {
|
||||
"php": "^8.1",
|
||||
"php": "^8.0.1",
|
||||
"ext-json": "*",
|
||||
"amphp/http-client": "^4.6",
|
||||
"amphp/socket": "^1.1",
|
||||
"composer/semver": "^3.2",
|
||||
"phpinnacle/buffer": "^1.2",
|
||||
"psr/log": "^3.0"
|
||||
"psr/log": "^1.1"
|
||||
},
|
||||
"require-dev": {
|
||||
"amphp/log": "^1.1",
|
||||
"dg/bypass-finals": "^1.3",
|
||||
"ergebnis/composer-normalize": "^2.15",
|
||||
"friendsofphp/php-cs-fixer": "^3.4",
|
||||
"ergebnis/composer-normalize": "9999999-dev",
|
||||
"friendsofphp/php-cs-fixer": "^2.18",
|
||||
"infection/infection": "^0.20.2",
|
||||
"nyholm/nsa": "^1.2",
|
||||
"phpstan/phpstan": "^1.8",
|
||||
"phpstan/phpstan-phpunit": "^1.1",
|
||||
"phpstan/phpstan-strict-rules": "^1.3",
|
||||
"phpstan/phpstan": "^0.12.68",
|
||||
"phpstan/phpstan-phpunit": "^0.12.17",
|
||||
"phpstan/phpstan-strict-rules": "^0.12.9",
|
||||
"phpunit/phpunit": "^9.5",
|
||||
"symfony/filesystem": "^6.1",
|
||||
"symfony/process": "^6.1",
|
||||
"symfony/var-dumper": "^6.1",
|
||||
"vimeo/psalm": "^4.4"
|
||||
},
|
||||
"config": {
|
||||
"sort-packages": true,
|
||||
"allow-plugins": {
|
||||
"ergebnis/composer-normalize": true,
|
||||
"infection/extension-installer": true
|
||||
}
|
||||
"sort-packages": true
|
||||
},
|
||||
"autoload": {
|
||||
"psr-4": {
|
||||
"Nsq\\": "src/"
|
||||
}
|
||||
},
|
||||
"autoload-dev": {
|
||||
"classmap": [
|
||||
"tests/"
|
||||
],
|
||||
"files": [
|
||||
"vendor/symfony/var-dumper/Resources/functions/dump.php"
|
||||
]
|
||||
},
|
||||
"minimum-stability": "dev",
|
||||
"prefer-stable": true,
|
||||
"scripts": {
|
||||
|
@@ -2,56 +2,23 @@ version: '3.7'
|
||||
|
||||
services:
|
||||
nsqd:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
image: nsqio/nsq:v1.2.0
|
||||
labels:
|
||||
ru.grachevko.dhu: 'nsqd'
|
||||
command: >-
|
||||
nsqd
|
||||
--log-level debug
|
||||
--lookupd-tcp-address nsqlookupd0:4160
|
||||
--lookupd-tcp-address nsqlookupd1:4160
|
||||
--lookupd-tcp-address nsqlookupd2:4160
|
||||
|
||||
nsqlookupd0:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
labels:
|
||||
ru.grachevko.dhu: 'nsqlookupd0'
|
||||
command: /nsqlookupd -log-level debug
|
||||
|
||||
nsqlookupd1:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
labels:
|
||||
ru.grachevko.dhu: 'nsqlookupd1'
|
||||
command: /nsqlookupd -log-level debug
|
||||
|
||||
nsqlookupd2:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
labels:
|
||||
ru.grachevko.dhu: 'nsqlookupd2'
|
||||
command: /nsqlookupd -log-level debug
|
||||
command: /nsqd -log-level debug
|
||||
# command: /nsqd
|
||||
ports:
|
||||
- 4150:4150
|
||||
- 4151:4151
|
||||
|
||||
nsqadmin:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
image: nsqio/nsq:v1.2.0
|
||||
labels:
|
||||
ru.grachevko.dhu: 'nsqadmin'
|
||||
command:
|
||||
- nsqadmin
|
||||
- --http-address=0.0.0.0:4171
|
||||
- --lookupd-http-address=nsqlookupd0:4161
|
||||
- --lookupd-http-address=nsqlookupd1:4161
|
||||
- --lookupd-http-address=nsqlookupd2:4161
|
||||
depends_on:
|
||||
- nsqlookupd0
|
||||
- nsqlookupd1
|
||||
- nsqlookupd2
|
||||
command: /nsqadmin --nsqd-http-address=nsqd:4151 --http-address=0.0.0.0:4171
|
||||
ports:
|
||||
- 4171:4171
|
||||
|
||||
tail:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
command: >-
|
||||
nsq_tail
|
||||
--channel nsq_tail
|
||||
--topic local
|
||||
--lookupd-http-address nsqlookupd1:4161
|
||||
depends_on:
|
||||
- nsqd
|
||||
- nsqlookupd1
|
||||
image: nsqio/nsq:v1.2.0
|
||||
command: nsq_tail -channel nsq_tail -topic local -nsqd-tcp-address nsqd:4150
|
||||
|
@@ -14,7 +14,6 @@ use Monolog\Processor\PsrLogMessageProcessor;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Consumer;
|
||||
use Nsq\Message;
|
||||
|
||||
use function Amp\call;
|
||||
|
||||
Loop::run(static function () {
|
@@ -1,47 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
require dirname(__DIR__).'/../vendor/autoload.php';
|
||||
|
||||
use Amp\ByteStream;
|
||||
use Amp\Log\ConsoleFormatter;
|
||||
use Amp\Log\StreamHandler;
|
||||
use Amp\Loop;
|
||||
use Monolog\Logger;
|
||||
use Monolog\Processor\PsrLogMessageProcessor;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Config\LookupConfig;
|
||||
use Nsq\Lookup;
|
||||
use Nsq\Message;
|
||||
|
||||
Loop::run(static function () {
|
||||
$handler = new StreamHandler(ByteStream\getStdout());
|
||||
$handler->setFormatter(new ConsoleFormatter());
|
||||
$logger = new Logger('consumer', [$handler], [new PsrLogMessageProcessor()]);
|
||||
|
||||
$callable = static function (Message $message) {
|
||||
yield $message->finish();
|
||||
};
|
||||
|
||||
$clientConfig = new ClientConfig();
|
||||
|
||||
$lookupConfig = new LookupConfig();
|
||||
|
||||
$watcherId = Loop::repeat(5000, function () {
|
||||
yield Amp\Dns\resolver()->reloadConfig();
|
||||
});
|
||||
|
||||
$lookup = Lookup::create(
|
||||
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
|
||||
$lookupConfig,
|
||||
$logger,
|
||||
);
|
||||
|
||||
$lookup->subscribe('local', 'local', $callable, $clientConfig);
|
||||
|
||||
Loop::delay(10000, function () use ($lookup, $watcherId) {
|
||||
$lookup->stop();
|
||||
Loop::cancel($watcherId);
|
||||
});
|
||||
});
|
@@ -1,102 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
require dirname(__DIR__).'/../vendor/autoload.php';
|
||||
|
||||
use Amp\ByteStream;
|
||||
use Amp\Log\ConsoleFormatter;
|
||||
use Amp\Log\StreamHandler;
|
||||
use Amp\Loop;
|
||||
use Monolog\Logger;
|
||||
use Monolog\Processor\PsrLogMessageProcessor;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Config\LookupConfig;
|
||||
use Nsq\Lookup;
|
||||
use Nsq\Producer;
|
||||
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\delay;
|
||||
|
||||
Loop::run(static function () {
|
||||
$handler = new StreamHandler(ByteStream\getStdout());
|
||||
$handler->setFormatter(new ConsoleFormatter());
|
||||
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
|
||||
|
||||
$clientConfig = new ClientConfig();
|
||||
|
||||
/** @var Producer[] $producers */
|
||||
$producers = [];
|
||||
|
||||
$lookupConfig = new LookupConfig();
|
||||
|
||||
$lookup = Lookup::create(
|
||||
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
|
||||
$lookupConfig,
|
||||
$logger,
|
||||
);
|
||||
|
||||
$isRunning = true;
|
||||
|
||||
asyncCall(static function () use ($lookup, $clientConfig, $logger, &$producers, &$isRunning) {
|
||||
while ($isRunning) {
|
||||
/** @var Lookup\Producer[] $nodes */
|
||||
$nodes = yield $lookup->nodes();
|
||||
|
||||
foreach ($nodes as $node) {
|
||||
$address = $node->toTcpUri();
|
||||
|
||||
if (array_key_exists($address, $producers)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
asyncCall(function () use ($address, $clientConfig, $logger, &$producers) {
|
||||
yield ($producers[$address] = Producer::create($address, $clientConfig, $logger))
|
||||
->onClose(function () use (&$producers, $address) {
|
||||
unset($producers[$address]);
|
||||
})
|
||||
->connect()
|
||||
;
|
||||
});
|
||||
}
|
||||
|
||||
yield delay(5000);
|
||||
|
||||
yield Amp\Dns\resolver()->reloadConfig(); // for reload /etc/hosts
|
||||
}
|
||||
});
|
||||
|
||||
Loop::delay(5000, function () use (&$isRunning, $logger) {
|
||||
$logger->info('Stopping producer.');
|
||||
|
||||
$isRunning = false;
|
||||
});
|
||||
|
||||
$counter = 0;
|
||||
while (true) {
|
||||
if (!$isRunning) {
|
||||
foreach ($producers as $producer) {
|
||||
$producer->close();
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
if ([] === $producers) {
|
||||
yield delay(200);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
$index = array_rand($producers);
|
||||
$producer = $producers[$index];
|
||||
|
||||
if (!$producer->isConnected()) {
|
||||
yield delay(100);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
yield $producer->publish('local', 'This is message of count '.$counter++);
|
||||
}
|
||||
});
|
@@ -8,9 +8,3 @@ parameters:
|
||||
paths:
|
||||
- src
|
||||
- tests
|
||||
ignoreErrors:
|
||||
-
|
||||
message: '#no value type specified in iterable type array#'
|
||||
paths:
|
||||
- %currentWorkingDirectory%/src
|
||||
- %currentWorkingDirectory%/tests
|
||||
|
@@ -1,5 +1,6 @@
|
||||
<?xml version="1.0"?>
|
||||
<psalm
|
||||
allowPhpStormGenerics="true"
|
||||
ignoreInternalFunctionFalseReturn="false"
|
||||
ignoreInternalFunctionNullReturn="false"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
|
@@ -1,3 +0,0 @@
|
||||
{
|
||||
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
|
||||
}
|
@@ -13,11 +13,8 @@ final class Buffer extends ByteBuffer
|
||||
{
|
||||
public function readUInt32LE(): int
|
||||
{
|
||||
$unpacked = unpack('V', $this->consume(4));
|
||||
|
||||
\assert(\is_array($unpacked) && \array_key_exists(1, $unpacked));
|
||||
|
||||
return $unpacked[1];
|
||||
/** @phpstan-ignore-next-line */
|
||||
return unpack('V', $this->consume(4))[1];
|
||||
}
|
||||
|
||||
public function consumeTimestamp(): int
|
||||
|
@@ -13,7 +13,7 @@ use Composer\InstalledVersions;
|
||||
*
|
||||
* @psalm-immutable
|
||||
*/
|
||||
final class ClientConfig
|
||||
final class ClientConfig implements \JsonSerializable
|
||||
{
|
||||
/**
|
||||
* @psalm-suppress ImpureFunctionCall
|
||||
@@ -26,26 +26,9 @@ final class ClientConfig
|
||||
public ?string $authSecret = null,
|
||||
|
||||
/**
|
||||
* The timeout for establishing a connection in milliseconds.
|
||||
* The timeout for establishing a connection in seconds.
|
||||
*/
|
||||
public int $connectTimeout = 10000,
|
||||
|
||||
/**
|
||||
* The max attempts for establishing a connection.
|
||||
*/
|
||||
public int $maxAttempts = 0,
|
||||
|
||||
/**
|
||||
* Use tcp_nodelay for establishing a connection.
|
||||
*/
|
||||
public bool $tcpNoDelay = false,
|
||||
public int $rdyCount = 100,
|
||||
|
||||
/**
|
||||
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
|
||||
* it will send back a JSON payload of supported features and metadata.
|
||||
*/
|
||||
public bool $featureNegotiation = true,
|
||||
public int $connectTimeout = 10,
|
||||
|
||||
/**
|
||||
* An identifier used to disambiguate this client (i.e. something specific to the consumer).
|
||||
@@ -90,6 +73,12 @@ final class ClientConfig
|
||||
*/
|
||||
public int $sampleRate = 0,
|
||||
|
||||
/**
|
||||
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
|
||||
* it will send back a JSON payload of supported features and metadata.
|
||||
*/
|
||||
public bool $featureNegotiation = true,
|
||||
|
||||
/**
|
||||
* Enable TLS for this connection.
|
||||
*/
|
||||
@@ -100,6 +89,11 @@ final class ClientConfig
|
||||
*/
|
||||
public bool $snappy = false,
|
||||
|
||||
/**
|
||||
* The read timeout for connection sockets and for awaiting responses from nsq.
|
||||
*/
|
||||
public int $readTimeout = 5,
|
||||
|
||||
/**
|
||||
* A string identifying the agent for this client in the spirit of HTTP.
|
||||
*/
|
||||
@@ -120,14 +114,12 @@ final class ClientConfig
|
||||
}
|
||||
}
|
||||
|
||||
public static function fromArray(array $array): self
|
||||
/**
|
||||
* @phpstan-ignore-next-line
|
||||
*/
|
||||
public function jsonSerialize(): array
|
||||
{
|
||||
return new self(...array_intersect_key($array, get_class_vars(self::class)));
|
||||
}
|
||||
|
||||
public function asNegotiationPayload(): string
|
||||
{
|
||||
$data = [
|
||||
return [
|
||||
'client_id' => $this->clientId,
|
||||
'deflate' => $this->deflate,
|
||||
'deflate_level' => $this->deflateLevel,
|
||||
@@ -140,7 +132,10 @@ final class ClientConfig
|
||||
'tls_v1' => $this->tls,
|
||||
'user_agent' => $this->userAgent,
|
||||
];
|
||||
}
|
||||
|
||||
return json_encode($data, JSON_THROW_ON_ERROR);
|
||||
public function toString(): string
|
||||
{
|
||||
return json_encode($this, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
|
||||
}
|
||||
}
|
||||
|
@@ -8,10 +8,8 @@ namespace Nsq\Config;
|
||||
* The configuration object that holds the config status for a single Connection.
|
||||
*
|
||||
* @psalm-immutable
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
final class ServerConfig
|
||||
final class ConnectionConfig
|
||||
{
|
||||
public function __construct(
|
||||
/**
|
||||
@@ -84,6 +82,9 @@ final class ServerConfig
|
||||
) {
|
||||
}
|
||||
|
||||
/**
|
||||
* @phpstan-ignore-next-line
|
||||
*/
|
||||
public static function fromArray(array $array): self
|
||||
{
|
||||
return new self(
|
@@ -1,13 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Config;
|
||||
|
||||
final class LookupConfig
|
||||
{
|
||||
public function __construct(
|
||||
public int $pollingInterval = 10000,
|
||||
) {
|
||||
}
|
||||
}
|
@@ -4,11 +4,9 @@ declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\ByteStream\ClosedException;
|
||||
use Amp\Failure;
|
||||
use Amp\Promise;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Config\ServerConfig;
|
||||
use Nsq\Config\ConnectionConfig;
|
||||
use Nsq\Exception\AuthenticationRequired;
|
||||
use Nsq\Exception\NsqException;
|
||||
use Nsq\Frame\Response;
|
||||
@@ -17,8 +15,6 @@ use Nsq\Stream\NullStream;
|
||||
use Nsq\Stream\SnappyStream;
|
||||
use Nsq\Stream\SocketStream;
|
||||
use Psr\Log\LoggerInterface;
|
||||
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\call;
|
||||
|
||||
/**
|
||||
@@ -26,45 +22,23 @@ use function Amp\call;
|
||||
*/
|
||||
abstract class Connection
|
||||
{
|
||||
private Stream $stream;
|
||||
|
||||
/**
|
||||
* @var callable
|
||||
*/
|
||||
private $onConnectCallback;
|
||||
|
||||
/**
|
||||
* @var callable
|
||||
*/
|
||||
private $onCloseCallback;
|
||||
protected Stream $stream;
|
||||
|
||||
public function __construct(
|
||||
/**
|
||||
* @readonly
|
||||
*/
|
||||
public string $address,
|
||||
protected ClientConfig $clientConfig,
|
||||
protected LoggerInterface $logger,
|
||||
private string $address,
|
||||
private ClientConfig $clientConfig,
|
||||
private LoggerInterface $logger,
|
||||
) {
|
||||
$this->stream = new NullStream();
|
||||
$this->onConnectCallback = static function (): void {
|
||||
};
|
||||
$this->onCloseCallback = static function (): void {
|
||||
};
|
||||
}
|
||||
|
||||
public function __destruct()
|
||||
{
|
||||
$this->close(false);
|
||||
}
|
||||
|
||||
public function isConnected(): bool
|
||||
{
|
||||
return !$this->stream instanceof NullStream;
|
||||
$this->close();
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<void>
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public function connect(): Promise
|
||||
{
|
||||
@@ -72,32 +46,16 @@ abstract class Connection
|
||||
$buffer = new Buffer();
|
||||
|
||||
/** @var SocketStream $stream */
|
||||
$stream = yield SocketStream::connect(
|
||||
$this->address,
|
||||
$this->clientConfig->connectTimeout,
|
||||
$this->clientConfig->maxAttempts,
|
||||
$this->clientConfig->tcpNoDelay,
|
||||
);
|
||||
$stream = yield SocketStream::connect($this->address);
|
||||
|
||||
yield $stream->write(Command::magic());
|
||||
yield $stream->write(Command::identify($this->clientConfig->asNegotiationPayload()));
|
||||
yield $stream->write(Command::identify($this->clientConfig->toString()));
|
||||
|
||||
/** @var Response $response */
|
||||
$response = yield $this->response($stream, $buffer);
|
||||
$serverConfig = ServerConfig::fromArray($response->toArray());
|
||||
$connectionConfig = ConnectionConfig::fromArray($response->toArray());
|
||||
|
||||
if ($serverConfig->tls) {
|
||||
yield $stream->setupTls();
|
||||
|
||||
/** @var Response $response */
|
||||
$response = yield $this->response($stream, $buffer);
|
||||
|
||||
if (!$response->isOk()) {
|
||||
throw new NsqException();
|
||||
}
|
||||
}
|
||||
|
||||
if ($serverConfig->snappy) {
|
||||
if ($connectionConfig->snappy) {
|
||||
$stream = new SnappyStream($stream, $buffer->flush());
|
||||
|
||||
/** @var Response $response */
|
||||
@@ -108,8 +66,8 @@ abstract class Connection
|
||||
}
|
||||
}
|
||||
|
||||
if ($serverConfig->deflate) {
|
||||
$stream = new GzipStream($stream, $serverConfig->deflateLevel, $buffer->flush());
|
||||
if ($connectionConfig->deflate) {
|
||||
$stream = new GzipStream($stream);
|
||||
|
||||
/** @var Response $response */
|
||||
$response = yield $this->response($stream, $buffer);
|
||||
@@ -119,7 +77,7 @@ abstract class Connection
|
||||
}
|
||||
}
|
||||
|
||||
if ($serverConfig->authRequired) {
|
||||
if ($connectionConfig->authRequired) {
|
||||
if (null === $this->clientConfig->authSecret) {
|
||||
throw new AuthenticationRequired();
|
||||
}
|
||||
@@ -133,103 +91,15 @@ abstract class Connection
|
||||
}
|
||||
|
||||
$this->stream = $stream;
|
||||
|
||||
($this->onConnectCallback)();
|
||||
});
|
||||
}
|
||||
|
||||
public function close(bool $graceful = true): void
|
||||
public function close(): void
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return;
|
||||
}
|
||||
// $this->stream->write(Command::cls());
|
||||
|
||||
$logger = $this->logger;
|
||||
[$stream, $this->stream] = [$this->stream, new NullStream()];
|
||||
|
||||
if ($graceful) {
|
||||
$this->logger->debug('Graceful disconnect.', [
|
||||
'class' => static::class,
|
||||
'address' => $this->address,
|
||||
]);
|
||||
|
||||
asyncCall(static function () use ($stream, $logger): \Generator {
|
||||
try {
|
||||
yield $stream->write(Command::cls());
|
||||
} catch (\Throwable $e) {
|
||||
$logger->warning($e->getMessage(), ['exception' => $e]);
|
||||
}
|
||||
|
||||
$stream->close();
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
$stream->close();
|
||||
} catch (ClosedException) {
|
||||
}
|
||||
|
||||
($this->onCloseCallback)();
|
||||
}
|
||||
|
||||
public function onConnect(callable $callback): static
|
||||
{
|
||||
$previous = $this->onConnectCallback;
|
||||
$this->onConnectCallback = static function () use ($previous, $callback): void {
|
||||
$previous();
|
||||
$callback();
|
||||
};
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function onClose(callable $callback): static
|
||||
{
|
||||
$previous = $this->onCloseCallback;
|
||||
$this->onCloseCallback = static function () use ($previous, $callback): void {
|
||||
$previous();
|
||||
$callback();
|
||||
};
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<null|string>
|
||||
*/
|
||||
protected function read(): Promise
|
||||
{
|
||||
return call(function (): \Generator {
|
||||
try {
|
||||
return yield $this->stream->read();
|
||||
} catch (\Throwable $e) {
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
$this->close(false);
|
||||
|
||||
return new Failure($e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<void>
|
||||
*/
|
||||
protected function write(string $data): Promise
|
||||
{
|
||||
return call(function () use ($data): \Generator {
|
||||
try {
|
||||
return yield $this->stream->write($data);
|
||||
} catch (\Throwable $e) {
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
$this->close(false);
|
||||
|
||||
return new Failure($e);
|
||||
}
|
||||
});
|
||||
$this->stream->close();
|
||||
$this->stream = new NullStream();
|
||||
}
|
||||
|
||||
protected function handleError(Frame\Error $error): void
|
||||
@@ -244,7 +114,7 @@ abstract class Connection
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<Frame\Response>
|
||||
* @return Promise<Frame\Response>
|
||||
*/
|
||||
private function response(Stream $stream, Buffer $buffer): Promise
|
||||
{
|
||||
|
125
src/Consumer.php
125
src/Consumer.php
@@ -6,19 +6,15 @@ namespace Nsq;
|
||||
|
||||
use Amp\Failure;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Exception\ConsumerException;
|
||||
use Nsq\Frame\Response;
|
||||
use Nsq\Stream\NullStream;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\NullLogger;
|
||||
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\call;
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
final class Consumer extends Connection
|
||||
{
|
||||
private int $rdy = 0;
|
||||
@@ -29,35 +25,20 @@ final class Consumer extends Connection
|
||||
private $onMessage;
|
||||
|
||||
public function __construct(
|
||||
string $address,
|
||||
/**
|
||||
* @readonly
|
||||
*/
|
||||
public string $topic,
|
||||
/**
|
||||
* @readonly
|
||||
*/
|
||||
public string $channel,
|
||||
private string $address,
|
||||
private string $topic,
|
||||
private string $channel,
|
||||
callable $onMessage,
|
||||
ClientConfig $clientConfig,
|
||||
LoggerInterface $logger,
|
||||
private LoggerInterface $logger,
|
||||
) {
|
||||
parent::__construct(
|
||||
$address,
|
||||
$this->address,
|
||||
$clientConfig,
|
||||
$logger,
|
||||
$this->logger,
|
||||
);
|
||||
|
||||
$this->onMessage = $onMessage;
|
||||
|
||||
$context = compact('address', 'topic', 'channel');
|
||||
$this->onConnect(function () use ($context): void {
|
||||
$this->logger->debug('Consumer connected.', $context);
|
||||
});
|
||||
$this->onClose(function () use ($context): void {
|
||||
$this->logger->debug('Consumer disconnected.', $context);
|
||||
});
|
||||
$this->logger->debug('Consumer created.', $context);
|
||||
}
|
||||
|
||||
public static function create(
|
||||
@@ -80,7 +61,7 @@ final class Consumer extends Connection
|
||||
|
||||
public function connect(): Promise
|
||||
{
|
||||
if ($this->isConnected()) {
|
||||
if (!$this->stream instanceof NullStream) {
|
||||
return call(static function (): void {
|
||||
});
|
||||
}
|
||||
@@ -88,12 +69,18 @@ final class Consumer extends Connection
|
||||
return call(function (): \Generator {
|
||||
yield parent::connect();
|
||||
|
||||
$this->run();
|
||||
});
|
||||
}
|
||||
|
||||
private function run(): void
|
||||
{
|
||||
$buffer = new Buffer();
|
||||
|
||||
asyncCall(function () use ($buffer): \Generator {
|
||||
yield $this->write(Command::sub($this->topic, $this->channel));
|
||||
yield $this->stream->write(Command::sub($this->topic, $this->channel));
|
||||
|
||||
if (null !== ($chunk = yield $this->read())) {
|
||||
if (null !== ($chunk = yield $this->stream->read())) {
|
||||
$buffer->append($chunk);
|
||||
}
|
||||
|
||||
@@ -104,18 +91,18 @@ final class Consumer extends Connection
|
||||
return new Failure(new ConsumerException('Fail subscription.'));
|
||||
}
|
||||
|
||||
yield $this->rdy(1);
|
||||
yield $this->rdy(2500);
|
||||
|
||||
/** @phpstan-ignore-next-line */
|
||||
asyncCall(function () use ($buffer): \Generator {
|
||||
while (null !== $chunk = yield $this->read()) {
|
||||
while (null !== $chunk = yield $this->stream->read()) {
|
||||
$buffer->append($chunk);
|
||||
|
||||
while ($frame = Parser::parse($buffer)) {
|
||||
switch (true) {
|
||||
case $frame instanceof Frame\Response:
|
||||
if ($frame->isHeartBeat()) {
|
||||
yield $this->write(Command::nop());
|
||||
yield $this->stream->write(Command::nop());
|
||||
|
||||
break;
|
||||
}
|
||||
@@ -130,15 +117,10 @@ final class Consumer extends Connection
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
if ($this->rdy !== $this->clientConfig->rdyCount) {
|
||||
yield $this->rdy($this->clientConfig->rdyCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$this->close(false);
|
||||
});
|
||||
$this->stream = new NullStream();
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -146,53 +128,32 @@ final class Consumer extends Connection
|
||||
/**
|
||||
* Update RDY state (indicate you are ready to receive N messages).
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public function rdy(int $count): Promise
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return new Success(false);
|
||||
}
|
||||
|
||||
if ($this->rdy === $count) {
|
||||
return new Success(true);
|
||||
return call(static function (): void {
|
||||
});
|
||||
}
|
||||
|
||||
$this->rdy = $count;
|
||||
|
||||
return call(function () use ($count): \Generator {
|
||||
try {
|
||||
yield $this->write(Command::rdy($count));
|
||||
|
||||
return true;
|
||||
} catch (\Throwable) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
return $this->stream->write(Command::rdy($count));
|
||||
}
|
||||
|
||||
/**
|
||||
* Finish a message (indicate successful processing).
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
* @return Promise<void>
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
public function fin(string $id): Promise
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return new Success(false);
|
||||
}
|
||||
--$this->rdy;
|
||||
|
||||
return call(function () use ($id): \Generator {
|
||||
try {
|
||||
yield $this->write(Command::fin($id));
|
||||
|
||||
return true;
|
||||
} catch (\Throwable) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
return $this->stream->write(Command::fin($id));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -201,48 +162,26 @@ final class Consumer extends Connection
|
||||
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
|
||||
* behaves identically to an explicit REQ.
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
* @return Promise<void>
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
public function req(string $id, int $timeout): Promise
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return new Success(false);
|
||||
}
|
||||
--$this->rdy;
|
||||
|
||||
return call(function () use ($id, $timeout): \Generator {
|
||||
try {
|
||||
yield $this->write(Command::req($id, $timeout));
|
||||
|
||||
return true;
|
||||
} catch (\Throwable) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
return $this->stream->write(Command::req($id, $timeout));
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the timeout for an in-flight message.
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
* @return Promise<void>
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
public function touch(string $id): Promise
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return new Success(false);
|
||||
}
|
||||
|
||||
return call(function () use ($id): \Generator {
|
||||
try {
|
||||
yield $this->write(Command::touch($id));
|
||||
|
||||
return true;
|
||||
} catch (\Throwable) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
return $this->stream->write(Command::touch($id));
|
||||
}
|
||||
}
|
||||
|
@@ -1,18 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
use Psr\Log\LogLevel;
|
||||
|
||||
final class LookupException extends NsqException
|
||||
{
|
||||
public function level(): string
|
||||
{
|
||||
return match ($this->getMessage()) {
|
||||
'TOPIC_NOT_FOUND' => LogLevel::DEBUG,
|
||||
default => LogLevel::WARNING,
|
||||
};
|
||||
}
|
||||
}
|
@@ -1,9 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
final class StreamException extends NsqException
|
||||
{
|
||||
}
|
@@ -6,9 +6,10 @@ namespace Nsq;
|
||||
|
||||
abstract class Frame
|
||||
{
|
||||
public const TYPE_RESPONSE = 0;
|
||||
public const TYPE_ERROR = 1;
|
||||
public const TYPE_MESSAGE = 2;
|
||||
public const TYPE_RESPONSE = 0,
|
||||
TYPE_ERROR = 1,
|
||||
TYPE_MESSAGE = 2
|
||||
;
|
||||
|
||||
public function __construct(public int $type)
|
||||
{
|
||||
|
@@ -30,7 +30,7 @@ final class Response extends Frame
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return array<mixed, mixed>
|
||||
* @return array<mixed, mixed>
|
||||
*/
|
||||
public function toArray(): array
|
||||
{
|
||||
|
274
src/Lookup.php
274
src/Lookup.php
@@ -1,274 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\Deferred;
|
||||
use Amp\Dns\DnsException;
|
||||
use Amp\Http\Client\DelegateHttpClient;
|
||||
use Amp\Http\Client\HttpClientBuilder;
|
||||
use Amp\Http\Client\Request;
|
||||
use Amp\Http\Client\Response;
|
||||
use Amp\NullCancellationToken;
|
||||
use Amp\Promise;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Config\LookupConfig;
|
||||
use Nsq\Exception\LookupException;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\NullLogger;
|
||||
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\call;
|
||||
use function Amp\delay;
|
||||
|
||||
final class Lookup
|
||||
{
|
||||
/**
|
||||
* @psalm-var array<string, array<string, Lookup\Producer>>
|
||||
*/
|
||||
private array $producers = [];
|
||||
|
||||
private array $consumers = [];
|
||||
|
||||
private array $running = [];
|
||||
|
||||
private array $topicWatchers = [];
|
||||
|
||||
public function __construct(
|
||||
private array $addresses,
|
||||
private LookupConfig $config,
|
||||
private LoggerInterface $logger,
|
||||
private DelegateHttpClient $httpClient,
|
||||
) {
|
||||
}
|
||||
|
||||
public static function create(
|
||||
string | array $address,
|
||||
LookupConfig $config = null,
|
||||
LoggerInterface $logger = null,
|
||||
DelegateHttpClient $httpClient = null,
|
||||
): self {
|
||||
return new self(
|
||||
(array) $address,
|
||||
$config ?? new LookupConfig(),
|
||||
$logger ?? new NullLogger(),
|
||||
$httpClient ?? HttpClientBuilder::buildDefault(),
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<Lookup\Producer[]>
|
||||
*/
|
||||
public function nodes(): Promise
|
||||
{
|
||||
return call(function (): \Generator {
|
||||
$requestHandler = function (string $uri): \Generator {
|
||||
/** @var Response $response */
|
||||
$response = yield $this->httpClient->request(new Request($uri.'/nodes'), new NullCancellationToken());
|
||||
|
||||
try {
|
||||
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
|
||||
} catch (LookupException $e) {
|
||||
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
|
||||
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
$promises = [];
|
||||
foreach ($this->addresses as $address) {
|
||||
$promises[$address] = call($requestHandler, $address);
|
||||
}
|
||||
|
||||
$nodes = [];
|
||||
/** @var Lookup\Response $response */
|
||||
foreach (yield $promises as $response) {
|
||||
foreach ($response->producers as $producer) {
|
||||
$nodes[$producer->toTcpUri()] = $producer;
|
||||
}
|
||||
}
|
||||
|
||||
return array_values($nodes);
|
||||
});
|
||||
}
|
||||
|
||||
public function stop(): void
|
||||
{
|
||||
$this->producers = [];
|
||||
$this->consumers = [];
|
||||
$this->running = [];
|
||||
$this->topicWatchers = [];
|
||||
|
||||
$this->logger->info('Lookup stopped.');
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-suppress InvalidPropertyAssignmentValue
|
||||
*/
|
||||
public function subscribe(string $topic, string $channel, callable $onMessage, ClientConfig $config = null): void
|
||||
{
|
||||
if (null !== ($this->running[$topic][$channel] ?? null)) {
|
||||
throw new \InvalidArgumentException('Subscription already exists.');
|
||||
}
|
||||
|
||||
$this->running[$topic][$channel] = true;
|
||||
|
||||
asyncCall(function () use ($topic, $channel, $onMessage, $config): \Generator {
|
||||
while (true) {
|
||||
if (null === ($this->running[$topic][$channel] ?? null)) {
|
||||
return;
|
||||
}
|
||||
|
||||
/** @phpstan-ignore-next-line */
|
||||
$producers = $this->producers[$topic] ??= new Deferred();
|
||||
|
||||
if ($producers instanceof Deferred) {
|
||||
/** @var array<string, Lookup\Producer> $producers */
|
||||
$producers = yield $producers->promise();
|
||||
}
|
||||
|
||||
foreach (array_diff_key($this->consumers, $producers) as $address => $producer) {
|
||||
unset($this->consumers[$address]);
|
||||
}
|
||||
|
||||
foreach ($producers as $address => $producer) {
|
||||
if (null !== ($this->consumers[$address][$topic][$channel] ?? null)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$this->keepConnection(
|
||||
Consumer::create(
|
||||
$address,
|
||||
$topic,
|
||||
$channel,
|
||||
$onMessage,
|
||||
$config,
|
||||
$this->logger,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
yield delay($this->config->pollingInterval);
|
||||
}
|
||||
});
|
||||
|
||||
$this->watch($topic);
|
||||
|
||||
$this->logger->info('Subscribed.', compact('topic', 'channel'));
|
||||
}
|
||||
|
||||
public function unsubscribe(string $topic, string $channel): void
|
||||
{
|
||||
if (null === ($this->running[$topic][$channel] ?? null)) {
|
||||
$this->logger->debug('Not subscribed.', compact('topic', 'channel'));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
unset($this->running[$topic][$channel]);
|
||||
|
||||
if ([] === $this->running[$topic]) {
|
||||
unset($this->running[$topic]);
|
||||
}
|
||||
|
||||
$this->logger->info('Unsubscribed.', compact('topic', 'channel'));
|
||||
}
|
||||
|
||||
private function keepConnection(Consumer $consumer): void
|
||||
{
|
||||
$this->consumers[$consumer->address][$consumer->topic][$consumer->channel] = $consumer;
|
||||
|
||||
asyncCall(function () use ($consumer): \Generator {
|
||||
while (null !== ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
|
||||
try {
|
||||
yield $consumer->connect();
|
||||
} catch (DnsException $e) {
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
unset(
|
||||
$this->consumers[$consumer->address],
|
||||
$this->producers[$consumer->topic][$consumer->address],
|
||||
);
|
||||
|
||||
return;
|
||||
} catch (\Throwable $e) {
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
yield delay($this->config->pollingInterval);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
/** @phpstan-ignore-next-line */
|
||||
if (null === ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
|
||||
$consumer->close();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (!$consumer->isConnected()) {
|
||||
break;
|
||||
}
|
||||
|
||||
yield delay(500);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private function watch(string $topic): void
|
||||
{
|
||||
if (\array_key_exists($topic, $this->topicWatchers)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->topicWatchers[$topic] = true;
|
||||
|
||||
asyncCall(function () use ($topic): \Generator {
|
||||
$cancellationToken = new NullCancellationToken();
|
||||
$requestHandler = function (string $uri) use ($topic, $cancellationToken): \Generator {
|
||||
$this->logger->debug('Lookup', compact('topic'));
|
||||
|
||||
/** @var Response $response */
|
||||
$response = yield $this->httpClient->request(new Request($uri.'/lookup?topic='.$topic), $cancellationToken);
|
||||
|
||||
try {
|
||||
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
|
||||
} catch (LookupException $e) {
|
||||
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
|
||||
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
while (\array_key_exists($topic, $this->running)) {
|
||||
$promises = [];
|
||||
foreach ($this->addresses as $address) {
|
||||
$promises[$address] = call($requestHandler, $address);
|
||||
}
|
||||
|
||||
/** @var Lookup\Response[] $responses */
|
||||
$responses = yield $promises;
|
||||
|
||||
$producers = [];
|
||||
foreach ($responses as $response) {
|
||||
foreach ($response->producers as $producer) {
|
||||
$producers[$producer->toTcpUri()] = $producer;
|
||||
}
|
||||
}
|
||||
|
||||
/** @phpstan-ignore-next-line */
|
||||
if (($deferred = ($this->producers[$topic] ?? null)) instanceof Deferred) {
|
||||
$deferred->resolve($producers);
|
||||
}
|
||||
$this->producers[$topic] = $producers;
|
||||
|
||||
yield delay($this->config->pollingInterval);
|
||||
}
|
||||
|
||||
unset($this->topicWatchers[$topic]);
|
||||
});
|
||||
}
|
||||
}
|
@@ -1,39 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Lookup;
|
||||
|
||||
final class Producer
|
||||
{
|
||||
public function __construct(
|
||||
public string $broadcastAddress,
|
||||
public string $hostname,
|
||||
public string $remoteAddress,
|
||||
public int $tcpPort,
|
||||
public int $httpPort,
|
||||
public string $version,
|
||||
public array $tombstones,
|
||||
public array $topics,
|
||||
) {
|
||||
}
|
||||
|
||||
public static function fromArray(array $array): self
|
||||
{
|
||||
return new self(
|
||||
$array['broadcast_address'],
|
||||
$array['hostname'],
|
||||
$array['remote_address'],
|
||||
$array['tcp_port'],
|
||||
$array['http_port'],
|
||||
$array['version'],
|
||||
$array['tombstones'] ?? [],
|
||||
$array['topics'] ?? [],
|
||||
);
|
||||
}
|
||||
|
||||
public function toTcpUri(): string
|
||||
{
|
||||
return sprintf('%s:%s', $this->broadcastAddress, $this->tcpPort);
|
||||
}
|
||||
}
|
@@ -1,34 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Lookup;
|
||||
|
||||
use Nsq\Exception\LookupException;
|
||||
|
||||
final class Response
|
||||
{
|
||||
/**
|
||||
* @param string[] $channels
|
||||
* @param Producer[] $producers
|
||||
*/
|
||||
public function __construct(
|
||||
public array $channels,
|
||||
public array $producers,
|
||||
) {
|
||||
}
|
||||
|
||||
public static function fromJson(string $json): self
|
||||
{
|
||||
$array = json_decode($json, true, flags: JSON_THROW_ON_ERROR);
|
||||
|
||||
if (\array_key_exists('message', $array)) {
|
||||
throw new LookupException($array['message']);
|
||||
}
|
||||
|
||||
return new self(
|
||||
$array['channels'] ?? [],
|
||||
array_map([Producer::class, 'fromArray'], $array['producers']),
|
||||
);
|
||||
}
|
||||
}
|
@@ -6,6 +6,7 @@ namespace Nsq;
|
||||
|
||||
use Amp\Promise;
|
||||
use Nsq\Exception\MessageException;
|
||||
use function Amp\call;
|
||||
|
||||
final class Message
|
||||
{
|
||||
@@ -31,51 +32,51 @@ final class Message
|
||||
);
|
||||
}
|
||||
|
||||
public function isProcessed(): bool
|
||||
{
|
||||
return $this->processed;
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<bool>
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public function finish(): Promise
|
||||
{
|
||||
$this->markAsProcessedOrFail();
|
||||
return call(function (): \Generator {
|
||||
if ($this->processed) {
|
||||
throw MessageException::processed($this);
|
||||
}
|
||||
|
||||
return $this->consumer->fin($this->id);
|
||||
yield $this->consumer->fin($this->id);
|
||||
|
||||
$this->processed = true;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-param positive-int|0 $timeout
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public function requeue(int $timeout): Promise
|
||||
{
|
||||
$this->markAsProcessedOrFail();
|
||||
return call(function () use ($timeout): \Generator {
|
||||
if ($this->processed) {
|
||||
throw MessageException::processed($this);
|
||||
}
|
||||
|
||||
return $this->consumer->req($this->id, $timeout);
|
||||
yield $this->consumer->req($this->id, $timeout);
|
||||
|
||||
$this->processed = true;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<bool>
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public function touch(): Promise
|
||||
{
|
||||
return call(function (): \Generator {
|
||||
if ($this->processed) {
|
||||
throw MessageException::processed($this);
|
||||
}
|
||||
|
||||
return $this->consumer->touch($this->id);
|
||||
}
|
||||
|
||||
private function markAsProcessedOrFail(): void
|
||||
{
|
||||
if ($this->processed) {
|
||||
throw MessageException::processed($this);
|
||||
}
|
||||
yield $this->consumer->touch($this->id);
|
||||
|
||||
$this->processed = true;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
103
src/Producer.php
103
src/Producer.php
@@ -5,38 +5,16 @@ declare(strict_types=1);
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Exception\NsqException;
|
||||
use Nsq\Stream\NullStream;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\NullLogger;
|
||||
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\call;
|
||||
|
||||
final class Producer extends Connection
|
||||
{
|
||||
public function __construct(
|
||||
string $address,
|
||||
ClientConfig $clientConfig,
|
||||
LoggerInterface $logger,
|
||||
) {
|
||||
parent::__construct(
|
||||
$address,
|
||||
$clientConfig,
|
||||
$logger,
|
||||
);
|
||||
|
||||
$context = compact('address');
|
||||
$this->onConnect(function () use ($context): void {
|
||||
$this->logger->debug('Producer connected.', $context);
|
||||
});
|
||||
$this->onClose(function () use ($context): void {
|
||||
$this->logger->debug('Producer disconnected.', $context);
|
||||
});
|
||||
$this->logger->debug('Producer created.', $context);
|
||||
}
|
||||
|
||||
public static function create(
|
||||
string $address,
|
||||
ClientConfig $clientConfig = null,
|
||||
@@ -51,7 +29,7 @@ final class Producer extends Connection
|
||||
|
||||
public function connect(): Promise
|
||||
{
|
||||
if ($this->isConnected()) {
|
||||
if (!$this->stream instanceof NullStream) {
|
||||
return call(static function (): void {
|
||||
});
|
||||
}
|
||||
@@ -59,17 +37,48 @@ final class Producer extends Connection
|
||||
return call(function (): \Generator {
|
||||
yield parent::connect();
|
||||
|
||||
$this->run();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<int, string>|string $body
|
||||
*
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public function publish(string $topic, string | array $body, int $delay = 0): Promise
|
||||
{
|
||||
if (0 < $delay) {
|
||||
return call(
|
||||
function (array $bodies) use ($topic, $delay): \Generator {
|
||||
foreach ($bodies as $body) {
|
||||
yield $this->stream->write(Command::dpub($topic, $body, $delay));
|
||||
}
|
||||
},
|
||||
(array) $body,
|
||||
);
|
||||
}
|
||||
|
||||
$command = \is_array($body)
|
||||
? Command::mpub($topic, $body)
|
||||
: Command::pub($topic, $body);
|
||||
|
||||
return $this->stream->write($command);
|
||||
}
|
||||
|
||||
private function run(): void
|
||||
{
|
||||
$buffer = new Buffer();
|
||||
|
||||
asyncCall(function () use ($buffer): \Generator {
|
||||
while (null !== $chunk = yield $this->read()) {
|
||||
while (null !== $chunk = yield $this->stream->read()) {
|
||||
$buffer->append($chunk);
|
||||
|
||||
while ($frame = Parser::parse($buffer)) {
|
||||
switch (true) {
|
||||
case $frame instanceof Frame\Response:
|
||||
if ($frame->isHeartBeat()) {
|
||||
yield $this->write(Command::nop());
|
||||
yield $this->stream->write(Command::nop());
|
||||
}
|
||||
|
||||
// Ok received
|
||||
@@ -84,47 +93,7 @@ final class Producer extends Connection
|
||||
}
|
||||
}
|
||||
|
||||
$this->close(false);
|
||||
$this->stream = new NullStream();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<int, string>|string $body
|
||||
*
|
||||
* @psalm-param positive-int|0 $delay
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
*/
|
||||
public function publish(string $topic, string | array $body, int $delay = null): Promise
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return new Success(false);
|
||||
}
|
||||
|
||||
return call(
|
||||
function (iterable $commands): \Generator {
|
||||
try {
|
||||
foreach ($commands as $command) {
|
||||
yield $this->write($command);
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (\Throwable) {
|
||||
return false;
|
||||
}
|
||||
},
|
||||
(static function () use ($topic, $body, $delay): \Generator {
|
||||
if (\is_array($body) && null === $delay) {
|
||||
yield Command::mpub($topic, $body);
|
||||
} elseif (null !== $delay) {
|
||||
foreach ((array) $body as $content) {
|
||||
yield Command::dpub($topic, $content, $delay);
|
||||
}
|
||||
} else {
|
||||
yield Command::pub($topic, $body);
|
||||
}
|
||||
})(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@@ -4,23 +4,19 @@ declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\ByteStream\ClosedException;
|
||||
use Amp\Promise;
|
||||
|
||||
interface Stream
|
||||
{
|
||||
/**
|
||||
* @psalm-return Promise<null|string>
|
||||
* @return Promise<null|string>
|
||||
*/
|
||||
public function read(): Promise;
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<void>
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public function write(string $data): Promise;
|
||||
|
||||
/**
|
||||
* @throws ClosedException
|
||||
*/
|
||||
public function close(): void;
|
||||
}
|
||||
|
@@ -5,38 +5,14 @@ declare(strict_types=1);
|
||||
namespace Nsq\Stream;
|
||||
|
||||
use Amp\Promise;
|
||||
use Nsq\Buffer;
|
||||
use Nsq\Exception\StreamException;
|
||||
use Nsq\Exception\NsqException;
|
||||
use Nsq\Stream;
|
||||
|
||||
use function Amp\call;
|
||||
|
||||
class GzipStream implements Stream
|
||||
{
|
||||
private ?\InflateContext $inflate = null;
|
||||
|
||||
private ?\DeflateContext $deflate = null;
|
||||
|
||||
private Buffer $buffer;
|
||||
|
||||
public function __construct(private Stream $stream, private int $level, string $bytes = '')
|
||||
public function __construct(private Stream $stream)
|
||||
{
|
||||
/** @var false|\InflateContext $inflate */
|
||||
$inflate = @inflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
|
||||
/** @var \DeflateContext|false $deflate */
|
||||
$deflate = @deflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
|
||||
|
||||
if (false === $inflate) {
|
||||
throw new StreamException('Failed initializing inflate context');
|
||||
}
|
||||
|
||||
if (false === $deflate) {
|
||||
throw new StreamException('Failed initializing deflate context');
|
||||
}
|
||||
|
||||
$this->inflate = $inflate;
|
||||
$this->deflate = $deflate;
|
||||
$this->buffer = new Buffer($bytes);
|
||||
throw new NsqException('GzipStream not implemented yet.');
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -44,33 +20,7 @@ class GzipStream implements Stream
|
||||
*/
|
||||
public function read(): Promise
|
||||
{
|
||||
return call(function (): \Generator {
|
||||
if (null === $this->inflate) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if ($this->buffer->empty()) {
|
||||
$chunk = yield $this->stream->read();
|
||||
|
||||
if (null !== $chunk) {
|
||||
$this->buffer->append($chunk);
|
||||
}
|
||||
}
|
||||
|
||||
$data = $this->buffer->flush();
|
||||
|
||||
if ('' === $data) {
|
||||
return null;
|
||||
}
|
||||
/** @psalm-suppress UndefinedFunction,InvalidArgument */
|
||||
$decompressed = inflate_add($this->inflate, $data, ZLIB_SYNC_FLUSH);
|
||||
|
||||
if (false === $decompressed) {
|
||||
throw new StreamException('Failed adding data to deflate context');
|
||||
}
|
||||
|
||||
return $decompressed;
|
||||
});
|
||||
return $this->stream->read();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -78,27 +28,11 @@ class GzipStream implements Stream
|
||||
*/
|
||||
public function write(string $data): Promise
|
||||
{
|
||||
if (null === $this->deflate) {
|
||||
throw new StreamException('The stream has already been closed');
|
||||
return $this->stream->write($data);
|
||||
}
|
||||
|
||||
/** @psalm-suppress UndefinedFunction,InvalidArgument */
|
||||
$compressed = deflate_add($this->deflate, $data, ZLIB_SYNC_FLUSH);
|
||||
|
||||
if (false === $compressed) {
|
||||
throw new StreamException('Failed adding data to deflate context');
|
||||
}
|
||||
|
||||
return $this->stream->write($compressed);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public function close(): void
|
||||
{
|
||||
$this->stream->close();
|
||||
$this->inflate = null;
|
||||
$this->deflate = null;
|
||||
}
|
||||
}
|
||||
|
@@ -4,11 +4,10 @@ declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Stream;
|
||||
|
||||
use Amp\Failure;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
use Nsq\Exception\NsqException;
|
||||
use Nsq\Stream;
|
||||
use function Amp\call;
|
||||
|
||||
final class NullStream implements Stream
|
||||
{
|
||||
@@ -25,7 +24,8 @@ final class NullStream implements Stream
|
||||
*/
|
||||
public function write(string $data): Promise
|
||||
{
|
||||
return new Failure(new NsqException('Connection closed.'));
|
||||
return call(static function (): void {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@@ -8,25 +8,24 @@ use Amp\Promise;
|
||||
use Nsq\Buffer;
|
||||
use Nsq\Exception\SnappyException;
|
||||
use Nsq\Stream;
|
||||
|
||||
use function Amp\call;
|
||||
|
||||
class SnappyStream implements Stream
|
||||
{
|
||||
private const IDENTIFIER = [0xFF, 0x06, 0x00, 0x00, 0x73, 0x4E, 0x61, 0x50, 0x70, 0x59];
|
||||
private const IDENTIFIER = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
|
||||
private const SIZE_HEADER = 4;
|
||||
private const SIZE_CHECKSUM = 4;
|
||||
private const SIZE_CHUNK = 65536;
|
||||
private const TYPE_IDENTIFIER = 0xFF;
|
||||
private const TYPE_IDENTIFIER = 0xff;
|
||||
private const TYPE_COMPRESSED = 0x00;
|
||||
private const TYPE_UNCOMPRESSED = 0x01;
|
||||
private const TYPE_PADDING = 0xFE;
|
||||
private const TYPE_PADDING = 0xfe;
|
||||
|
||||
private Buffer $buffer;
|
||||
|
||||
public function __construct(private Stream $stream, string $bytes = '')
|
||||
{
|
||||
if (!\function_exists('snappy_uncompress') || !\function_exists('snappy_compress')) {
|
||||
if (!\function_exists('snappy_uncompress')) {
|
||||
throw SnappyException::notInstalled();
|
||||
}
|
||||
|
||||
@@ -46,7 +45,7 @@ class SnappyStream implements Stream
|
||||
$type = $this->buffer->readUInt32LE();
|
||||
|
||||
$size = $type >> 8;
|
||||
$type &= 0xFF;
|
||||
$type &= 0xff;
|
||||
|
||||
while ($this->buffer->size() < $size && null !== ($chunk = yield $this->stream->read())) {
|
||||
$this->buffer->append($chunk);
|
||||
@@ -60,7 +59,6 @@ class SnappyStream implements Stream
|
||||
case self::TYPE_COMPRESSED:
|
||||
$this->buffer->discard(self::SIZE_CHECKSUM);
|
||||
|
||||
/** @psalm-suppress UndefinedFunction */
|
||||
return snappy_uncompress($this->buffer->consume($size - self::SIZE_HEADER));
|
||||
case self::TYPE_UNCOMPRESSED:
|
||||
$this->buffer->discard(self::SIZE_CHECKSUM);
|
||||
@@ -80,7 +78,6 @@ class SnappyStream implements Stream
|
||||
public function write(string $data): Promise
|
||||
{
|
||||
return call(function () use ($data): Promise {
|
||||
/** @var string $result */
|
||||
$result = pack('CCCCCCCCCC', ...self::IDENTIFIER);
|
||||
|
||||
foreach (str_split($data, self::SIZE_CHUNK) as $chunk) {
|
||||
@@ -96,27 +93,23 @@ class SnappyStream implements Stream
|
||||
$this->stream->close();
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-suppress PossiblyFalseArgument
|
||||
*/
|
||||
private function compress(string $uncompressed): string
|
||||
{
|
||||
/** @psalm-suppress UndefinedFunction */
|
||||
$compressed = snappy_compress($uncompressed);
|
||||
|
||||
\assert(\is_string($compressed));
|
||||
|
||||
[$type, $data] = \strlen($compressed) <= 0.875 * \strlen($uncompressed)
|
||||
? [self::TYPE_COMPRESSED, $compressed]
|
||||
: [self::TYPE_UNCOMPRESSED, $uncompressed];
|
||||
|
||||
/** @psalm-suppress PossiblyFalseArgument */
|
||||
$unpacked = unpack('N', hash('crc32c', $uncompressed, true));
|
||||
\assert(\is_array($unpacked));
|
||||
|
||||
$checksum = $unpacked[1];
|
||||
$checksum = (($checksum >> 15) | ($checksum << 17)) + 0xA282EAD8 & 0xFFFFFFFF;
|
||||
/** @phpstan-ignore-next-line */
|
||||
$checksum = unpack('N', hash('crc32c', $uncompressed, true))[1];
|
||||
$checksum = (($checksum >> 15) | ($checksum << 17)) + 0xa282ead8 & 0xffffffff;
|
||||
|
||||
$size = (\strlen($data) + 4) << 8;
|
||||
|
||||
/** @psalm-suppress PossiblyFalseOperand */
|
||||
return pack('VV', $type + $size, $checksum).$data;
|
||||
}
|
||||
}
|
||||
|
@@ -5,22 +5,20 @@ declare(strict_types=1);
|
||||
namespace Nsq\Stream;
|
||||
|
||||
use Amp\Promise;
|
||||
use Amp\Socket\ClientTlsContext;
|
||||
use Amp\Socket\ConnectContext;
|
||||
use Amp\Socket\EncryptableSocket;
|
||||
use Amp\Socket\Socket;
|
||||
use Nsq\Stream;
|
||||
|
||||
use function Amp\call;
|
||||
use function Amp\Socket\connect;
|
||||
|
||||
class SocketStream implements Stream
|
||||
{
|
||||
public function __construct(private EncryptableSocket $socket)
|
||||
public function __construct(private Socket $socket)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<self>
|
||||
* @return Promise<self>
|
||||
*/
|
||||
public static function connect(string $uri, int $timeout = 0, int $attempts = 0, bool $noDelay = false): Promise
|
||||
{
|
||||
@@ -39,17 +37,12 @@ class SocketStream implements Stream
|
||||
$context = $context->withTcpNoDelay();
|
||||
}
|
||||
|
||||
$context = $context->withTlsContext(
|
||||
(new ClientTlsContext(''))
|
||||
->withoutPeerVerification(),
|
||||
);
|
||||
|
||||
return new self(yield connect($uri, $context));
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<null|string>
|
||||
* @return Promise<null|string>
|
||||
*/
|
||||
public function read(): Promise
|
||||
{
|
||||
@@ -57,26 +50,15 @@ class SocketStream implements Stream
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<void>
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public function write(string $data): Promise
|
||||
{
|
||||
return $this->socket->write($data);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public function close(): void
|
||||
{
|
||||
$this->socket->close();
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<void>
|
||||
*/
|
||||
public function setupTls(): Promise
|
||||
{
|
||||
return $this->socket->setupTls();
|
||||
}
|
||||
}
|
||||
|
@@ -7,11 +7,6 @@ use PHPUnit\Framework\TestCase;
|
||||
|
||||
final class ClientConfigTest extends TestCase
|
||||
{
|
||||
public function testNegotiationPayload(): void
|
||||
{
|
||||
self::assertJson((new ClientConfig())->asNegotiationPayload());
|
||||
}
|
||||
|
||||
public function testInvalidCompression(): void
|
||||
{
|
||||
$this->expectException(InvalidArgumentException::class);
|
||||
@@ -19,42 +14,4 @@ final class ClientConfigTest extends TestCase
|
||||
|
||||
new ClientConfig(deflate: true, snappy: true);
|
||||
}
|
||||
|
||||
/**
|
||||
* @dataProvider array
|
||||
*/
|
||||
public function testFromArray(array $data, array $expected): void
|
||||
{
|
||||
self::assertSame($expected, get_object_vars(ClientConfig::fromArray($data)));
|
||||
}
|
||||
|
||||
public function array(): Generator
|
||||
{
|
||||
$default = get_object_vars(new ClientConfig());
|
||||
|
||||
yield 'Empty array' => [[], $default];
|
||||
|
||||
yield 'With wrong keys' => [['bla' => 'bla'], $default];
|
||||
|
||||
$custom = [
|
||||
'authSecret' => 'SomeSecret',
|
||||
'connectTimeout' => 100,
|
||||
'maxAttempts' => 10,
|
||||
'tcpNoDelay' => true,
|
||||
'rdyCount' => 1,
|
||||
'featureNegotiation' => true,
|
||||
'clientId' => 'SomeGorgeousClientId',
|
||||
'deflate' => true,
|
||||
'deflateLevel' => 1,
|
||||
'heartbeatInterval' => 31111,
|
||||
'hostname' => gethostname(),
|
||||
'msgTimeout' => 59999,
|
||||
'sampleRate' => 25,
|
||||
'tls' => true,
|
||||
'snappy' => false,
|
||||
'userAgent' => 'nsqphp/test',
|
||||
];
|
||||
|
||||
yield 'Full filled' => [$custom, $custom];
|
||||
}
|
||||
}
|
||||
|
@@ -55,7 +55,6 @@ final class MessageTest extends TestCase
|
||||
*/
|
||||
public function messages(): Generator
|
||||
{
|
||||
/** @phpstan-ignore-next-line */
|
||||
$consumer = $this->createMock(Consumer::class);
|
||||
$consumer->method('fin')->willReturn(new Success());
|
||||
$consumer->method('touch')->willReturn(new Success());
|
||||
|
@@ -1,37 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
use Nsq\Config\ClientConfig;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
final class NsqTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @dataProvider configs
|
||||
*/
|
||||
public function test(ClientConfig $clientConfig): void
|
||||
{
|
||||
self::markTestSkipped('');
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Generator<string, array<int, ClientConfig>>
|
||||
*/
|
||||
public function configs(): Generator
|
||||
{
|
||||
yield 'default' => [
|
||||
new ClientConfig(
|
||||
heartbeatInterval: 3000,
|
||||
snappy: false,
|
||||
),
|
||||
];
|
||||
|
||||
yield 'snappy' => [
|
||||
new ClientConfig(
|
||||
heartbeatInterval: 3000,
|
||||
snappy: true,
|
||||
),
|
||||
];
|
||||
}
|
||||
}
|
@@ -1,95 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
use Symfony\Component\Filesystem\Filesystem;
|
||||
use Symfony\Component\Process\Process;
|
||||
|
||||
final class Nsqd
|
||||
{
|
||||
public Process $process;
|
||||
|
||||
public string $address;
|
||||
|
||||
private static Filesystem $fs;
|
||||
|
||||
public function __construct(
|
||||
public readonly string $dataPath,
|
||||
public readonly int $httpPort,
|
||||
public readonly int $tcpPort,
|
||||
) {
|
||||
self::$fs ??= new Filesystem();
|
||||
self::$fs->mkdir($this->dataPath);
|
||||
|
||||
$nsqd = new Process([
|
||||
'./nsqd',
|
||||
sprintf('-data-path=%s', $this->dataPath),
|
||||
sprintf('-http-address=0.0.0.0:%s', $this->httpPort),
|
||||
sprintf('-tcp-address=0.0.0.0:%s', $this->tcpPort),
|
||||
'-log-level=debug',
|
||||
], dirname(__DIR__).'/bin');
|
||||
|
||||
$nsqd->start();
|
||||
|
||||
while (false === @fsockopen('localhost', $this->tcpPort)) {
|
||||
if (!$nsqd->isRunning()) {
|
||||
throw new RuntimeException($nsqd->getErrorOutput());
|
||||
}
|
||||
|
||||
usleep(10000);
|
||||
}
|
||||
|
||||
$this->process = $nsqd;
|
||||
$this->address = sprintf('tcp://localhost:%s', $this->tcpPort);
|
||||
}
|
||||
|
||||
public static function create(): self
|
||||
{
|
||||
do {
|
||||
$dir = sprintf('/tmp/%s', bin2hex(random_bytes(5)));
|
||||
} while (is_dir($dir));
|
||||
|
||||
return new self(
|
||||
$dir,
|
||||
findFreePort(),
|
||||
findFreePort(),
|
||||
);
|
||||
}
|
||||
|
||||
public function tail(string $topic, string $channel, int $messages): Process
|
||||
{
|
||||
$tail = new Process(
|
||||
[
|
||||
'./nsq_tail',
|
||||
sprintf('-nsqd-tcp-address=localhost:%s', $this->tcpPort),
|
||||
sprintf('-topic=%s', $topic),
|
||||
sprintf('-channel=%s', $channel),
|
||||
sprintf('-n=%s', $messages),
|
||||
'-print-topic',
|
||||
],
|
||||
dirname(__DIR__).'/bin',
|
||||
timeout: 10,
|
||||
);
|
||||
|
||||
$tail->start();
|
||||
|
||||
return $tail;
|
||||
}
|
||||
|
||||
public function __destruct()
|
||||
{
|
||||
$this->process->stop();
|
||||
self::$fs->remove($this->dataPath);
|
||||
}
|
||||
}
|
||||
|
||||
function findFreePort(): int
|
||||
{
|
||||
$sock = socket_create_listen(0);
|
||||
assert($sock instanceof \Socket);
|
||||
|
||||
socket_getsockname($sock, $addr, $port);
|
||||
socket_close($sock);
|
||||
|
||||
return $port;
|
||||
}
|
@@ -3,66 +3,42 @@
|
||||
declare(strict_types=1);
|
||||
|
||||
use Amp\Loop;
|
||||
use Amp\Process\Process;
|
||||
use Nsq\Exception\ServerException;
|
||||
use Nsq\Producer;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
use function Amp\ByteStream\buffer;
|
||||
use function Amp\Promise\wait;
|
||||
|
||||
final class ProducerTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @dataProvider bodies
|
||||
* @param array<int, string>|string $body
|
||||
*
|
||||
* @dataProvider data
|
||||
*/
|
||||
public function testPublish(string $body): void
|
||||
public function testPublish(array | string $body, string $expected): void
|
||||
{
|
||||
$nsqd = Nsqd::create();
|
||||
$tail = $nsqd->tail('test', 'test', 1);
|
||||
|
||||
$producer = Producer::create($nsqd->address);
|
||||
$process = new Process(
|
||||
sprintf('bin/nsq_tail -topic %s -channel default -nsqd-tcp-address localhost:4150 -n 1', __FUNCTION__),
|
||||
);
|
||||
wait($process->start());
|
||||
|
||||
$producer = Producer::create('tcp://localhost:4150');
|
||||
wait($producer->connect());
|
||||
self::assertTrue(wait($producer->publish('test', $body)));
|
||||
wait($producer->publish(__FUNCTION__, $body));
|
||||
|
||||
self::assertSame(0, $tail->wait());
|
||||
wait($process->join());
|
||||
|
||||
self::assertSame("test | {$body}", trim($tail->getOutput()));
|
||||
self::assertSame($expected, wait(buffer($process->getStdout())));
|
||||
}
|
||||
|
||||
/**
|
||||
* @dataProvider bodies
|
||||
* @return Generator<int, array{0: string|array, 1: string}>
|
||||
*/
|
||||
public function testPublishMultiple(string $body): void
|
||||
public function data(): Generator
|
||||
{
|
||||
$nsqd = Nsqd::create();
|
||||
$tail = $nsqd->tail('test', 'test', 2);
|
||||
|
||||
$producer = Producer::create($nsqd->address);
|
||||
|
||||
wait($producer->connect());
|
||||
self::assertTrue(wait($producer->publish('test', [$body, $body])));
|
||||
|
||||
self::assertSame(0, $tail->wait());
|
||||
|
||||
self::assertSame("test | {$body}\ntest | {$body}", trim($tail->getOutput()));
|
||||
}
|
||||
|
||||
/**
|
||||
* @dataProvider bodies
|
||||
*/
|
||||
public function testPublishDeferred(string $body): void
|
||||
{
|
||||
$nsqd = Nsqd::create();
|
||||
$tail = $nsqd->tail('test', 'test', 1);
|
||||
|
||||
$producer = Producer::create($nsqd->address);
|
||||
|
||||
wait($producer->connect());
|
||||
self::assertTrue(wait($producer->publish('test', $body, 1)));
|
||||
|
||||
self::assertSame(0, $tail->wait());
|
||||
|
||||
self::assertSame("test | {$body}", trim($tail->getOutput()));
|
||||
yield ['Test Message One!', 'Test Message One!'.PHP_EOL];
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -70,12 +46,10 @@ final class ProducerTest extends TestCase
|
||||
*/
|
||||
public function testPubFail(string $topic, string $body, string $exceptionMessage): void
|
||||
{
|
||||
$nsqd = Nsqd::create();
|
||||
|
||||
$this->expectException(ServerException::class);
|
||||
$this->expectExceptionMessage($exceptionMessage);
|
||||
|
||||
$producer = Producer::create($nsqd->address);
|
||||
$producer = Producer::create('tcp://localhost:4150');
|
||||
|
||||
Loop::run(static function () use ($producer, $topic, $body): Generator {
|
||||
yield $producer->connect();
|
||||
@@ -84,15 +58,12 @@ final class ProducerTest extends TestCase
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Generator<string, array>
|
||||
*/
|
||||
public function pubFails(): Generator
|
||||
{
|
||||
yield 'Empty body' => ['test', '', 'E_BAD_MESSAGE PUB invalid message body size 0'];
|
||||
yield 'Invalid topic' => ['test$%^&', '', 'E_BAD_TOPIC PUB topic name "test$%^&" is not valid'];
|
||||
}
|
||||
|
||||
public function bodies(): Generator
|
||||
{
|
||||
yield 'Simple Body' => ['Simple Body'];
|
||||
yield 'Body with special chars' => ['test$%^&'];
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user