60 Commits

Author SHA1 Message Date
596d1cb77d Readme: fix main branch to master 2022-09-12 18:03:29 +03:00
d16da5de97 Rename tests to phpunit 2022-09-12 18:03:14 +03:00
4eeb3e424c Explode ci.yaml workflow 2022-09-12 17:59:08 +03:00
d79c491578 Tests with nsqd and utilities 2022-09-12 17:59:08 +03:00
d7a289d5c9 Readme: add telegram badge 2022-09-11 23:15:35 +03:00
350f08c2c1 Fix and suppress static analyze errors 2022-09-11 23:07:46 +03:00
bd8d13692f Remove infection, revert dg/bypass-finals 2022-09-11 22:15:11 +03:00
9414042f57 Bump min req php ^8.1 2022-09-11 22:10:39 +03:00
c88fdb6354 Bump dependencies 2022-09-11 22:05:17 +03:00
3bf8444e06 tests: add testNegotiationPayload 2022-09-11 22:03:37 +03:00
d4b29c69db Fix some psalm/phpstan errors 2021-09-19 02:34:53 +03:00
381ba88f8d Simplify discovery producer example 2021-09-19 02:13:11 +03:00
43ab797ee0 Add Makefile 2021-09-19 02:10:09 +03:00
45048320a8 Lookup: graceful shutdown 2021-09-19 01:57:53 +03:00
be696f17b5 Lookup: reconnections 2021-09-19 01:30:30 +03:00
d0307b47e6 Fix wrong return type 2021-09-19 00:44:31 +03:00
bac144582e Use psalm-return instead of return for generics 2021-09-19 00:44:20 +03:00
5e43f9b0df Prevent multiple watchers for one topic 2021-09-19 00:42:26 +03:00
e1725ea140 Return bool from public api of Producer and Consumer to indicate of success of process and prevent throwing exception from it 2021-09-19 00:42:26 +03:00
b130b09a82 Decrease default ready count to 100 2021-09-18 23:17:53 +03:00
155e896543 Unset previous producers on lookup newer 2021-09-18 23:01:56 +03:00
2c07fb1aca Remove unnecessary Producer::run and Consumer::run 2021-09-18 22:55:44 +03:00
97b3d8206c Refactor logging 2021-09-18 22:55:44 +03:00
c8cd41777f Close connection on error while read or write 2021-09-18 22:46:03 +03:00
fcd1f256ff Add Connection::onCallback, chain callbacks onClose 2021-09-18 22:39:43 +03:00
530b03974e Graceful close connection 2021-09-18 22:17:43 +03:00
083bc44c9c Decouple Consumer and Producer from direct call Streams 2021-09-18 22:17:43 +03:00
aa3333bfba Stream::close can throw Amp\ByteStream\ClosedException 2021-09-18 22:14:56 +03:00
679573ad0a update readme 2021-09-15 01:54:35 +03:00
32f226942e Add discovery examples 2021-09-15 01:53:20 +03:00
dbe312ddf1 Lookup fix cancelling topic watcher 2021-09-15 01:35:26 +03:00
47194b30f3 Refactor Lookup 2021-09-15 01:26:47 +03:00
5bab748952 Add Lookup/Producer::toTcpUri 2021-09-15 01:26:20 +03:00
43b92e9bb9 cs fix 2021-09-15 01:26:06 +03:00
c505d62533 Add logging on Consumer disconnected 2021-09-15 01:26:00 +03:00
f3f67bedd3 add Connection::onClose 2021-09-15 01:25:47 +03:00
af4e86d219 php-cs-fixer add types_spaces rule 2021-09-15 01:25:27 +03:00
53d7813198 Add Connection::isConnected() 2021-09-13 23:49:52 +03:00
56cdda1a0d Add some logs 2021-09-13 23:49:15 +03:00
7984d09e83 Return Failure on try to write to NullStream 2021-09-13 23:48:17 +03:00
3c7686405d Dynamic log level on LookupException 2021-09-13 23:47:45 +03:00
6428a1ec33 Make Lookup/{Response,Producer} compatible to /nodes endpoint 2021-09-13 23:47:20 +03:00
65adecde3f Leave one nsq service in favor of use docker-compose scale 2021-09-13 23:46:24 +03:00
e3e83212c4 deflate 2021-09-04 01:56:49 +03:00
ca2c2ee633 Lookup (#14) 2021-09-04 01:55:34 +03:00
e9dce19e25 Fix: Message::touch must not mark message as processed 2021-07-08 18:35:31 +03:00
a913fb0907 Add rdyCount to ClientConfig 2021-07-08 17:41:59 +03:00
6c8c30a1bd Bump ergebnis/composer-normalize to 2.15 2021-06-17 18:23:56 +03:00
a08bccac45 phpstan: ignore Return type of call to method PHPUnit\Framework\TestCase::createMock() contains unresolvable type. 2021-06-17 16:28:24 +03:00
55480ab2c0 Bump friendsofphp/php-cs-fixer up to 3.0 2021-06-17 16:21:26 +03:00
4546c5085f Bump infection/infection up to 0.23 2021-06-17 16:18:55 +03:00
b6f4726002 Fix: ClientConfig::$connectionTimeout is a milliseconds 2021-06-17 15:46:51 +03:00
e3c64f6f09 tls 2021-06-09 20:08:15 +03:00
411fabb1f5 Fix: get default settings from real object 2021-06-09 18:20:46 +03:00
34847e2467 Mark ServerConfig as internal 2021-06-09 18:17:00 +03:00
6e50fa2258 Refactoring configs. Use connections settings on establishing connection. Create ClientConfig from array. 2021-06-09 18:15:45 +03:00
ca54c7ad80 Add symfony/var-dumper for dev 2021-06-09 18:13:17 +03:00
4c00fb0fd5 Message::requeue $timeout must be positive-int or zero 2021-06-09 17:08:46 +03:00
d3e1788d23 Refactoring: Simplify Message methods, add isProcessed method. 2021-06-09 16:33:18 +03:00
2fc7e37120 Producer::publish $delay must be positive-int or zero 2021-06-09 15:34:29 +03:00
43 changed files with 1643 additions and 488 deletions

1
.env Normal file
View File

@ -0,0 +1 @@
NSQ_VERSION=1.2.1

View File

@ -1,205 +0,0 @@
name: CI
on:
- pull_request
- push
jobs:
tests:
name: Tests
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os:
- ubuntu-latest
php:
- '8.0'
dependencies:
- lowest
- highest
services:
nsqd:
image: nsqio/nsq:v1.2.0
options: --entrypoint /nsqd
ports:
- 4150:4150
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: ${{ matrix.php }}
coverage: pcov
extensions: kjdev/php-ext-snappy@0.2.1
env:
update: true
- name: Setup Problem Matchers for PHPUnit
run: echo "::add-matcher::${{ runner.tool_cache }}/phpunit.json"
- name: Determine Composer cache directory
id: composer-cache
run: echo "::set-output name=directory::$(composer config cache-dir)"
- name: Cache Composer dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.directory }}
key: ${{ runner.os }}-${{ matrix.php }}-composer-${{ matrix.dependencies }}-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-${{ matrix.php }}-${{ matrix.dependencies }}-composer-
- name: Install highest dependencies
run: composer update --no-progress --no-interaction --prefer-dist
if: ${{ matrix.dependencies == 'highest' }}
- name: Install lowest dependencies
run: composer update --no-progress --no-interaction --prefer-dist --prefer-lowest
if: ${{ matrix.dependencies == 'lowest' }}
- name: Run tests
run: vendor/bin/phpunit --coverage-clover=build/coverage-report.xml
- name: Upload code coverage
uses: codecov/codecov-action@v1
with:
file: build/coverage-report.xml
php-cs-fixer:
name: PHP-CS-Fixer
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.0'
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Run script
run: composer cs-check
phpstan:
name: PHPStan
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.0'
extensions: snappy-kjdev/php-ext-snappy@0.2.1
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Run script
run: composer phpstan
psalm:
name: Psalm
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.0'
extensions: snappy-kjdev/php-ext-snappy@0.2.1
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Run script
run: vendor/bin/psalm --output-format=github
infection:
name: Infection
runs-on: ubuntu-latest
services:
nsqd:
image: nsqio/nsq:v1.2.0
options: --entrypoint /nsqd
ports:
- 4150:4150
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.0'
coverage: pcov
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Run script
env:
STRYKER_DASHBOARD_API_KEY: ${{ secrets.STRYKER_DASHBOARD_API_KEY }}
run: |
git fetch --depth=1 origin $GITHUB_BASE_REF
php vendor/bin/infection -j2 --git-diff-filter=A --git-diff-base=origin/$GITHUB_BASE_REF --logger-github --ignore-msi-with-no-mutations --only-covered

37
.github/workflows/code_style.yaml vendored Normal file
View File

@ -0,0 +1,37 @@
name: Code Style
on:
- pull_request
- push
jobs:
php-cs-fixer:
name: PHP-CS-Fixer
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.1'
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Run script
run: composer cs-check

71
.github/workflows/phpunit.yaml vendored Normal file
View File

@ -0,0 +1,71 @@
name: phpunit
on:
- pull_request
- push
jobs:
phpunit:
name: phpunit
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os:
- ubuntu-latest
php:
- '8.1'
nsq:
- nsq-1.2.0.linux-amd64.go1.12.9
- nsq-1.2.1.linux-amd64.go1.16.6
dependencies:
- lowest
- highest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: ${{ matrix.php }}
coverage: pcov
extensions: kjdev/php-ext-snappy@0.2.1
env:
update: true
- name: Download NSQ
run: |
curl -sSL "http://bitly-downloads.s3.amazonaws.com/nsq/${{ matrix.nsq }}.tar.gz" \
| tar -xzv --strip-components=1
./bin/nsqd --version
- name: Setup Problem Matchers for PHPUnit
run: echo "::add-matcher::${{ runner.tool_cache }}/phpunit.json"
- name: Determine Composer cache directory
id: composer-cache
run: echo "::set-output name=directory::$(composer config cache-dir)"
- name: Cache Composer dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.directory }}
key: ${{ runner.os }}-${{ matrix.php }}-composer-${{ matrix.dependencies }}-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-${{ matrix.php }}-${{ matrix.dependencies }}-composer-
- name: Install highest dependencies
run: composer update --no-progress --no-interaction --prefer-dist
if: ${{ matrix.dependencies == 'highest' }}
- name: Install lowest dependencies
run: composer update --no-progress --no-interaction --prefer-dist --prefer-lowest
if: ${{ matrix.dependencies == 'lowest' }}
- name: Run tests
run: vendor/bin/phpunit --coverage-clover=build/coverage-report.xml
- name: Upload code coverage
uses: codecov/codecov-action@v1
with:
file: build/coverage-report.xml

70
.github/workflows/static_analyze.yaml vendored Normal file
View File

@ -0,0 +1,70 @@
name: Static Analyze
on:
- pull_request
- push
jobs:
phpstan:
name: PHPStan
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.1'
extensions: snappy-kjdev/php-ext-snappy@0.2.1
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Run script
run: composer phpstan
psalm:
name: Psalm
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.1'
extensions: snappy-kjdev/php-ext-snappy@0.2.1
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Run script
run: vendor/bin/psalm --output-format=github

13
.gitignore vendored
View File

@ -1,6 +1,17 @@
/vendor/
/composer.lock
/.php_cs.cache
/.php-cs-fixer.cache
/.phpunit.result.cache
/infection.log
# Nsq
bin/nsq_stat
bin/nsq_tail
bin/nsq_to_file
bin/nsq_to_http
bin/nsq_to_nsq
bin/nsqadmin
bin/nsqd
bin/nsqlookupd
bin/to_nsq

45
.php-cs-fixer.php Normal file
View File

@ -0,0 +1,45 @@
<?php
$finder = PhpCsFixer\Finder::create()
->in(__DIR__)
->exclude('vendor');
return (new PhpCsFixer\Config())
->setRiskyAllowed(true)
->setRules([
'@PhpCsFixer' => true,
'@PhpCsFixer:risky' => true,
'@PSR12' => true,
'@PSR12:risky' => true,
'blank_line_before_statement' => [
'statements' => [
'continue',
'do',
'exit',
'goto',
'if',
'return',
'switch',
'throw',
'try',
],
],
'declare_strict_types' => true,
'global_namespace_import' => [
'import_classes' => false,
'import_constants' => false,
'import_functions' => false,
],
'php_unit_internal_class' => false,
'php_unit_test_case_static_method_calls' => ['call_type' => 'self'],
'php_unit_test_class_requires_covers' => false,
'phpdoc_to_comment' => false,
'yoda_style' => true,
'trailing_comma_in_multiline' => [
'after_heredoc' => true,
'elements' => ['arrays', 'arguments', 'parameters'],
],
'types_spaces' => ['space' => 'single'],
])
->setFinder($finder)
->setCacheFile(__DIR__.'/.php-cs-fixer.cache');

View File

@ -1,28 +0,0 @@
#!/usr/bin/env php
<?php
return (new PhpCsFixer\Config())
->setRiskyAllowed(true)
->setRules([
'@PhpCsFixer' => true,
'@PhpCsFixer:risky' => true,
'@PSR12' => true,
'@PSR12:risky' => true,
'braces' => [
'allow_single_line_closure' => true,
],
'blank_line_before_statement' => [
'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try'],
],
'declare_strict_types' => true,
'global_namespace_import' => ['import_classes' => false, 'import_constants' => false, 'import_functions' => false],
'php_unit_internal_class' => false,
'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'],
'php_unit_test_class_requires_covers' => false,
'phpdoc_to_comment' => false,
'yoda_style' => true,
])
->setFinder(
PhpCsFixer\Finder::create()
->in(__DIR__)
);

20
Makefile Normal file
View File

@ -0,0 +1,20 @@
all: install composer-validate php-cs-fixer psalm phpstan phpunit
install:
composer install
psalm:
php vendor/bin/psalm
phpstan:
php vendor/bin/phpstan analyse
phpunit:
php vendor/bin/phpunit
php-cs-fixer:
php vendor/bin/php-cs-fixer fix
composer-validate:
composer validate

View File

@ -1,11 +1,12 @@
# Nsq PHP
<img src="https://github.com/nsqphp/nsqphp/raw/main/docs/logo.png" alt="" align="left" width="150">
<img src="https://github.com/nsqphp/nsqphp/raw/master/docs/logo.png" alt="" align="left" width="150">
PHP Client for [NSQ](https://nsq.io/).
[![Latest Stable Version](https://poser.pugx.org/nsq/nsq/v)](//packagist.org/packages/nsq/nsq) [![Total Downloads](https://poser.pugx.org/nsq/nsq/downloads)](//packagist.org/packages/nsq/nsq) [![License](https://poser.pugx.org/nsq/nsq/license)](//packagist.org/packages/nsq/nsq)
[![codecov](https://codecov.io/gh/nsqphp/nsqphp/branch/main/graph/badge.svg?token=AYUMC3OO2B)](https://codecov.io/gh/nsqphp/nsqphp) [![Mutation testing badge](https://img.shields.io/endpoint?style=flat&url=https%3A%2F%2Fbadge-api.stryker-mutator.io%2Fgithub.com%2Fnsqphp%2Fnsqphp%2Fmain)](https://dashboard.stryker-mutator.io/reports/github.com/nsqphp/nsqphp/main)
[![codecov](https://codecov.io/gh/nsqphp/nsqphp/branch/master/graph/badge.svg?token=AYUMC3OO2B)](https://codecov.io/gh/nsqphp/nsqphp) [![Mutation testing badge](https://img.shields.io/endpoint?style=flat&url=https%3A%2F%2Fbadge-api.stryker-mutator.io%2Fgithub.com%2Fnsqphp%2Fnsqphp%2Fmaster)](https://dashboard.stryker-mutator.io/reports/github.com/nsqphp/nsqphp/master) [![telegram](https://raw.githubusercontent.com/aleen42/badges/master/src/telegram.svg)](http://t.me/grachevko)
This library follow [SemVer](https://semver.org/). Until version 1.0 will be released anything MAY change at any time, public API SHOULD NOT be considered stable. If you want use it before stable version was released install strict version without range.
@ -31,10 +32,10 @@ Features
- [x] PUB
- [x] SUB
- [X] Feature Negotiation
- [ ] Discovery
- [X] Discovery
- [ ] Backoff
- [ ] TLS
- [ ] Deflate
- [X] TLS
- [X] Deflate
- [X] Snappy
- [X] Sampling
- [X] AUTH
@ -80,6 +81,28 @@ $consumer = Consumer::create(
);
```
### Lookup
```php
use Nsq\Lookup;
use Nsq\Message;
$lookup = new Lookup('http://nsqlookupd0:4161');
$lookup = new Lookup(['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161']);
$callable = static function (Message $message): Generator {
yield $message->touch(); // Reset the timeout for an in-flight message
yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
yield $message->finish(); // Finish a message (indicate successful processing)
};
$lookup->subscribe(topic: 'topic', channel: 'channel', onMessage: $callable);
$lookup->subscribe(topic: 'anotherTopic', channel: 'channel', onMessage: $callable);
$lookup->unsubscribe(topic: 'local', channel: 'channel');
$lookup->stop(); // unsubscribe all
```
### Integrations
- [Symfony](https://github.com/nsqphp/NsqBundle)

View File

@ -11,34 +11,49 @@
}
],
"require": {
"php": "^8.0.1",
"php": "^8.1",
"ext-json": "*",
"amphp/http-client": "^4.6",
"amphp/socket": "^1.1",
"composer/semver": "^3.2",
"phpinnacle/buffer": "^1.2",
"psr/log": "^1.1"
"psr/log": "^3.0"
},
"require-dev": {
"amphp/log": "^1.1",
"dg/bypass-finals": "^1.3",
"ergebnis/composer-normalize": "9999999-dev",
"friendsofphp/php-cs-fixer": "^2.18",
"infection/infection": "^0.20.2",
"ergebnis/composer-normalize": "^2.15",
"friendsofphp/php-cs-fixer": "^3.4",
"nyholm/nsa": "^1.2",
"phpstan/phpstan": "^0.12.68",
"phpstan/phpstan-phpunit": "^0.12.17",
"phpstan/phpstan-strict-rules": "^0.12.9",
"phpstan/phpstan": "^1.8",
"phpstan/phpstan-phpunit": "^1.1",
"phpstan/phpstan-strict-rules": "^1.3",
"phpunit/phpunit": "^9.5",
"symfony/filesystem": "^6.1",
"symfony/process": "^6.1",
"symfony/var-dumper": "^6.1",
"vimeo/psalm": "^4.4"
},
"config": {
"sort-packages": true
"sort-packages": true,
"allow-plugins": {
"ergebnis/composer-normalize": true,
"infection/extension-installer": true
}
},
"autoload": {
"psr-4": {
"Nsq\\": "src/"
}
},
"autoload-dev": {
"classmap": [
"tests/"
],
"files": [
"vendor/symfony/var-dumper/Resources/functions/dump.php"
]
},
"minimum-stability": "dev",
"prefer-stable": true,
"scripts": {

View File

@ -2,23 +2,56 @@ version: '3.7'
services:
nsqd:
image: nsqio/nsq:v1.2.0
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqd'
command: /nsqd -log-level debug
# command: /nsqd
ports:
- 4150:4150
- 4151:4151
command: >-
nsqd
--log-level debug
--lookupd-tcp-address nsqlookupd0:4160
--lookupd-tcp-address nsqlookupd1:4160
--lookupd-tcp-address nsqlookupd2:4160
nsqlookupd0:
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqlookupd0'
command: /nsqlookupd -log-level debug
nsqlookupd1:
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqlookupd1'
command: /nsqlookupd -log-level debug
nsqlookupd2:
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqlookupd2'
command: /nsqlookupd -log-level debug
nsqadmin:
image: nsqio/nsq:v1.2.0
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqadmin'
command: /nsqadmin --nsqd-http-address=nsqd:4151 --http-address=0.0.0.0:4171
ports:
- 4171:4171
command:
- nsqadmin
- --http-address=0.0.0.0:4171
- --lookupd-http-address=nsqlookupd0:4161
- --lookupd-http-address=nsqlookupd1:4161
- --lookupd-http-address=nsqlookupd2:4161
depends_on:
- nsqlookupd0
- nsqlookupd1
- nsqlookupd2
tail:
image: nsqio/nsq:v1.2.0
command: nsq_tail -channel nsq_tail -topic local -nsqd-tcp-address nsqd:4150
image: nsqio/nsq:v${NSQ_VERSION}
command: >-
nsq_tail
--channel nsq_tail
--topic local
--lookupd-http-address nsqlookupd1:4161
depends_on:
- nsqd
- nsqlookupd1

View File

@ -0,0 +1,47 @@
<?php
declare(strict_types=1);
require dirname(__DIR__).'/../vendor/autoload.php';
use Amp\ByteStream;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Loop;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig;
use Nsq\Lookup;
use Nsq\Message;
Loop::run(static function () {
$handler = new StreamHandler(ByteStream\getStdout());
$handler->setFormatter(new ConsoleFormatter());
$logger = new Logger('consumer', [$handler], [new PsrLogMessageProcessor()]);
$callable = static function (Message $message) {
yield $message->finish();
};
$clientConfig = new ClientConfig();
$lookupConfig = new LookupConfig();
$watcherId = Loop::repeat(5000, function () {
yield Amp\Dns\resolver()->reloadConfig();
});
$lookup = Lookup::create(
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
$lookupConfig,
$logger,
);
$lookup->subscribe('local', 'local', $callable, $clientConfig);
Loop::delay(10000, function () use ($lookup, $watcherId) {
$lookup->stop();
Loop::cancel($watcherId);
});
});

View File

@ -0,0 +1,102 @@
<?php
declare(strict_types=1);
require dirname(__DIR__).'/../vendor/autoload.php';
use Amp\ByteStream;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Loop;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig;
use Nsq\Lookup;
use Nsq\Producer;
use function Amp\asyncCall;
use function Amp\delay;
Loop::run(static function () {
$handler = new StreamHandler(ByteStream\getStdout());
$handler->setFormatter(new ConsoleFormatter());
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
$clientConfig = new ClientConfig();
/** @var Producer[] $producers */
$producers = [];
$lookupConfig = new LookupConfig();
$lookup = Lookup::create(
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
$lookupConfig,
$logger,
);
$isRunning = true;
asyncCall(static function () use ($lookup, $clientConfig, $logger, &$producers, &$isRunning) {
while ($isRunning) {
/** @var Lookup\Producer[] $nodes */
$nodes = yield $lookup->nodes();
foreach ($nodes as $node) {
$address = $node->toTcpUri();
if (array_key_exists($address, $producers)) {
continue;
}
asyncCall(function () use ($address, $clientConfig, $logger, &$producers) {
yield ($producers[$address] = Producer::create($address, $clientConfig, $logger))
->onClose(function () use (&$producers, $address) {
unset($producers[$address]);
})
->connect()
;
});
}
yield delay(5000);
yield Amp\Dns\resolver()->reloadConfig(); // for reload /etc/hosts
}
});
Loop::delay(5000, function () use (&$isRunning, $logger) {
$logger->info('Stopping producer.');
$isRunning = false;
});
$counter = 0;
while (true) {
if (!$isRunning) {
foreach ($producers as $producer) {
$producer->close();
}
break;
}
if ([] === $producers) {
yield delay(200);
continue;
}
$index = array_rand($producers);
$producer = $producers[$index];
if (!$producer->isConnected()) {
yield delay(100);
continue;
}
yield $producer->publish('local', 'This is message of count '.$counter++);
}
});

View File

@ -14,6 +14,7 @@ use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig;
use Nsq\Consumer;
use Nsq\Message;
use function Amp\call;
Loop::run(static function () {

View File

@ -8,3 +8,9 @@ parameters:
paths:
- src
- tests
ignoreErrors:
-
message: '#no value type specified in iterable type array#'
paths:
- %currentWorkingDirectory%/src
- %currentWorkingDirectory%/tests

View File

@ -1,6 +1,5 @@
<?xml version="1.0"?>
<psalm
allowPhpStormGenerics="true"
ignoreInternalFunctionFalseReturn="false"
ignoreInternalFunctionNullReturn="false"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

View File

@ -13,8 +13,11 @@ final class Buffer extends ByteBuffer
{
public function readUInt32LE(): int
{
/** @phpstan-ignore-next-line */
return unpack('V', $this->consume(4))[1];
$unpacked = unpack('V', $this->consume(4));
\assert(\is_array($unpacked) && \array_key_exists(1, $unpacked));
return $unpacked[1];
}
public function consumeTimestamp(): int

View File

@ -13,7 +13,7 @@ use Composer\InstalledVersions;
*
* @psalm-immutable
*/
final class ClientConfig implements \JsonSerializable
final class ClientConfig
{
/**
* @psalm-suppress ImpureFunctionCall
@ -26,9 +26,26 @@ final class ClientConfig implements \JsonSerializable
public ?string $authSecret = null,
/**
* The timeout for establishing a connection in seconds.
* The timeout for establishing a connection in milliseconds.
*/
public int $connectTimeout = 10,
public int $connectTimeout = 10000,
/**
* The max attempts for establishing a connection.
*/
public int $maxAttempts = 0,
/**
* Use tcp_nodelay for establishing a connection.
*/
public bool $tcpNoDelay = false,
public int $rdyCount = 100,
/**
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
* it will send back a JSON payload of supported features and metadata.
*/
public bool $featureNegotiation = true,
/**
* An identifier used to disambiguate this client (i.e. something specific to the consumer).
@ -73,12 +90,6 @@ final class ClientConfig implements \JsonSerializable
*/
public int $sampleRate = 0,
/**
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
* it will send back a JSON payload of supported features and metadata.
*/
public bool $featureNegotiation = true,
/**
* Enable TLS for this connection.
*/
@ -89,11 +100,6 @@ final class ClientConfig implements \JsonSerializable
*/
public bool $snappy = false,
/**
* The read timeout for connection sockets and for awaiting responses from nsq.
*/
public int $readTimeout = 5,
/**
* A string identifying the agent for this client in the spirit of HTTP.
*/
@ -114,12 +120,14 @@ final class ClientConfig implements \JsonSerializable
}
}
/**
* @phpstan-ignore-next-line
*/
public function jsonSerialize(): array
public static function fromArray(array $array): self
{
return [
return new self(...array_intersect_key($array, get_class_vars(self::class)));
}
public function asNegotiationPayload(): string
{
$data = [
'client_id' => $this->clientId,
'deflate' => $this->deflate,
'deflate_level' => $this->deflateLevel,
@ -132,10 +140,7 @@ final class ClientConfig implements \JsonSerializable
'tls_v1' => $this->tls,
'user_agent' => $this->userAgent,
];
}
public function toString(): string
{
return json_encode($this, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
return json_encode($data, JSON_THROW_ON_ERROR);
}
}

View File

@ -0,0 +1,13 @@
<?php
declare(strict_types=1);
namespace Nsq\Config;
final class LookupConfig
{
public function __construct(
public int $pollingInterval = 10000,
) {
}
}

View File

@ -8,8 +8,10 @@ namespace Nsq\Config;
* The configuration object that holds the config status for a single Connection.
*
* @psalm-immutable
*
* @internal
*/
final class ConnectionConfig
final class ServerConfig
{
public function __construct(
/**
@ -82,9 +84,6 @@ final class ConnectionConfig
) {
}
/**
* @phpstan-ignore-next-line
*/
public static function fromArray(array $array): self
{
return new self(

View File

@ -4,9 +4,11 @@ declare(strict_types=1);
namespace Nsq;
use Amp\ByteStream\ClosedException;
use Amp\Failure;
use Amp\Promise;
use Nsq\Config\ClientConfig;
use Nsq\Config\ConnectionConfig;
use Nsq\Config\ServerConfig;
use Nsq\Exception\AuthenticationRequired;
use Nsq\Exception\NsqException;
use Nsq\Frame\Response;
@ -15,6 +17,8 @@ use Nsq\Stream\NullStream;
use Nsq\Stream\SnappyStream;
use Nsq\Stream\SocketStream;
use Psr\Log\LoggerInterface;
use function Amp\asyncCall;
use function Amp\call;
/**
@ -22,23 +26,45 @@ use function Amp\call;
*/
abstract class Connection
{
protected Stream $stream;
private Stream $stream;
/**
* @var callable
*/
private $onConnectCallback;
/**
* @var callable
*/
private $onCloseCallback;
public function __construct(
private string $address,
private ClientConfig $clientConfig,
private LoggerInterface $logger,
/**
* @readonly
*/
public string $address,
protected ClientConfig $clientConfig,
protected LoggerInterface $logger,
) {
$this->stream = new NullStream();
$this->onConnectCallback = static function (): void {
};
$this->onCloseCallback = static function (): void {
};
}
public function __destruct()
{
$this->close();
$this->close(false);
}
public function isConnected(): bool
{
return !$this->stream instanceof NullStream;
}
/**
* @return Promise<void>
* @psalm-return Promise<void>
*/
public function connect(): Promise
{
@ -46,16 +72,32 @@ abstract class Connection
$buffer = new Buffer();
/** @var SocketStream $stream */
$stream = yield SocketStream::connect($this->address);
$stream = yield SocketStream::connect(
$this->address,
$this->clientConfig->connectTimeout,
$this->clientConfig->maxAttempts,
$this->clientConfig->tcpNoDelay,
);
yield $stream->write(Command::magic());
yield $stream->write(Command::identify($this->clientConfig->toString()));
yield $stream->write(Command::identify($this->clientConfig->asNegotiationPayload()));
/** @var Response $response */
$response = yield $this->response($stream, $buffer);
$connectionConfig = ConnectionConfig::fromArray($response->toArray());
$serverConfig = ServerConfig::fromArray($response->toArray());
if ($connectionConfig->snappy) {
if ($serverConfig->tls) {
yield $stream->setupTls();
/** @var Response $response */
$response = yield $this->response($stream, $buffer);
if (!$response->isOk()) {
throw new NsqException();
}
}
if ($serverConfig->snappy) {
$stream = new SnappyStream($stream, $buffer->flush());
/** @var Response $response */
@ -66,8 +108,8 @@ abstract class Connection
}
}
if ($connectionConfig->deflate) {
$stream = new GzipStream($stream);
if ($serverConfig->deflate) {
$stream = new GzipStream($stream, $serverConfig->deflateLevel, $buffer->flush());
/** @var Response $response */
$response = yield $this->response($stream, $buffer);
@ -77,7 +119,7 @@ abstract class Connection
}
}
if ($connectionConfig->authRequired) {
if ($serverConfig->authRequired) {
if (null === $this->clientConfig->authSecret) {
throw new AuthenticationRequired();
}
@ -91,15 +133,103 @@ abstract class Connection
}
$this->stream = $stream;
($this->onConnectCallback)();
});
}
public function close(): void
public function close(bool $graceful = true): void
{
// $this->stream->write(Command::cls());
if (!$this->isConnected()) {
return;
}
$this->stream->close();
$this->stream = new NullStream();
$logger = $this->logger;
[$stream, $this->stream] = [$this->stream, new NullStream()];
if ($graceful) {
$this->logger->debug('Graceful disconnect.', [
'class' => static::class,
'address' => $this->address,
]);
asyncCall(static function () use ($stream, $logger): \Generator {
try {
yield $stream->write(Command::cls());
} catch (\Throwable $e) {
$logger->warning($e->getMessage(), ['exception' => $e]);
}
$stream->close();
});
return;
}
try {
$stream->close();
} catch (ClosedException) {
}
($this->onCloseCallback)();
}
public function onConnect(callable $callback): static
{
$previous = $this->onConnectCallback;
$this->onConnectCallback = static function () use ($previous, $callback): void {
$previous();
$callback();
};
return $this;
}
public function onClose(callable $callback): static
{
$previous = $this->onCloseCallback;
$this->onCloseCallback = static function () use ($previous, $callback): void {
$previous();
$callback();
};
return $this;
}
/**
* @psalm-return Promise<null|string>
*/
protected function read(): Promise
{
return call(function (): \Generator {
try {
return yield $this->stream->read();
} catch (\Throwable $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
$this->close(false);
return new Failure($e);
}
});
}
/**
* @psalm-return Promise<void>
*/
protected function write(string $data): Promise
{
return call(function () use ($data): \Generator {
try {
return yield $this->stream->write($data);
} catch (\Throwable $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
$this->close(false);
return new Failure($e);
}
});
}
protected function handleError(Frame\Error $error): void
@ -114,7 +244,7 @@ abstract class Connection
}
/**
* @return Promise<Frame\Response>
* @psalm-return Promise<Frame\Response>
*/
private function response(Stream $stream, Buffer $buffer): Promise
{

View File

@ -6,15 +6,19 @@ namespace Nsq;
use Amp\Failure;
use Amp\Promise;
use Amp\Success;
use Nsq\Config\ClientConfig;
use Nsq\Exception\ConsumerException;
use Nsq\Frame\Response;
use Nsq\Stream\NullStream;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use function Amp\asyncCall;
use function Amp\call;
/**
* @internal
*/
final class Consumer extends Connection
{
private int $rdy = 0;
@ -25,20 +29,35 @@ final class Consumer extends Connection
private $onMessage;
public function __construct(
private string $address,
private string $topic,
private string $channel,
string $address,
/**
* @readonly
*/
public string $topic,
/**
* @readonly
*/
public string $channel,
callable $onMessage,
ClientConfig $clientConfig,
private LoggerInterface $logger,
LoggerInterface $logger,
) {
parent::__construct(
$this->address,
$address,
$clientConfig,
$this->logger,
$logger,
);
$this->onMessage = $onMessage;
$context = compact('address', 'topic', 'channel');
$this->onConnect(function () use ($context): void {
$this->logger->debug('Consumer connected.', $context);
});
$this->onClose(function () use ($context): void {
$this->logger->debug('Consumer disconnected.', $context);
});
$this->logger->debug('Consumer created.', $context);
}
public static function create(
@ -61,7 +80,7 @@ final class Consumer extends Connection
public function connect(): Promise
{
if (!$this->stream instanceof NullStream) {
if ($this->isConnected()) {
return call(static function (): void {
});
}
@ -69,58 +88,57 @@ final class Consumer extends Connection
return call(function (): \Generator {
yield parent::connect();
$this->run();
});
}
$buffer = new Buffer();
private function run(): void
{
$buffer = new Buffer();
asyncCall(function () use ($buffer): \Generator {
yield $this->stream->write(Command::sub($this->topic, $this->channel));
if (null !== ($chunk = yield $this->stream->read())) {
$buffer->append($chunk);
}
/** @var Response $response */
$response = Parser::parse($buffer);
if (!$response->isOk()) {
return new Failure(new ConsumerException('Fail subscription.'));
}
yield $this->rdy(2500);
/** @phpstan-ignore-next-line */
asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->stream->read()) {
yield $this->write(Command::sub($this->topic, $this->channel));
if (null !== ($chunk = yield $this->read())) {
$buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->stream->write(Command::nop());
break;
}
throw ConsumerException::response($frame);
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
case $frame instanceof Frame\Message:
asyncCall($this->onMessage, Message::compose($frame, $this));
break;
}
}
}
$this->stream = new NullStream();
/** @var Response $response */
$response = Parser::parse($buffer);
if (!$response->isOk()) {
return new Failure(new ConsumerException('Fail subscription.'));
}
yield $this->rdy(1);
/** @phpstan-ignore-next-line */
asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->read()) {
$buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->write(Command::nop());
break;
}
throw ConsumerException::response($frame);
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
case $frame instanceof Frame\Message:
asyncCall($this->onMessage, Message::compose($frame, $this));
break;
}
if ($this->rdy !== $this->clientConfig->rdyCount) {
yield $this->rdy($this->clientConfig->rdyCount);
}
}
}
$this->close(false);
});
});
});
}
@ -128,32 +146,53 @@ final class Consumer extends Connection
/**
* Update RDY state (indicate you are ready to receive N messages).
*
* @return Promise<void>
* @psalm-return Promise<bool>
*/
public function rdy(int $count): Promise
{
if (!$this->isConnected()) {
return new Success(false);
}
if ($this->rdy === $count) {
return call(static function (): void {
});
return new Success(true);
}
$this->rdy = $count;
return $this->stream->write(Command::rdy($count));
return call(function () use ($count): \Generator {
try {
yield $this->write(Command::rdy($count));
return true;
} catch (\Throwable) {
return false;
}
});
}
/**
* Finish a message (indicate successful processing).
*
* @return Promise<void>
* @psalm-return Promise<bool>
*
* @internal
*/
public function fin(string $id): Promise
{
--$this->rdy;
if (!$this->isConnected()) {
return new Success(false);
}
return $this->stream->write(Command::fin($id));
return call(function () use ($id): \Generator {
try {
yield $this->write(Command::fin($id));
return true;
} catch (\Throwable) {
return false;
}
});
}
/**
@ -162,26 +201,48 @@ final class Consumer extends Connection
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
* behaves identically to an explicit REQ.
*
* @return Promise<void>
* @psalm-return Promise<bool>
*
* @internal
*/
public function req(string $id, int $timeout): Promise
{
--$this->rdy;
if (!$this->isConnected()) {
return new Success(false);
}
return $this->stream->write(Command::req($id, $timeout));
return call(function () use ($id, $timeout): \Generator {
try {
yield $this->write(Command::req($id, $timeout));
return true;
} catch (\Throwable) {
return false;
}
});
}
/**
* Reset the timeout for an in-flight message.
*
* @return Promise<void>
* @psalm-return Promise<bool>
*
* @internal
*/
public function touch(string $id): Promise
{
return $this->stream->write(Command::touch($id));
if (!$this->isConnected()) {
return new Success(false);
}
return call(function () use ($id): \Generator {
try {
yield $this->write(Command::touch($id));
return true;
} catch (\Throwable) {
return false;
}
});
}
}

View File

@ -0,0 +1,18 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Psr\Log\LogLevel;
final class LookupException extends NsqException
{
public function level(): string
{
return match ($this->getMessage()) {
'TOPIC_NOT_FOUND' => LogLevel::DEBUG,
default => LogLevel::WARNING,
};
}
}

View File

@ -0,0 +1,9 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
final class StreamException extends NsqException
{
}

View File

@ -6,10 +6,9 @@ namespace Nsq;
abstract class Frame
{
public const TYPE_RESPONSE = 0,
TYPE_ERROR = 1,
TYPE_MESSAGE = 2
;
public const TYPE_RESPONSE = 0;
public const TYPE_ERROR = 1;
public const TYPE_MESSAGE = 2;
public function __construct(public int $type)
{

View File

@ -30,7 +30,7 @@ final class Response extends Frame
}
/**
* @return array<mixed, mixed>
* @psalm-return array<mixed, mixed>
*/
public function toArray(): array
{

274
src/Lookup.php Normal file
View File

@ -0,0 +1,274 @@
<?php
declare(strict_types=1);
namespace Nsq;
use Amp\Deferred;
use Amp\Dns\DnsException;
use Amp\Http\Client\DelegateHttpClient;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\NullCancellationToken;
use Amp\Promise;
use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig;
use Nsq\Exception\LookupException;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use function Amp\asyncCall;
use function Amp\call;
use function Amp\delay;
final class Lookup
{
/**
* @psalm-var array<string, array<string, Lookup\Producer>>
*/
private array $producers = [];
private array $consumers = [];
private array $running = [];
private array $topicWatchers = [];
public function __construct(
private array $addresses,
private LookupConfig $config,
private LoggerInterface $logger,
private DelegateHttpClient $httpClient,
) {
}
public static function create(
string | array $address,
LookupConfig $config = null,
LoggerInterface $logger = null,
DelegateHttpClient $httpClient = null,
): self {
return new self(
(array) $address,
$config ?? new LookupConfig(),
$logger ?? new NullLogger(),
$httpClient ?? HttpClientBuilder::buildDefault(),
);
}
/**
* @psalm-return Promise<Lookup\Producer[]>
*/
public function nodes(): Promise
{
return call(function (): \Generator {
$requestHandler = function (string $uri): \Generator {
/** @var Response $response */
$response = yield $this->httpClient->request(new Request($uri.'/nodes'), new NullCancellationToken());
try {
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
} catch (LookupException $e) {
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
return null;
}
};
$promises = [];
foreach ($this->addresses as $address) {
$promises[$address] = call($requestHandler, $address);
}
$nodes = [];
/** @var Lookup\Response $response */
foreach (yield $promises as $response) {
foreach ($response->producers as $producer) {
$nodes[$producer->toTcpUri()] = $producer;
}
}
return array_values($nodes);
});
}
public function stop(): void
{
$this->producers = [];
$this->consumers = [];
$this->running = [];
$this->topicWatchers = [];
$this->logger->info('Lookup stopped.');
}
/**
* @psalm-suppress InvalidPropertyAssignmentValue
*/
public function subscribe(string $topic, string $channel, callable $onMessage, ClientConfig $config = null): void
{
if (null !== ($this->running[$topic][$channel] ?? null)) {
throw new \InvalidArgumentException('Subscription already exists.');
}
$this->running[$topic][$channel] = true;
asyncCall(function () use ($topic, $channel, $onMessage, $config): \Generator {
while (true) {
if (null === ($this->running[$topic][$channel] ?? null)) {
return;
}
/** @phpstan-ignore-next-line */
$producers = $this->producers[$topic] ??= new Deferred();
if ($producers instanceof Deferred) {
/** @var array<string, Lookup\Producer> $producers */
$producers = yield $producers->promise();
}
foreach (array_diff_key($this->consumers, $producers) as $address => $producer) {
unset($this->consumers[$address]);
}
foreach ($producers as $address => $producer) {
if (null !== ($this->consumers[$address][$topic][$channel] ?? null)) {
continue;
}
$this->keepConnection(
Consumer::create(
$address,
$topic,
$channel,
$onMessage,
$config,
$this->logger,
),
);
}
yield delay($this->config->pollingInterval);
}
});
$this->watch($topic);
$this->logger->info('Subscribed.', compact('topic', 'channel'));
}
public function unsubscribe(string $topic, string $channel): void
{
if (null === ($this->running[$topic][$channel] ?? null)) {
$this->logger->debug('Not subscribed.', compact('topic', 'channel'));
return;
}
unset($this->running[$topic][$channel]);
if ([] === $this->running[$topic]) {
unset($this->running[$topic]);
}
$this->logger->info('Unsubscribed.', compact('topic', 'channel'));
}
private function keepConnection(Consumer $consumer): void
{
$this->consumers[$consumer->address][$consumer->topic][$consumer->channel] = $consumer;
asyncCall(function () use ($consumer): \Generator {
while (null !== ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
try {
yield $consumer->connect();
} catch (DnsException $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
unset(
$this->consumers[$consumer->address],
$this->producers[$consumer->topic][$consumer->address],
);
return;
} catch (\Throwable $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
yield delay($this->config->pollingInterval);
continue;
}
while (true) {
/** @phpstan-ignore-next-line */
if (null === ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
$consumer->close();
return;
}
if (!$consumer->isConnected()) {
break;
}
yield delay(500);
}
}
});
}
private function watch(string $topic): void
{
if (\array_key_exists($topic, $this->topicWatchers)) {
return;
}
$this->topicWatchers[$topic] = true;
asyncCall(function () use ($topic): \Generator {
$cancellationToken = new NullCancellationToken();
$requestHandler = function (string $uri) use ($topic, $cancellationToken): \Generator {
$this->logger->debug('Lookup', compact('topic'));
/** @var Response $response */
$response = yield $this->httpClient->request(new Request($uri.'/lookup?topic='.$topic), $cancellationToken);
try {
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
} catch (LookupException $e) {
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
return null;
}
};
while (\array_key_exists($topic, $this->running)) {
$promises = [];
foreach ($this->addresses as $address) {
$promises[$address] = call($requestHandler, $address);
}
/** @var Lookup\Response[] $responses */
$responses = yield $promises;
$producers = [];
foreach ($responses as $response) {
foreach ($response->producers as $producer) {
$producers[$producer->toTcpUri()] = $producer;
}
}
/** @phpstan-ignore-next-line */
if (($deferred = ($this->producers[$topic] ?? null)) instanceof Deferred) {
$deferred->resolve($producers);
}
$this->producers[$topic] = $producers;
yield delay($this->config->pollingInterval);
}
unset($this->topicWatchers[$topic]);
});
}
}

39
src/Lookup/Producer.php Normal file
View File

@ -0,0 +1,39 @@
<?php
declare(strict_types=1);
namespace Nsq\Lookup;
final class Producer
{
public function __construct(
public string $broadcastAddress,
public string $hostname,
public string $remoteAddress,
public int $tcpPort,
public int $httpPort,
public string $version,
public array $tombstones,
public array $topics,
) {
}
public static function fromArray(array $array): self
{
return new self(
$array['broadcast_address'],
$array['hostname'],
$array['remote_address'],
$array['tcp_port'],
$array['http_port'],
$array['version'],
$array['tombstones'] ?? [],
$array['topics'] ?? [],
);
}
public function toTcpUri(): string
{
return sprintf('%s:%s', $this->broadcastAddress, $this->tcpPort);
}
}

34
src/Lookup/Response.php Normal file
View File

@ -0,0 +1,34 @@
<?php
declare(strict_types=1);
namespace Nsq\Lookup;
use Nsq\Exception\LookupException;
final class Response
{
/**
* @param string[] $channels
* @param Producer[] $producers
*/
public function __construct(
public array $channels,
public array $producers,
) {
}
public static function fromJson(string $json): self
{
$array = json_decode($json, true, flags: JSON_THROW_ON_ERROR);
if (\array_key_exists('message', $array)) {
throw new LookupException($array['message']);
}
return new self(
$array['channels'] ?? [],
array_map([Producer::class, 'fromArray'], $array['producers']),
);
}
}

View File

@ -6,7 +6,6 @@ namespace Nsq;
use Amp\Promise;
use Nsq\Exception\MessageException;
use function Amp\call;
final class Message
{
@ -32,51 +31,51 @@ final class Message
);
}
public function isProcessed(): bool
{
return $this->processed;
}
/**
* @return Promise<void>
* @psalm-return Promise<bool>
*/
public function finish(): Promise
{
return call(function (): \Generator {
if ($this->processed) {
throw MessageException::processed($this);
}
$this->markAsProcessedOrFail();
yield $this->consumer->fin($this->id);
$this->processed = true;
});
return $this->consumer->fin($this->id);
}
/**
* @return Promise<void>
* @psalm-param positive-int|0 $timeout
*
* @psalm-return Promise<bool>
*/
public function requeue(int $timeout): Promise
{
return call(function () use ($timeout): \Generator {
if ($this->processed) {
throw MessageException::processed($this);
}
$this->markAsProcessedOrFail();
yield $this->consumer->req($this->id, $timeout);
$this->processed = true;
});
return $this->consumer->req($this->id, $timeout);
}
/**
* @return Promise<void>
* @psalm-return Promise<bool>
*/
public function touch(): Promise
{
return call(function (): \Generator {
if ($this->processed) {
throw MessageException::processed($this);
}
if ($this->processed) {
throw MessageException::processed($this);
}
yield $this->consumer->touch($this->id);
return $this->consumer->touch($this->id);
}
$this->processed = true;
});
private function markAsProcessedOrFail(): void
{
if ($this->processed) {
throw MessageException::processed($this);
}
$this->processed = true;
}
}

View File

@ -5,16 +5,38 @@ declare(strict_types=1);
namespace Nsq;
use Amp\Promise;
use Amp\Success;
use Nsq\Config\ClientConfig;
use Nsq\Exception\NsqException;
use Nsq\Stream\NullStream;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use function Amp\asyncCall;
use function Amp\call;
final class Producer extends Connection
{
public function __construct(
string $address,
ClientConfig $clientConfig,
LoggerInterface $logger,
) {
parent::__construct(
$address,
$clientConfig,
$logger,
);
$context = compact('address');
$this->onConnect(function () use ($context): void {
$this->logger->debug('Producer connected.', $context);
});
$this->onClose(function () use ($context): void {
$this->logger->debug('Producer disconnected.', $context);
});
$this->logger->debug('Producer created.', $context);
}
public static function create(
string $address,
ClientConfig $clientConfig = null,
@ -29,7 +51,7 @@ final class Producer extends Connection
public function connect(): Promise
{
if (!$this->stream instanceof NullStream) {
if ($this->isConnected()) {
return call(static function (): void {
});
}
@ -37,63 +59,72 @@ final class Producer extends Connection
return call(function (): \Generator {
yield parent::connect();
$this->run();
$buffer = new Buffer();
asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->read()) {
$buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->write(Command::nop());
}
// Ok received
break;
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
default:
throw new NsqException('Unreachable statement.');
}
}
}
$this->close(false);
});
});
}
/**
* @param array<int, string>|string $body
*
* @return Promise<void>
* @psalm-param positive-int|0 $delay
*
* @psalm-return Promise<bool>
*/
public function publish(string $topic, string | array $body, int $delay = 0): Promise
public function publish(string $topic, string | array $body, int $delay = null): Promise
{
if (0 < $delay) {
return call(
function (array $bodies) use ($topic, $delay): \Generator {
foreach ($bodies as $body) {
yield $this->stream->write(Command::dpub($topic, $body, $delay));
}
},
(array) $body,
);
if (!$this->isConnected()) {
return new Success(false);
}
$command = \is_array($body)
? Command::mpub($topic, $body)
: Command::pub($topic, $body);
return $this->stream->write($command);
}
private function run(): void
{
$buffer = new Buffer();
asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->stream->read()) {
$buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->stream->write(Command::nop());
}
// Ok received
break;
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
default:
throw new NsqException('Unreachable statement.');
return call(
function (iterable $commands): \Generator {
try {
foreach ($commands as $command) {
yield $this->write($command);
}
}
}
$this->stream = new NullStream();
});
return true;
} catch (\Throwable) {
return false;
}
},
(static function () use ($topic, $body, $delay): \Generator {
if (\is_array($body) && null === $delay) {
yield Command::mpub($topic, $body);
} elseif (null !== $delay) {
foreach ((array) $body as $content) {
yield Command::dpub($topic, $content, $delay);
}
} else {
yield Command::pub($topic, $body);
}
})(),
);
}
}

View File

@ -4,19 +4,23 @@ declare(strict_types=1);
namespace Nsq;
use Amp\ByteStream\ClosedException;
use Amp\Promise;
interface Stream
{
/**
* @return Promise<null|string>
* @psalm-return Promise<null|string>
*/
public function read(): Promise;
/**
* @return Promise<void>
* @psalm-return Promise<void>
*/
public function write(string $data): Promise;
/**
* @throws ClosedException
*/
public function close(): void;
}

View File

@ -5,14 +5,38 @@ declare(strict_types=1);
namespace Nsq\Stream;
use Amp\Promise;
use Nsq\Exception\NsqException;
use Nsq\Buffer;
use Nsq\Exception\StreamException;
use Nsq\Stream;
use function Amp\call;
class GzipStream implements Stream
{
public function __construct(private Stream $stream)
private ?\InflateContext $inflate = null;
private ?\DeflateContext $deflate = null;
private Buffer $buffer;
public function __construct(private Stream $stream, private int $level, string $bytes = '')
{
throw new NsqException('GzipStream not implemented yet.');
/** @var false|\InflateContext $inflate */
$inflate = @inflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
/** @var \DeflateContext|false $deflate */
$deflate = @deflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
if (false === $inflate) {
throw new StreamException('Failed initializing inflate context');
}
if (false === $deflate) {
throw new StreamException('Failed initializing deflate context');
}
$this->inflate = $inflate;
$this->deflate = $deflate;
$this->buffer = new Buffer($bytes);
}
/**
@ -20,7 +44,33 @@ class GzipStream implements Stream
*/
public function read(): Promise
{
return $this->stream->read();
return call(function (): \Generator {
if (null === $this->inflate) {
return null;
}
if ($this->buffer->empty()) {
$chunk = yield $this->stream->read();
if (null !== $chunk) {
$this->buffer->append($chunk);
}
}
$data = $this->buffer->flush();
if ('' === $data) {
return null;
}
/** @psalm-suppress UndefinedFunction,InvalidArgument */
$decompressed = inflate_add($this->inflate, $data, ZLIB_SYNC_FLUSH);
if (false === $decompressed) {
throw new StreamException('Failed adding data to deflate context');
}
return $decompressed;
});
}
/**
@ -28,11 +78,27 @@ class GzipStream implements Stream
*/
public function write(string $data): Promise
{
return $this->stream->write($data);
if (null === $this->deflate) {
throw new StreamException('The stream has already been closed');
}
/** @psalm-suppress UndefinedFunction,InvalidArgument */
$compressed = deflate_add($this->deflate, $data, ZLIB_SYNC_FLUSH);
if (false === $compressed) {
throw new StreamException('Failed adding data to deflate context');
}
return $this->stream->write($compressed);
}
/**
* {@inheritDoc}
*/
public function close(): void
{
$this->stream->close();
$this->inflate = null;
$this->deflate = null;
}
}

View File

@ -4,10 +4,11 @@ declare(strict_types=1);
namespace Nsq\Stream;
use Amp\Failure;
use Amp\Promise;
use Amp\Success;
use Nsq\Exception\NsqException;
use Nsq\Stream;
use function Amp\call;
final class NullStream implements Stream
{
@ -24,8 +25,7 @@ final class NullStream implements Stream
*/
public function write(string $data): Promise
{
return call(static function (): void {
});
return new Failure(new NsqException('Connection closed.'));
}
/**

View File

@ -8,24 +8,25 @@ use Amp\Promise;
use Nsq\Buffer;
use Nsq\Exception\SnappyException;
use Nsq\Stream;
use function Amp\call;
class SnappyStream implements Stream
{
private const IDENTIFIER = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
private const IDENTIFIER = [0xFF, 0x06, 0x00, 0x00, 0x73, 0x4E, 0x61, 0x50, 0x70, 0x59];
private const SIZE_HEADER = 4;
private const SIZE_CHECKSUM = 4;
private const SIZE_CHUNK = 65536;
private const TYPE_IDENTIFIER = 0xff;
private const TYPE_IDENTIFIER = 0xFF;
private const TYPE_COMPRESSED = 0x00;
private const TYPE_UNCOMPRESSED = 0x01;
private const TYPE_PADDING = 0xfe;
private const TYPE_PADDING = 0xFE;
private Buffer $buffer;
public function __construct(private Stream $stream, string $bytes = '')
{
if (!\function_exists('snappy_uncompress')) {
if (!\function_exists('snappy_uncompress') || !\function_exists('snappy_compress')) {
throw SnappyException::notInstalled();
}
@ -45,7 +46,7 @@ class SnappyStream implements Stream
$type = $this->buffer->readUInt32LE();
$size = $type >> 8;
$type &= 0xff;
$type &= 0xFF;
while ($this->buffer->size() < $size && null !== ($chunk = yield $this->stream->read())) {
$this->buffer->append($chunk);
@ -59,6 +60,7 @@ class SnappyStream implements Stream
case self::TYPE_COMPRESSED:
$this->buffer->discard(self::SIZE_CHECKSUM);
/** @psalm-suppress UndefinedFunction */
return snappy_uncompress($this->buffer->consume($size - self::SIZE_HEADER));
case self::TYPE_UNCOMPRESSED:
$this->buffer->discard(self::SIZE_CHECKSUM);
@ -78,6 +80,7 @@ class SnappyStream implements Stream
public function write(string $data): Promise
{
return call(function () use ($data): Promise {
/** @var string $result */
$result = pack('CCCCCCCCCC', ...self::IDENTIFIER);
foreach (str_split($data, self::SIZE_CHUNK) as $chunk) {
@ -93,23 +96,27 @@ class SnappyStream implements Stream
$this->stream->close();
}
/**
* @psalm-suppress PossiblyFalseArgument
*/
private function compress(string $uncompressed): string
{
/** @psalm-suppress UndefinedFunction */
$compressed = snappy_compress($uncompressed);
\assert(\is_string($compressed));
[$type, $data] = \strlen($compressed) <= 0.875 * \strlen($uncompressed)
? [self::TYPE_COMPRESSED, $compressed]
: [self::TYPE_UNCOMPRESSED, $uncompressed];
/** @phpstan-ignore-next-line */
$checksum = unpack('N', hash('crc32c', $uncompressed, true))[1];
$checksum = (($checksum >> 15) | ($checksum << 17)) + 0xa282ead8 & 0xffffffff;
/** @psalm-suppress PossiblyFalseArgument */
$unpacked = unpack('N', hash('crc32c', $uncompressed, true));
\assert(\is_array($unpacked));
$checksum = $unpacked[1];
$checksum = (($checksum >> 15) | ($checksum << 17)) + 0xA282EAD8 & 0xFFFFFFFF;
$size = (\strlen($data) + 4) << 8;
/** @psalm-suppress PossiblyFalseOperand */
return pack('VV', $type + $size, $checksum).$data;
}
}

View File

@ -5,20 +5,22 @@ declare(strict_types=1);
namespace Nsq\Stream;
use Amp\Promise;
use Amp\Socket\ClientTlsContext;
use Amp\Socket\ConnectContext;
use Amp\Socket\Socket;
use Amp\Socket\EncryptableSocket;
use Nsq\Stream;
use function Amp\call;
use function Amp\Socket\connect;
class SocketStream implements Stream
{
public function __construct(private Socket $socket)
public function __construct(private EncryptableSocket $socket)
{
}
/**
* @return Promise<self>
* @psalm-return Promise<self>
*/
public static function connect(string $uri, int $timeout = 0, int $attempts = 0, bool $noDelay = false): Promise
{
@ -37,12 +39,17 @@ class SocketStream implements Stream
$context = $context->withTcpNoDelay();
}
$context = $context->withTlsContext(
(new ClientTlsContext(''))
->withoutPeerVerification(),
);
return new self(yield connect($uri, $context));
});
}
/**
* @return Promise<null|string>
* @psalm-return Promise<null|string>
*/
public function read(): Promise
{
@ -50,15 +57,26 @@ class SocketStream implements Stream
}
/**
* @return Promise<void>
* @psalm-return Promise<void>
*/
public function write(string $data): Promise
{
return $this->socket->write($data);
}
/**
* {@inheritDoc}
*/
public function close(): void
{
$this->socket->close();
}
/**
* @psalm-return Promise<void>
*/
public function setupTls(): Promise
{
return $this->socket->setupTls();
}
}

View File

@ -7,6 +7,11 @@ use PHPUnit\Framework\TestCase;
final class ClientConfigTest extends TestCase
{
public function testNegotiationPayload(): void
{
self::assertJson((new ClientConfig())->asNegotiationPayload());
}
public function testInvalidCompression(): void
{
$this->expectException(InvalidArgumentException::class);
@ -14,4 +19,42 @@ final class ClientConfigTest extends TestCase
new ClientConfig(deflate: true, snappy: true);
}
/**
* @dataProvider array
*/
public function testFromArray(array $data, array $expected): void
{
self::assertSame($expected, get_object_vars(ClientConfig::fromArray($data)));
}
public function array(): Generator
{
$default = get_object_vars(new ClientConfig());
yield 'Empty array' => [[], $default];
yield 'With wrong keys' => [['bla' => 'bla'], $default];
$custom = [
'authSecret' => 'SomeSecret',
'connectTimeout' => 100,
'maxAttempts' => 10,
'tcpNoDelay' => true,
'rdyCount' => 1,
'featureNegotiation' => true,
'clientId' => 'SomeGorgeousClientId',
'deflate' => true,
'deflateLevel' => 1,
'heartbeatInterval' => 31111,
'hostname' => gethostname(),
'msgTimeout' => 59999,
'sampleRate' => 25,
'tls' => true,
'snappy' => false,
'userAgent' => 'nsqphp/test',
];
yield 'Full filled' => [$custom, $custom];
}
}

View File

@ -55,6 +55,7 @@ final class MessageTest extends TestCase
*/
public function messages(): Generator
{
/** @phpstan-ignore-next-line */
$consumer = $this->createMock(Consumer::class);
$consumer->method('fin')->willReturn(new Success());
$consumer->method('touch')->willReturn(new Success());

View File

@ -24,7 +24,6 @@ final class NsqTest extends TestCase
new ClientConfig(
heartbeatInterval: 3000,
snappy: false,
readTimeout: 1,
),
];
@ -32,7 +31,6 @@ final class NsqTest extends TestCase
new ClientConfig(
heartbeatInterval: 3000,
snappy: true,
readTimeout: 1,
),
];
}

95
tests/Nsqd.php Normal file
View File

@ -0,0 +1,95 @@
<?php
declare(strict_types=1);
use Symfony\Component\Filesystem\Filesystem;
use Symfony\Component\Process\Process;
final class Nsqd
{
public Process $process;
public string $address;
private static Filesystem $fs;
public function __construct(
public readonly string $dataPath,
public readonly int $httpPort,
public readonly int $tcpPort,
) {
self::$fs ??= new Filesystem();
self::$fs->mkdir($this->dataPath);
$nsqd = new Process([
'./nsqd',
sprintf('-data-path=%s', $this->dataPath),
sprintf('-http-address=0.0.0.0:%s', $this->httpPort),
sprintf('-tcp-address=0.0.0.0:%s', $this->tcpPort),
'-log-level=debug',
], dirname(__DIR__).'/bin');
$nsqd->start();
while (false === @fsockopen('localhost', $this->tcpPort)) {
if (!$nsqd->isRunning()) {
throw new RuntimeException($nsqd->getErrorOutput());
}
usleep(10000);
}
$this->process = $nsqd;
$this->address = sprintf('tcp://localhost:%s', $this->tcpPort);
}
public static function create(): self
{
do {
$dir = sprintf('/tmp/%s', bin2hex(random_bytes(5)));
} while (is_dir($dir));
return new self(
$dir,
findFreePort(),
findFreePort(),
);
}
public function tail(string $topic, string $channel, int $messages): Process
{
$tail = new Process(
[
'./nsq_tail',
sprintf('-nsqd-tcp-address=localhost:%s', $this->tcpPort),
sprintf('-topic=%s', $topic),
sprintf('-channel=%s', $channel),
sprintf('-n=%s', $messages),
'-print-topic',
],
dirname(__DIR__).'/bin',
timeout: 10,
);
$tail->start();
return $tail;
}
public function __destruct()
{
$this->process->stop();
self::$fs->remove($this->dataPath);
}
}
function findFreePort(): int
{
$sock = socket_create_listen(0);
assert($sock instanceof \Socket);
socket_getsockname($sock, $addr, $port);
socket_close($sock);
return $port;
}

View File

@ -7,17 +7,75 @@ use Nsq\Exception\ServerException;
use Nsq\Producer;
use PHPUnit\Framework\TestCase;
use function Amp\Promise\wait;
final class ProducerTest extends TestCase
{
/**
* @dataProvider bodies
*/
public function testPublish(string $body): void
{
$nsqd = Nsqd::create();
$tail = $nsqd->tail('test', 'test', 1);
$producer = Producer::create($nsqd->address);
wait($producer->connect());
self::assertTrue(wait($producer->publish('test', $body)));
self::assertSame(0, $tail->wait());
self::assertSame("test | {$body}", trim($tail->getOutput()));
}
/**
* @dataProvider bodies
*/
public function testPublishMultiple(string $body): void
{
$nsqd = Nsqd::create();
$tail = $nsqd->tail('test', 'test', 2);
$producer = Producer::create($nsqd->address);
wait($producer->connect());
self::assertTrue(wait($producer->publish('test', [$body, $body])));
self::assertSame(0, $tail->wait());
self::assertSame("test | {$body}\ntest | {$body}", trim($tail->getOutput()));
}
/**
* @dataProvider bodies
*/
public function testPublishDeferred(string $body): void
{
$nsqd = Nsqd::create();
$tail = $nsqd->tail('test', 'test', 1);
$producer = Producer::create($nsqd->address);
wait($producer->connect());
self::assertTrue(wait($producer->publish('test', $body, 1)));
self::assertSame(0, $tail->wait());
self::assertSame("test | {$body}", trim($tail->getOutput()));
}
/**
* @dataProvider pubFails
*/
public function testPubFail(string $topic, string $body, string $exceptionMessage): void
{
$nsqd = Nsqd::create();
$this->expectException(ServerException::class);
$this->expectExceptionMessage($exceptionMessage);
$producer = Producer::create('tcp://localhost:4150');
$producer = Producer::create($nsqd->address);
Loop::run(static function () use ($producer, $topic, $body): Generator {
yield $producer->connect();
@ -26,12 +84,15 @@ final class ProducerTest extends TestCase
});
}
/**
* @return Generator<string, array>
*/
public function pubFails(): Generator
{
yield 'Empty body' => ['test', '', 'E_BAD_MESSAGE PUB invalid message body size 0'];
yield 'Invalid topic' => ['test$%^&', '', 'E_BAD_TOPIC PUB topic name "test$%^&" is not valid'];
}
public function bodies(): Generator
{
yield 'Simple Body' => ['Simple Body'];
yield 'Body with special chars' => ['test$%^&'];
}
}