4 Commits

Author SHA1 Message Date
443473f53d handle heartbeat after reading frame 2021-02-05 00:08:06 +03:00
57b7715c2e simplify command 2021-02-04 23:09:47 +03:00
b126094e74 Close connection on __destruct 2021-02-04 23:07:41 +03:00
15296f4b61 amphp 2021-02-04 22:57:46 +03:00
68 changed files with 1170 additions and 2478 deletions

1
.env
View File

@@ -1 +0,0 @@
NSQ_VERSION=1.2.1

21
.gitattributes vendored
View File

@@ -1,21 +0,0 @@
# Exclude build/test files from archive
/.editorconfig export-ignore
/.gitattributes export-ignore
/.github export-ignore
/.gitignore export-ignore
/.php_cs export-ignore
/.php_cs.dist export-ignore
/.psalm export-ignore
/docs export-ignore
/examples export-ignore
/infection.json export-ignore
/infection.json.dist export-ignore
/phpstan.neon export-ignore
/phpunit.xml export-ignore
/phpunit.xml.dist export-ignore
/psalm.xml export-ignore
/tests export-ignore
# Configure diff output for .php and .phar files.
*.php diff=php
*.phar -diff

202
.github/workflows/ci.yaml vendored Normal file
View File

@@ -0,0 +1,202 @@
name: CI
on:
- pull_request
- push
jobs:
tests:
name: Tests
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os:
- ubuntu-latest
php:
- '8.0'
dependencies:
- lowest
- highest
services:
nsqd:
image: nsqio/nsq:v1.2.0
options: --entrypoint /nsqd
ports:
- 4150:4150
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: ${{ matrix.php }}
coverage: pcov
env:
update: true
- name: Setup Problem Matchers for PHPUnit
run: echo "::add-matcher::${{ runner.tool_cache }}/phpunit.json"
- name: Determine Composer cache directory
id: composer-cache
run: echo "::set-output name=directory::$(composer config cache-dir)"
- name: Cache Composer dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.directory }}
key: ${{ runner.os }}-${{ matrix.php }}-composer-${{ matrix.dependencies }}-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-${{ matrix.php }}-${{ matrix.dependencies }}-composer-
- name: Install highest dependencies
run: composer update --no-progress --no-interaction --prefer-dist
if: ${{ matrix.dependencies == 'highest' }}
- name: Install lowest dependencies
run: composer update --no-progress --no-interaction --prefer-dist --prefer-lowest
if: ${{ matrix.dependencies == 'lowest' }}
- name: Run tests
run: vendor/bin/phpunit --coverage-clover=build/coverage-report.xml
- name: Upload code coverage
uses: codecov/codecov-action@v1
with:
file: build/coverage-report.xml
php-cs-fixer:
name: PHP-CS-Fixer
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.0'
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Run script
run: composer cs-check
phpstan:
name: PHPStan
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.0'
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Run script
run: composer phpstan
psalm:
name: Psalm
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.0'
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Run script
run: vendor/bin/psalm --output-format=github
infection:
name: Infection
runs-on: ubuntu-latest
services:
nsqd:
image: nsqio/nsq:v1.2.0
options: --entrypoint /nsqd
ports:
- 4150:4150
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.0'
coverage: pcov
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Run script
env:
STRYKER_DASHBOARD_API_KEY: ${{ secrets.STRYKER_DASHBOARD_API_KEY }}
run: |
git fetch --depth=1 origin $GITHUB_BASE_REF
php vendor/bin/infection -j2 --git-diff-filter=A --git-diff-base=origin/$GITHUB_BASE_REF --logger-github --ignore-msi-with-no-mutations --only-covered

View File

@@ -1,37 +0,0 @@
name: Code Style
on:
- pull_request
- push
jobs:
php-cs-fixer:
name: PHP-CS-Fixer
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.1'
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Run script
run: composer cs-check

View File

@@ -1,71 +0,0 @@
name: phpunit
on:
- pull_request
- push
jobs:
phpunit:
name: phpunit
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os:
- ubuntu-latest
php:
- '8.1'
nsq:
- nsq-1.2.0.linux-amd64.go1.12.9
- nsq-1.2.1.linux-amd64.go1.16.6
dependencies:
- lowest
- highest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: ${{ matrix.php }}
coverage: pcov
extensions: kjdev/php-ext-snappy@0.2.1
env:
update: true
- name: Download NSQ
run: |
curl -sSL "http://bitly-downloads.s3.amazonaws.com/nsq/${{ matrix.nsq }}.tar.gz" \
| tar -xzv --strip-components=1
./bin/nsqd --version
- name: Setup Problem Matchers for PHPUnit
run: echo "::add-matcher::${{ runner.tool_cache }}/phpunit.json"
- name: Determine Composer cache directory
id: composer-cache
run: echo "::set-output name=directory::$(composer config cache-dir)"
- name: Cache Composer dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.directory }}
key: ${{ runner.os }}-${{ matrix.php }}-composer-${{ matrix.dependencies }}-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-${{ matrix.php }}-${{ matrix.dependencies }}-composer-
- name: Install highest dependencies
run: composer update --no-progress --no-interaction --prefer-dist
if: ${{ matrix.dependencies == 'highest' }}
- name: Install lowest dependencies
run: composer update --no-progress --no-interaction --prefer-dist --prefer-lowest
if: ${{ matrix.dependencies == 'lowest' }}
- name: Run tests
run: vendor/bin/phpunit --coverage-clover=build/coverage-report.xml
- name: Upload code coverage
uses: codecov/codecov-action@v1
with:
file: build/coverage-report.xml

View File

@@ -1,70 +0,0 @@
name: Static Analyze
on:
- pull_request
- push
jobs:
phpstan:
name: PHPStan
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.1'
extensions: snappy-kjdev/php-ext-snappy@0.2.1
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Run script
run: composer phpstan
psalm:
name: Psalm
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.1'
extensions: snappy-kjdev/php-ext-snappy@0.2.1
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Run script
run: vendor/bin/psalm --output-format=github

13
.gitignore vendored
View File

@@ -1,17 +1,6 @@
/vendor/ /vendor/
/composer.lock /composer.lock
/.php-cs-fixer.cache /.php_cs.cache
/.phpunit.result.cache /.phpunit.result.cache
/infection.log /infection.log
# Nsq
bin/nsq_stat
bin/nsq_tail
bin/nsq_to_file
bin/nsq_to_http
bin/nsq_to_nsq
bin/nsqadmin
bin/nsqd
bin/nsqlookupd
bin/to_nsq

View File

@@ -1,45 +0,0 @@
<?php
$finder = PhpCsFixer\Finder::create()
->in(__DIR__)
->exclude('vendor');
return (new PhpCsFixer\Config())
->setRiskyAllowed(true)
->setRules([
'@PhpCsFixer' => true,
'@PhpCsFixer:risky' => true,
'@PSR12' => true,
'@PSR12:risky' => true,
'blank_line_before_statement' => [
'statements' => [
'continue',
'do',
'exit',
'goto',
'if',
'return',
'switch',
'throw',
'try',
],
],
'declare_strict_types' => true,
'global_namespace_import' => [
'import_classes' => false,
'import_constants' => false,
'import_functions' => false,
],
'php_unit_internal_class' => false,
'php_unit_test_case_static_method_calls' => ['call_type' => 'self'],
'php_unit_test_class_requires_covers' => false,
'phpdoc_to_comment' => false,
'yoda_style' => true,
'trailing_comma_in_multiline' => [
'after_heredoc' => true,
'elements' => ['arrays', 'arguments', 'parameters'],
],
'types_spaces' => ['space' => 'single'],
])
->setFinder($finder)
->setCacheFile(__DIR__.'/.php-cs-fixer.cache');

28
.php_cs.dist Normal file
View File

@@ -0,0 +1,28 @@
#!/usr/bin/env php
<?php
return (new PhpCsFixer\Config())
->setRiskyAllowed(true)
->setRules([
'@PhpCsFixer' => true,
'@PhpCsFixer:risky' => true,
'@PSR12' => true,
'@PSR12:risky' => true,
'braces' => [
'allow_single_line_closure' => true,
],
'blank_line_before_statement' => [
'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try'],
],
'declare_strict_types' => true,
'global_namespace_import' => ['import_classes' => false, 'import_constants' => false, 'import_functions' => false],
'php_unit_internal_class' => false,
'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'],
'php_unit_test_class_requires_covers' => false,
'phpdoc_to_comment' => false,
'yoda_style' => true,
])
->setFinder(
PhpCsFixer\Finder::create()
->in(__DIR__)
);

View File

@@ -1,20 +0,0 @@
all: install composer-validate php-cs-fixer psalm phpstan phpunit
install:
composer install
psalm:
php vendor/bin/psalm
phpstan:
php vendor/bin/phpstan analyse
phpunit:
php vendor/bin/phpunit
php-cs-fixer:
php vendor/bin/php-cs-fixer fix
composer-validate:
composer validate

View File

@@ -1,12 +1,11 @@
# Nsq PHP # Nsq PHP
<img src="https://github.com/nsqphp/nsqphp/raw/master/docs/logo.png" alt="" align="left" width="150"> <img src="https://github.com/nsqphp/nsqphp/raw/main/logo.png" alt="" align="left" width="150">
PHP Client for [NSQ](https://nsq.io/). PHP Client for [NSQ](https://nsq.io/).
[![Latest Stable Version](https://poser.pugx.org/nsq/nsq/v)](//packagist.org/packages/nsq/nsq) [![Total Downloads](https://poser.pugx.org/nsq/nsq/downloads)](//packagist.org/packages/nsq/nsq) [![License](https://poser.pugx.org/nsq/nsq/license)](//packagist.org/packages/nsq/nsq) [![Latest Stable Version](https://poser.pugx.org/nsq/nsq/v)](//packagist.org/packages/nsq/nsq) [![Total Downloads](https://poser.pugx.org/nsq/nsq/downloads)](//packagist.org/packages/nsq/nsq) [![License](https://poser.pugx.org/nsq/nsq/license)](//packagist.org/packages/nsq/nsq)
[![codecov](https://codecov.io/gh/nsqphp/nsqphp/branch/master/graph/badge.svg?token=AYUMC3OO2B)](https://codecov.io/gh/nsqphp/nsqphp) [![Mutation testing badge](https://img.shields.io/endpoint?style=flat&url=https%3A%2F%2Fbadge-api.stryker-mutator.io%2Fgithub.com%2Fnsqphp%2Fnsqphp%2Fmaster)](https://dashboard.stryker-mutator.io/reports/github.com/nsqphp/nsqphp/master) [![telegram](https://raw.githubusercontent.com/aleen42/badges/master/src/telegram.svg)](http://t.me/grachevko) [![codecov](https://codecov.io/gh/nsqphp/nsqphp/branch/main/graph/badge.svg?token=AYUMC3OO2B)](https://codecov.io/gh/nsqphp/nsqphp) [![Mutation testing badge](https://img.shields.io/endpoint?style=flat&url=https%3A%2F%2Fbadge-api.stryker-mutator.io%2Fgithub.com%2Fnsqphp%2Fnsqphp%2Fmain)](https://dashboard.stryker-mutator.io/reports/github.com/nsqphp/nsqphp/main)
This library follow [SemVer](https://semver.org/). Until version 1.0 will be released anything MAY change at any time, public API SHOULD NOT be considered stable. If you want use it before stable version was released install strict version without range. This library follow [SemVer](https://semver.org/). Until version 1.0 will be released anything MAY change at any time, public API SHOULD NOT be considered stable. If you want use it before stable version was released install strict version without range.
@@ -32,10 +31,10 @@ Features
- [x] PUB - [x] PUB
- [x] SUB - [x] SUB
- [X] Feature Negotiation - [X] Feature Negotiation
- [X] Discovery - [ ] Discovery
- [ ] Backoff - [ ] Backoff
- [X] TLS - [ ] TLS
- [X] Deflate - [ ] Deflate
- [X] Snappy - [X] Snappy
- [X] Sampling - [X] Sampling
- [X] AUTH - [X] AUTH
@@ -48,59 +47,53 @@ Usage
```php ```php
use Nsq\Producer; use Nsq\Producer;
$producer = Producer::create(address: 'tcp://nsqd:4150'); $producer = new Producer(address: 'tcp://nsqd:4150');
// Publish a message to a topic // Publish a message to a topic
$producer->publish('topic', 'Simple message'); $producer->pub('topic', 'Simple message');
// Publish multiple messages to a topic (atomically) // Publish multiple messages to a topic (atomically)
$producer->publish('topic', [ $producer->mpub('topic', [
'Message one', 'Message one',
'Message two', 'Message two',
]); ]);
// Publish a deferred message to a topic // Publish a deferred message to a topic
$producer->publish('topic', 'Deferred message', delay: 5000); $producer->dpub('topic', 'Deferred message', delay: 5000);
``` ```
### Consumer ### Consumer
```php ```php
use Nsq\Consumer; use Nsq\Consumer;
use Nsq\Message; use Nsq\Protocol\Message;
$consumer = Consumer::create( $consumer = new Consumer(
address: 'tcp://nsqd:4150',
topic: 'topic', topic: 'topic',
channel: 'channel', channel: 'channel',
onMessage: static function (Message $message): Generator { address: 'tcp://nsqd:4150',
yield $message->touch(); // Reset the timeout for an in-flight message
yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
yield $message->finish(); // Finish a message (indicate successful processing)
},
); );
```
### Lookup // Simple blocking loop based on generator
$generator = $consumer->generator();
```php foreach ($generator as $message) {
use Nsq\Lookup; if ($message instanceof Message) {
use Nsq\Message; $payload = $message->body;
$lookup = new Lookup('http://nsqlookupd0:4161'); // handle message
$lookup = new Lookup(['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161']);
$callable = static function (Message $message): Generator { $message->touch(); // Reset the timeout for an in-flight message
yield $message->touch(); // Reset the timeout for an in-flight message $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process) $message->finish(); // Finish a message (indicate successful processing)
yield $message->finish(); // Finish a message (indicate successful processing) }
};
$lookup->subscribe(topic: 'topic', channel: 'channel', onMessage: $callable); // In case of nothing received during timeout generator will return NULL
$lookup->subscribe(topic: 'anotherTopic', channel: 'channel', onMessage: $callable); // Here we can do something between messages, like pcntl_signal_dispatch()
$lookup->unsubscribe(topic: 'local', channel: 'channel'); // Gracefully close connection (loop will be ended)
$lookup->stop(); // unsubscribe all $generator->send(0);
}
``` ```
### Integrations ### Integrations

View File

@@ -11,54 +11,39 @@
} }
], ],
"require": { "require": {
"php": "^8.1", "php": "^8.0.1",
"ext-json": "*", "ext-json": "*",
"amphp/http-client": "^4.6",
"amphp/socket": "^1.1", "amphp/socket": "^1.1",
"composer/semver": "^3.2", "composer/semver": "^3.2",
"phpinnacle/buffer": "^1.2", "phpinnacle/buffer": "^1.2",
"psr/log": "^3.0" "psr/log": "^1.1"
}, },
"require-dev": { "require-dev": {
"amphp/log": "^1.1", "amphp/log": "^1.1",
"dg/bypass-finals": "^1.3", "dg/bypass-finals": "^1.3",
"ergebnis/composer-normalize": "^2.15", "ergebnis/composer-normalize": "9999999-dev",
"friendsofphp/php-cs-fixer": "^3.4", "friendsofphp/php-cs-fixer": "^2.18",
"infection/infection": "^0.20.2",
"nyholm/nsa": "^1.2", "nyholm/nsa": "^1.2",
"phpstan/phpstan": "^1.8", "phpstan/phpstan": "^0.12.68",
"phpstan/phpstan-phpunit": "^1.1", "phpstan/phpstan-phpunit": "^0.12.17",
"phpstan/phpstan-strict-rules": "^1.3", "phpstan/phpstan-strict-rules": "^0.12.9",
"phpunit/phpunit": "^9.5", "phpunit/phpunit": "^9.5",
"symfony/filesystem": "^6.1",
"symfony/process": "^6.1",
"symfony/var-dumper": "^6.1",
"vimeo/psalm": "^4.4" "vimeo/psalm": "^4.4"
}, },
"config": { "config": {
"sort-packages": true, "sort-packages": true
"allow-plugins": {
"ergebnis/composer-normalize": true,
"infection/extension-installer": true
}
}, },
"autoload": { "autoload": {
"psr-4": { "psr-4": {
"Nsq\\": "src/" "Nsq\\": "src/"
} }
}, },
"autoload-dev": {
"classmap": [
"tests/"
],
"files": [
"vendor/symfony/var-dumper/Resources/functions/dump.php"
]
},
"minimum-stability": "dev", "minimum-stability": "dev",
"prefer-stable": true, "prefer-stable": true,
"scripts": { "scripts": {
"cs": [ "cs": [
"vendor/bin/php-cs-fixer fix --using-cache=no" "vendor/bin/php-cs-fixer fix"
], ],
"cs-check": [ "cs-check": [
"vendor/bin/php-cs-fixer fix --verbose --diff --dry-run" "vendor/bin/php-cs-fixer fix --verbose --diff --dry-run"
@@ -75,7 +60,7 @@
"vendor/bin/psalm" "vendor/bin/psalm"
], ],
"test": [ "test": [
"@norm", "@norm-check",
"@cs", "@cs",
"@phpstan", "@phpstan",
"@psalm", "@psalm",

View File

@@ -2,56 +2,19 @@ version: '3.7'
services: services:
nsqd: nsqd:
image: nsqio/nsq:v${NSQ_VERSION} image: nsqio/nsq:v1.2.0
labels: labels:
ru.grachevko.dhu: 'nsqd' ru.grachevko.dhu: 'nsqd'
command: >- command: /nsqd -log-level debug
nsqd # command: /nsqd
--log-level debug ports:
--lookupd-tcp-address nsqlookupd0:4160 - 4150:4150
--lookupd-tcp-address nsqlookupd1:4160 - 4151:4151
--lookupd-tcp-address nsqlookupd2:4160
nsqlookupd0:
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqlookupd0'
command: /nsqlookupd -log-level debug
nsqlookupd1:
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqlookupd1'
command: /nsqlookupd -log-level debug
nsqlookupd2:
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqlookupd2'
command: /nsqlookupd -log-level debug
nsqadmin: nsqadmin:
image: nsqio/nsq:v${NSQ_VERSION} image: nsqio/nsq:v1.2.0
labels: labels:
ru.grachevko.dhu: 'nsqadmin' ru.grachevko.dhu: 'nsqadmin'
command: command: /nsqadmin --nsqd-http-address=nsqd:4151 --http-address=0.0.0.0:4171
- nsqadmin ports:
- --http-address=0.0.0.0:4171 - 4171:4171
- --lookupd-http-address=nsqlookupd0:4161
- --lookupd-http-address=nsqlookupd1:4161
- --lookupd-http-address=nsqlookupd2:4161
depends_on:
- nsqlookupd0
- nsqlookupd1
- nsqlookupd2
tail:
image: nsqio/nsq:v${NSQ_VERSION}
command: >-
nsq_tail
--channel nsq_tail
--topic local
--lookupd-http-address nsqlookupd1:4161
depends_on:
- nsqd
- nsqlookupd1

View File

@@ -9,12 +9,12 @@ use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler; use Amp\Log\StreamHandler;
use Amp\Loop; use Amp\Loop;
use Amp\Promise; use Amp\Promise;
use Amp\Success;
use Monolog\Logger; use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor; use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Consumer; use Nsq\Consumer;
use Nsq\Message; use Nsq\Protocol\Message;
use function Amp\call; use function Amp\call;
Loop::run(static function () { Loop::run(static function () {
@@ -24,6 +24,16 @@ Loop::run(static function () {
$consumer = new Consumer( $consumer = new Consumer(
'tcp://localhost:4150', 'tcp://localhost:4150',
clientConfig: new ClientConfig(
deflate: false,
snappy: false,
),
logger: $logger,
);
yield $consumer->connect();
yield $consumer->listen(
topic: 'local', topic: 'local',
channel: 'local', channel: 'local',
onMessage: static function (Message $message) use ($logger): Promise { onMessage: static function (Message $message) use ($logger): Promise {
@@ -31,14 +41,9 @@ Loop::run(static function () {
$logger->info('Received: {body}', ['body' => $message->body]); $logger->info('Received: {body}', ['body' => $message->body]);
yield $message->finish(); yield $message->finish();
});
},
clientConfig: new ClientConfig(
deflate: false,
snappy: true,
),
logger: $logger,
);
yield $consumer->connect(); return new Success(false);
});
}
);
}); });

View File

@@ -1,47 +0,0 @@
<?php
declare(strict_types=1);
require dirname(__DIR__).'/../vendor/autoload.php';
use Amp\ByteStream;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Loop;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig;
use Nsq\Lookup;
use Nsq\Message;
Loop::run(static function () {
$handler = new StreamHandler(ByteStream\getStdout());
$handler->setFormatter(new ConsoleFormatter());
$logger = new Logger('consumer', [$handler], [new PsrLogMessageProcessor()]);
$callable = static function (Message $message) {
yield $message->finish();
};
$clientConfig = new ClientConfig();
$lookupConfig = new LookupConfig();
$watcherId = Loop::repeat(5000, function () {
yield Amp\Dns\resolver()->reloadConfig();
});
$lookup = Lookup::create(
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
$lookupConfig,
$logger,
);
$lookup->subscribe('local', 'local', $callable, $clientConfig);
Loop::delay(10000, function () use ($lookup, $watcherId) {
$lookup->stop();
Loop::cancel($watcherId);
});
});

View File

@@ -1,102 +0,0 @@
<?php
declare(strict_types=1);
require dirname(__DIR__).'/../vendor/autoload.php';
use Amp\ByteStream;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Loop;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig;
use Nsq\Lookup;
use Nsq\Producer;
use function Amp\asyncCall;
use function Amp\delay;
Loop::run(static function () {
$handler = new StreamHandler(ByteStream\getStdout());
$handler->setFormatter(new ConsoleFormatter());
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
$clientConfig = new ClientConfig();
/** @var Producer[] $producers */
$producers = [];
$lookupConfig = new LookupConfig();
$lookup = Lookup::create(
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
$lookupConfig,
$logger,
);
$isRunning = true;
asyncCall(static function () use ($lookup, $clientConfig, $logger, &$producers, &$isRunning) {
while ($isRunning) {
/** @var Lookup\Producer[] $nodes */
$nodes = yield $lookup->nodes();
foreach ($nodes as $node) {
$address = $node->toTcpUri();
if (array_key_exists($address, $producers)) {
continue;
}
asyncCall(function () use ($address, $clientConfig, $logger, &$producers) {
yield ($producers[$address] = Producer::create($address, $clientConfig, $logger))
->onClose(function () use (&$producers, $address) {
unset($producers[$address]);
})
->connect()
;
});
}
yield delay(5000);
yield Amp\Dns\resolver()->reloadConfig(); // for reload /etc/hosts
}
});
Loop::delay(5000, function () use (&$isRunning, $logger) {
$logger->info('Stopping producer.');
$isRunning = false;
});
$counter = 0;
while (true) {
if (!$isRunning) {
foreach ($producers as $producer) {
$producer->close();
}
break;
}
if ([] === $producers) {
yield delay(200);
continue;
}
$index = array_rand($producers);
$producer = $producers[$index];
if (!$producer->isConnected()) {
yield delay(100);
continue;
}
yield $producer->publish('local', 'This is message of count '.$counter++);
}
});

View File

@@ -22,15 +22,12 @@ Loop::run(static function () {
'tcp://localhost:4150', 'tcp://localhost:4150',
clientConfig: new ClientConfig( clientConfig: new ClientConfig(
deflate: false, deflate: false,
heartbeatInterval: 5000, snappy: false,
snappy: true,
), ),
logger: $logger, logger: $logger,
); );
yield $producer->connect(); yield $producer->connect();
while (true) { yield $producer->pub(topic: 'topic', body: 'Message body!');
yield $producer->publish(topic: 'local', body: array_fill(0, 200, 'Message body!'));
}
}); });

View File

Before

Width:  |  Height:  |  Size: 98 KiB

After

Width:  |  Height:  |  Size: 98 KiB

View File

@@ -8,9 +8,3 @@ parameters:
paths: paths:
- src - src
- tests - tests
ignoreErrors:
-
message: '#no value type specified in iterable type array#'
paths:
- %currentWorkingDirectory%/src
- %currentWorkingDirectory%/tests

View File

@@ -1,5 +1,6 @@
<?xml version="1.0"?> <?xml version="1.0"?>
<psalm <psalm
allowPhpStormGenerics="true"
ignoreInternalFunctionFalseReturn="false" ignoreInternalFunctionFalseReturn="false"
ignoreInternalFunctionNullReturn="false" ignoreInternalFunctionNullReturn="false"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

View File

@@ -1,3 +0,0 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
}

View File

@@ -1,37 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq;
use PHPinnacle\Buffer\ByteBuffer;
/**
* @psalm-suppress
*/
final class Buffer extends ByteBuffer
{
public function readUInt32LE(): int
{
$unpacked = unpack('V', $this->consume(4));
\assert(\is_array($unpacked) && \array_key_exists(1, $unpacked));
return $unpacked[1];
}
public function consumeTimestamp(): int
{
return $this->consumeUint64();
}
public function consumeAttempts(): int
{
return $this->consumeUint16();
}
public function consumeMessageID(): string
{
return $this->consume(16);
}
}

View File

@@ -1,111 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq;
use PHPinnacle\Buffer\ByteBuffer;
/**
* @internal
*/
final class Command
{
public static function magic(): string
{
return ' V2';
}
public static function identify(string $data): string
{
return self::pack('IDENTIFY', data: $data);
}
public static function auth(?string $authSecret): string
{
return self::pack('AUTH', data: $authSecret);
}
public static function nop(): string
{
return self::pack('NOP');
}
public static function cls(): string
{
return self::pack('CLS');
}
public static function rdy(int $count): string
{
return self::pack('RDY', (string) $count);
}
public static function fin(string $id): string
{
return self::pack('FIN', $id);
}
public static function req(string $id, int $timeout): string
{
return self::pack('REQ', [$id, $timeout]);
}
public static function touch(string $id): string
{
return self::pack('TOUCH', $id);
}
public static function pub(string $topic, string $body): string
{
return self::pack('PUB', $topic, $body);
}
/**
* @param array<int, string> $bodies
*/
public static function mpub(string $topic, array $bodies): string
{
static $buffer;
$buffer ??= new ByteBuffer();
$buffer->appendUint32(\count($bodies));
foreach ($bodies as $body) {
$buffer->appendUint32(\strlen($body));
$buffer->append($body);
}
return self::pack('MPUB', $topic, $buffer->flush());
}
public static function dpub(string $topic, string $body, int $delay): string
{
return self::pack('DPUB', [$topic, $delay], $body);
}
public static function sub(string $topic, string $channel): string
{
return self::pack('SUB', [$topic, $channel]);
}
/**
* @param array<int, scalar>|string $params
*/
private static function pack(string $command, array | string $params = [], string $data = null): string
{
static $buffer;
$buffer ??= new Buffer();
$command = implode(' ', [$command, ...((array) $params)]);
$buffer->append($command.PHP_EOL);
if (null !== $data) {
$buffer->appendUint32(\strlen($data));
$buffer->append($data);
}
return $buffer->flush();
}
}

View File

@@ -13,7 +13,7 @@ use Composer\InstalledVersions;
* *
* @psalm-immutable * @psalm-immutable
*/ */
final class ClientConfig final class ClientConfig implements \JsonSerializable
{ {
/** /**
* @psalm-suppress ImpureFunctionCall * @psalm-suppress ImpureFunctionCall
@@ -26,26 +26,9 @@ final class ClientConfig
public ?string $authSecret = null, public ?string $authSecret = null,
/** /**
* The timeout for establishing a connection in milliseconds. * The timeout for establishing a connection in seconds.
*/ */
public int $connectTimeout = 10000, public int $connectTimeout = 10,
/**
* The max attempts for establishing a connection.
*/
public int $maxAttempts = 0,
/**
* Use tcp_nodelay for establishing a connection.
*/
public bool $tcpNoDelay = false,
public int $rdyCount = 100,
/**
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
* it will send back a JSON payload of supported features and metadata.
*/
public bool $featureNegotiation = true,
/** /**
* An identifier used to disambiguate this client (i.e. something specific to the consumer). * An identifier used to disambiguate this client (i.e. something specific to the consumer).
@@ -90,6 +73,12 @@ final class ClientConfig
*/ */
public int $sampleRate = 0, public int $sampleRate = 0,
/**
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
* it will send back a JSON payload of supported features and metadata.
*/
public bool $featureNegotiation = true,
/** /**
* Enable TLS for this connection. * Enable TLS for this connection.
*/ */
@@ -100,6 +89,11 @@ final class ClientConfig
*/ */
public bool $snappy = false, public bool $snappy = false,
/**
* The read timeout for connection sockets and for awaiting responses from nsq.
*/
public int $readTimeout = 5,
/** /**
* A string identifying the agent for this client in the spirit of HTTP. * A string identifying the agent for this client in the spirit of HTTP.
*/ */
@@ -120,14 +114,12 @@ final class ClientConfig
} }
} }
public static function fromArray(array $array): self /**
* @phpstan-ignore-next-line
*/
public function jsonSerialize(): array
{ {
return new self(...array_intersect_key($array, get_class_vars(self::class))); return [
}
public function asNegotiationPayload(): string
{
$data = [
'client_id' => $this->clientId, 'client_id' => $this->clientId,
'deflate' => $this->deflate, 'deflate' => $this->deflate,
'deflate_level' => $this->deflateLevel, 'deflate_level' => $this->deflateLevel,
@@ -140,7 +132,10 @@ final class ClientConfig
'tls_v1' => $this->tls, 'tls_v1' => $this->tls,
'user_agent' => $this->userAgent, 'user_agent' => $this->userAgent,
]; ];
}
return json_encode($data, JSON_THROW_ON_ERROR); public function toString(): string
{
return json_encode($this, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
} }
} }

View File

@@ -8,10 +8,8 @@ namespace Nsq\Config;
* The configuration object that holds the config status for a single Connection. * The configuration object that holds the config status for a single Connection.
* *
* @psalm-immutable * @psalm-immutable
*
* @internal
*/ */
final class ServerConfig final class ConnectionConfig
{ {
public function __construct( public function __construct(
/** /**
@@ -84,6 +82,9 @@ final class ServerConfig
) { ) {
} }
/**
* @phpstan-ignore-next-line
*/
public static function fromArray(array $array): self public static function fromArray(array $array): self
{ {
return new self( return new self(

View File

@@ -1,13 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Config;
final class LookupConfig
{
public function __construct(
public int $pollingInterval = 10000,
) {
}
}

View File

@@ -4,266 +4,254 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Amp\ByteStream\ClosedException; use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\ZlibInputStream;
use Amp\ByteStream\ZlibOutputStream;
use Amp\Failure; use Amp\Failure;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\Socket;
use Amp\Success;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Config\ServerConfig; use Nsq\Config\ConnectionConfig;
use Nsq\Exception\AuthenticationRequired; use Nsq\Exception\AuthenticationRequired;
use Nsq\Exception\BadResponse;
use Nsq\Exception\NotConnected;
use Nsq\Exception\NsqError;
use Nsq\Exception\NsqException; use Nsq\Exception\NsqException;
use Nsq\Frame\Response; use Nsq\Protocol\Error;
use Nsq\Stream\GzipStream; use Nsq\Protocol\Frame;
use Nsq\Protocol\Message;
use Nsq\Protocol\Response;
use Nsq\Stream\NsqInputStream;
use Nsq\Stream\NullStream; use Nsq\Stream\NullStream;
use Nsq\Stream\SnappyStream; use Nsq\Stream\SnappyInputStream;
use Nsq\Stream\SocketStream; use Nsq\Stream\SnappyOutputStream;
use PHPinnacle\Buffer\ByteBuffer;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use function Amp\asyncCall;
use function Amp\call; use function Amp\call;
use function Amp\Socket\connect;
/** /**
* @internal * @internal
*/ */
abstract class Connection abstract class Connection
{ {
private Stream $stream; private ?Socket $socket = null;
/** private InputStream $inputStream;
* @var callable
*/
private $onConnectCallback;
/** private OutputStream $outputStream;
* @var callable
*/
private $onCloseCallback;
public function __construct( private ByteBuffer $buffer;
/**
* @readonly protected ?ConnectionConfig $connectionConfig = null;
*/
public string $address, protected ClientConfig $clientConfig;
protected ClientConfig $clientConfig,
protected LoggerInterface $logger, protected LoggerInterface $logger;
final public function __construct(
private string $address,
ClientConfig $clientConfig = null,
?LoggerInterface $logger = null,
) { ) {
$this->stream = new NullStream(); $this->buffer = new ByteBuffer();
$this->onConnectCallback = static function (): void { $this->inputStream = $this->outputStream = new NullStream();
}; $this->clientConfig = $clientConfig ?? new ClientConfig();
$this->onCloseCallback = static function (): void { $this->logger = $logger ?? new NullLogger();
};
} }
public function __destruct() public function __destruct()
{ {
$this->close(false); $this->close();
}
public function isConnected(): bool
{
return !$this->stream instanceof NullStream;
} }
/** /**
* @psalm-return Promise<void> * @return Promise<void>
*/ */
public function connect(): Promise public function connect(): Promise
{ {
return call(function (): \Generator { return call(function (): \Generator {
$buffer = new Buffer(); $this->socket = $this->outputStream = yield connect($this->address);
$this->inputStream = new NsqInputStream($this->socket);
/** @var SocketStream $stream */ yield $this->outputStream->write(' V2');
$stream = yield SocketStream::connect(
$this->address, yield $this->command('IDENTIFY', data: $this->clientConfig->toString());
$this->clientConfig->connectTimeout, /** @var Response $response */
$this->clientConfig->maxAttempts, $response = yield $this->readResponse();
$this->clientConfig->tcpNoDelay, $this->connectionConfig = ConnectionConfig::fromArray($response->toArray());
if ($this->connectionConfig->snappy) {
$this->inputStream = new NsqInputStream(
new SnappyInputStream($this->inputStream, $this->logger),
); );
$this->outputStream = new SnappyOutputStream($this->outputStream);
yield $stream->write(Command::magic()); $this->checkIsOK();
yield $stream->write(Command::identify($this->clientConfig->asNegotiationPayload()));
/** @var Response $response */
$response = yield $this->response($stream, $buffer);
$serverConfig = ServerConfig::fromArray($response->toArray());
if ($serverConfig->tls) {
yield $stream->setupTls();
/** @var Response $response */
$response = yield $this->response($stream, $buffer);
if (!$response->isOk()) {
throw new NsqException();
}
} }
if ($serverConfig->snappy) { if ($this->connectionConfig->deflate) {
$stream = new SnappyStream($stream, $buffer->flush()); $this->inputStream = new NsqInputStream(
new ZlibInputStream($this->socket, ZLIB_ENCODING_DEFLATE, [
'level' => $this->connectionConfig->deflateLevel,
]),
);
$this->outputStream = new ZlibOutputStream($this->socket, ZLIB_ENCODING_DEFLATE, [
'level' => $this->connectionConfig->deflateLevel,
]);
/** @var Response $response */ $this->checkIsOK();
$response = yield $this->response($stream, $buffer);
if (!$response->isOk()) {
throw new NsqException();
}
} }
if ($serverConfig->deflate) { if ($this->connectionConfig->authRequired) {
$stream = new GzipStream($stream, $serverConfig->deflateLevel, $buffer->flush());
/** @var Response $response */
$response = yield $this->response($stream, $buffer);
if (!$response->isOk()) {
throw new NsqException();
}
}
if ($serverConfig->authRequired) {
if (null === $this->clientConfig->authSecret) { if (null === $this->clientConfig->authSecret) {
yield $this->close();
throw new AuthenticationRequired(); throw new AuthenticationRequired();
} }
yield $stream->write(Command::auth($this->clientConfig->authSecret)); yield $this->command('AUTH', data: $this->clientConfig->authSecret);
$response = yield $this->readResponse();
/** @var Response $response */
$response = yield $this->response($stream, $buffer);
$this->logger->info('Authorization response: '.http_build_query($response->toArray())); $this->logger->info('Authorization response: '.http_build_query($response->toArray()));
} }
$this->stream = $stream;
($this->onConnectCallback)();
}); });
} }
public function close(bool $graceful = true): void
{
if (!$this->isConnected()) {
return;
}
$logger = $this->logger;
[$stream, $this->stream] = [$this->stream, new NullStream()];
if ($graceful) {
$this->logger->debug('Graceful disconnect.', [
'class' => static::class,
'address' => $this->address,
]);
asyncCall(static function () use ($stream, $logger): \Generator {
try {
yield $stream->write(Command::cls());
} catch (\Throwable $e) {
$logger->warning($e->getMessage(), ['exception' => $e]);
}
$stream->close();
});
return;
}
try {
$stream->close();
} catch (ClosedException) {
}
($this->onCloseCallback)();
}
public function onConnect(callable $callback): static
{
$previous = $this->onConnectCallback;
$this->onConnectCallback = static function () use ($previous, $callback): void {
$previous();
$callback();
};
return $this;
}
public function onClose(callable $callback): static
{
$previous = $this->onCloseCallback;
$this->onCloseCallback = static function () use ($previous, $callback): void {
$previous();
$callback();
};
return $this;
}
/** /**
* @psalm-return Promise<null|string> * Cleanly close your connection (no more messages are sent).
*
* @return Promise<void>
*/ */
protected function read(): Promise public function close(): Promise
{
if (null === $this->socket) {
return new Success();
}
return call(function (): \Generator {
yield $this->command('CLS');
if (null !== $this->socket) {
$this->socket->close();
$this->socket = null;
}
});
}
public function isClosed(): bool
{
return null === $this->socket;
}
/**
* @param array<int, int|string>|string $params
*
* @return Promise<void>
*/
protected function command(string $command, array | string $params = [], string $data = null): Promise
{
if (null === $this->socket) {
return new Failure(new NotConnected());
}
$command = implode(' ', [$command, ...((array) $params)]);
$buffer = $this->buffer->append($command.PHP_EOL);
if (null !== $data) {
$buffer->appendUint32(\strlen($data));
$buffer->append($data);
}
$this->logger->debug('Sending: {bytes}', ['bytes' => $buffer->bytes()]);
return $this->outputStream->write($buffer->flush());
}
/**
* @return Promise<Frame>
*/
protected function readFrame(): Promise
{ {
return call(function (): \Generator { return call(function (): \Generator {
try { $bytes = yield $this->inputStream->read();
return yield $this->stream->read();
} catch (\Throwable $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
$this->close(false); $this->logger->debug('Receiving: {bytes}', ['bytes' => $bytes]);
return new Failure($e); if (null === $bytes) {
throw new NotConnected();
} }
$buffer = $this->buffer->append($bytes);
$frame = match ($type = $buffer->consumeUint32()) {
0 => new Response($buffer->flush()),
1 => new Error($buffer->flush()),
2 => new Message(
timestamp: $buffer->consumeInt64(),
attempts: $buffer->consumeUint16(),
id: $buffer->consume(Bytes::BYTES_ID),
body: $buffer->flush(),
consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'),
),
default => throw new NsqException('Unexpected frame type: '.$type)
};
if ($frame instanceof Response && $frame->isHeartBeat()) {
yield $this->command('NOP');
return $this->readFrame();
}
return $frame;
}); });
} }
/** /**
* @psalm-return Promise<void> * @return Promise<void>
*/ */
protected function write(string $data): Promise protected function checkIsOK(): Promise
{ {
return call(function () use ($data): \Generator { return call(function (): \Generator {
try { /** @var Response $response */
return yield $this->stream->write($data); $response = yield $this->readResponse();
} catch (\Throwable $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
$this->close(false); if (!$response->isOk()) {
throw new BadResponse($response);
return new Failure($e);
} }
$this->logger->debug('Ok checked.');
return call(static function (): void {});
}); });
} }
protected function handleError(Frame\Error $error): void /**
* @return Promise<Response>
*/
private function readResponse(): Promise
{ {
$this->logger->error($error->data); return call(function (): \Generator {
$frame = yield $this->readFrame();
if (ErrorType::terminable($error)) { if ($frame instanceof Error) {
if ($frame->type->terminateConnection) {
$this->close(); $this->close();
throw $error->toException();
}
} }
/** throw new NsqError($frame);
* @psalm-return Promise<Frame\Response>
*/
private function response(Stream $stream, Buffer $buffer): Promise
{
return call(function () use ($stream, $buffer): \Generator {
while (true) {
$response = Parser::parse($buffer);
if (null === $response && null !== ($chunk = yield $stream->read())) {
$buffer->append($chunk);
continue;
} }
if (!$response instanceof Frame\Response) { if (!$frame instanceof Response) {
throw new NsqException(); throw new NsqException('Unreachable statement.');
} }
return $response; return $frame;
}
}); });
} }
} }

View File

@@ -7,192 +7,105 @@ namespace Nsq;
use Amp\Failure; use Amp\Failure;
use Amp\Promise; use Amp\Promise;
use Amp\Success; use Amp\Success;
use Nsq\Config\ClientConfig; use Nsq\Exception\NsqError;
use Nsq\Exception\ConsumerException; use Nsq\Exception\NsqException;
use Nsq\Frame\Response; use Nsq\Protocol\Error;
use Psr\Log\LoggerInterface; use Nsq\Protocol\Message;
use Psr\Log\NullLogger; use Nsq\Protocol\Response;
use function Amp\asyncCall; use function Amp\asyncCall;
use function Amp\call; use function Amp\call;
/**
* @internal
*/
final class Consumer extends Connection final class Consumer extends Connection
{ {
private int $rdy = 0; private int $rdy = 0;
/** /**
* @var callable * @return Promise<void>
*/ */
private $onMessage; public function listen(
public function __construct(
string $address,
/**
* @readonly
*/
public string $topic,
/**
* @readonly
*/
public string $channel,
callable $onMessage,
ClientConfig $clientConfig,
LoggerInterface $logger,
) {
parent::__construct(
$address,
$clientConfig,
$logger,
);
$this->onMessage = $onMessage;
$context = compact('address', 'topic', 'channel');
$this->onConnect(function () use ($context): void {
$this->logger->debug('Consumer connected.', $context);
});
$this->onClose(function () use ($context): void {
$this->logger->debug('Consumer disconnected.', $context);
});
$this->logger->debug('Consumer created.', $context);
}
public static function create(
string $address,
string $topic, string $topic,
string $channel, string $channel,
callable $onMessage, callable $onMessage,
?ClientConfig $clientConfig = null, ): Promise {
?LoggerInterface $logger = null, return call(function () use ($topic, $channel, $onMessage): \Generator {
): self { yield $this->command('SUB', [$topic, $channel]);
return new self( yield $this->checkIsOK();
$address,
$topic, asyncCall(function () use ($onMessage): \Generator {
$channel, yield $this->rdy(2500);
$onMessage,
$clientConfig ?? new ClientConfig(), while ($message = yield $this->readMessage()) {
$logger ?? new NullLogger(), $command = yield $onMessage($message);
);
if (true === $command) {
break;
} }
public function connect(): Promise if ($this->rdy < 1000) {
yield $this->rdy(2500);
}
}
return new Success();
});
});
}
/**
* @return Promise<Message>
*/
public function readMessage(): Promise
{ {
if ($this->isConnected()) {
return call(static function (): void {
});
}
return call(function (): \Generator { return call(function (): \Generator {
yield parent::connect(); $frame = yield $this->readFrame();
$buffer = new Buffer(); if ($frame instanceof Message) {
return $frame;
asyncCall(function () use ($buffer): \Generator {
yield $this->write(Command::sub($this->topic, $this->channel));
if (null !== ($chunk = yield $this->read())) {
$buffer->append($chunk);
} }
/** @var Response $response */ if ($frame instanceof Error) {
$response = Parser::parse($buffer); if ($frame->type->terminateConnection) {
yield $this->close();
if (!$response->isOk()) {
return new Failure(new ConsumerException('Fail subscription.'));
} }
yield $this->rdy(1); throw new NsqError($frame);
/** @phpstan-ignore-next-line */
asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->read()) {
$buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->write(Command::nop());
break;
} }
throw ConsumerException::response($frame); throw new NsqException('Unreachable statement.');
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
case $frame instanceof Frame\Message:
asyncCall($this->onMessage, Message::compose($frame, $this));
break;
}
if ($this->rdy !== $this->clientConfig->rdyCount) {
yield $this->rdy($this->clientConfig->rdyCount);
}
}
}
$this->close(false);
});
});
}); });
} }
/** /**
* Update RDY state (indicate you are ready to receive N messages). * Update RDY state (indicate you are ready to receive N messages).
* *
* @psalm-return Promise<bool> * @return Promise<void>
*/ */
public function rdy(int $count): Promise public function rdy(int $count): Promise
{ {
if (!$this->isConnected()) {
return new Success(false);
}
if ($this->rdy === $count) { if ($this->rdy === $count) {
return new Success(true); return call(static function (): void {});
} }
$this->rdy = $count; $this->rdy = $count;
return call(function () use ($count): \Generator { return $this->command('RDY', (string) $count);
try {
yield $this->write(Command::rdy($count));
return true;
} catch (\Throwable) {
return false;
}
});
} }
/** /**
* Finish a message (indicate successful processing). * Finish a message (indicate successful processing).
* *
* @psalm-return Promise<bool> * @return Promise<void>
* *
* @internal * @internal
*/ */
public function fin(string $id): Promise public function fin(string $id): Promise
{ {
if (!$this->isConnected()) { $promise = $this->command('FIN', $id);
return new Success(false); $promise->onResolve(function (): void {
} --$this->rdy;
return call(function () use ($id): \Generator {
try {
yield $this->write(Command::fin($id));
return true;
} catch (\Throwable) {
return false;
}
}); });
return $promise;
} }
/** /**
@@ -201,48 +114,29 @@ final class Consumer extends Connection
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out * be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
* behaves identically to an explicit REQ. * behaves identically to an explicit REQ.
* *
* @psalm-return Promise<bool> * @return Promise<void>
* *
* @internal * @internal
*/ */
public function req(string $id, int $timeout): Promise public function req(string $id, int $timeout): Promise
{ {
if (!$this->isConnected()) { $promise = $this->command('REQ', [$id, $timeout]);
return new Success(false); $promise->onResolve(function (): void {
} --$this->rdy;
return call(function () use ($id, $timeout): \Generator {
try {
yield $this->write(Command::req($id, $timeout));
return true;
} catch (\Throwable) {
return false;
}
}); });
return $promise;
} }
/** /**
* Reset the timeout for an in-flight message. * Reset the timeout for an in-flight message.
* *
* @psalm-return Promise<bool> * @return Promise<void>
* *
* @internal * @internal
*/ */
public function touch(string $id): Promise public function touch(string $id): Promise
{ {
if (!$this->isConnected()) { return $this->command('TOUCH', $id);
return new Success(false);
}
return call(function () use ($id): \Generator {
try {
yield $this->write(Command::touch($id));
return true;
} catch (\Throwable) {
return false;
}
});
} }
} }

View File

@@ -0,0 +1,15 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Nsq\Protocol\Response;
final class BadResponse extends NsqException
{
public function __construct(Response $response)
{
parent::__construct($response->msg);
}
}

View File

@@ -0,0 +1,16 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
final class ConnectionFail extends NsqException
{
/**
* @codeCoverageIgnore
*/
public static function fromThrowable(\Throwable $throwable): self
{
return new self($throwable->getMessage(), (int) $throwable->getCode(), $throwable);
}
}

View File

@@ -1,15 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Nsq\Frame\Response;
final class ConsumerException extends NsqException
{
public static function response(Response $response): self
{
return new self(sprintf('Consumer receive response "%s" from nsq, which not expected. ', $response->data));
}
}

View File

@@ -1,18 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Psr\Log\LogLevel;
final class LookupException extends NsqException
{
public function level(): string
{
return match ($this->getMessage()) {
'TOPIC_NOT_FOUND' => LogLevel::DEBUG,
default => LogLevel::WARNING,
};
}
}

View File

@@ -0,0 +1,25 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Nsq\Protocol\Message;
final class MessageAlreadyFinished extends NsqException
{
public static function finish(Message $message): self
{
return new self('Can\'t finish message as it already finished.');
}
public static function requeue(Message $message): self
{
return new self('Can\'t requeue message as it already finished.');
}
public static function touch(Message $message): self
{
return new self('Can\'t touch message as it already finished.');
}
}

View File

@@ -1,15 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Nsq\Message;
final class MessageException extends NsqException
{
public static function processed(Message $message): self
{
return new self(sprintf('Message "%s" already processed.', $message->id));
}
}

View File

@@ -4,6 +4,6 @@ declare(strict_types=1);
namespace Nsq\Exception; namespace Nsq\Exception;
final class ServerException extends NsqException final class NotConnected extends NsqException
{ {
} }

View File

@@ -0,0 +1,15 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Nsq\Protocol\Error;
final class NsqError extends NsqException
{
public function __construct(Error $error)
{
parent::__construct($error->rawData);
}
}

View File

@@ -1,18 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
final class SnappyException extends NsqException
{
public static function notInstalled(): self
{
return new self('Snappy extension not installed.');
}
public static function invalidHeader(): self
{
return new self('Invalid snappy protocol header.');
}
}

View File

@@ -1,9 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
final class StreamException extends NsqException
{
}

View File

@@ -1,31 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq;
abstract class Frame
{
public const TYPE_RESPONSE = 0;
public const TYPE_ERROR = 1;
public const TYPE_MESSAGE = 2;
public function __construct(public int $type)
{
}
public function response(): bool
{
return self::TYPE_RESPONSE === $this->type;
}
public function error(): bool
{
return self::TYPE_ERROR === $this->type;
}
public function message(): bool
{
return self::TYPE_MESSAGE === $this->type;
}
}

View File

@@ -1,24 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Frame;
use Nsq\Exception\ServerException;
use Nsq\Frame;
/**
* @psalm-immutable
*/
final class Error extends Frame
{
public function __construct(public string $data)
{
parent::__construct(self::TYPE_ERROR);
}
public function toException(): ServerException
{
return new ServerException($this->data);
}
}

View File

@@ -1,19 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Frame;
use Nsq\Frame;
final class Message extends Frame
{
public function __construct(
public int $timestamp,
public int $attempts,
public string $id,
public string $body,
) {
parent::__construct(self::TYPE_MESSAGE);
}
}

View File

@@ -1,39 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Frame;
use Nsq\Frame;
/**
* @psalm-immutable
*/
final class Response extends Frame
{
public const OK = 'OK';
public const HEARTBEAT = '_heartbeat_';
public function __construct(public string $data)
{
parent::__construct(self::TYPE_RESPONSE);
}
public function isOk(): bool
{
return self::OK === $this->data;
}
public function isHeartBeat(): bool
{
return self::HEARTBEAT === $this->data;
}
/**
* @psalm-return array<mixed, mixed>
*/
public function toArray(): array
{
return json_decode($this->data, true, flags: JSON_THROW_ON_ERROR);
}
}

View File

@@ -1,274 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq;
use Amp\Deferred;
use Amp\Dns\DnsException;
use Amp\Http\Client\DelegateHttpClient;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\NullCancellationToken;
use Amp\Promise;
use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig;
use Nsq\Exception\LookupException;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use function Amp\asyncCall;
use function Amp\call;
use function Amp\delay;
final class Lookup
{
/**
* @psalm-var array<string, array<string, Lookup\Producer>>
*/
private array $producers = [];
private array $consumers = [];
private array $running = [];
private array $topicWatchers = [];
public function __construct(
private array $addresses,
private LookupConfig $config,
private LoggerInterface $logger,
private DelegateHttpClient $httpClient,
) {
}
public static function create(
string | array $address,
LookupConfig $config = null,
LoggerInterface $logger = null,
DelegateHttpClient $httpClient = null,
): self {
return new self(
(array) $address,
$config ?? new LookupConfig(),
$logger ?? new NullLogger(),
$httpClient ?? HttpClientBuilder::buildDefault(),
);
}
/**
* @psalm-return Promise<Lookup\Producer[]>
*/
public function nodes(): Promise
{
return call(function (): \Generator {
$requestHandler = function (string $uri): \Generator {
/** @var Response $response */
$response = yield $this->httpClient->request(new Request($uri.'/nodes'), new NullCancellationToken());
try {
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
} catch (LookupException $e) {
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
return null;
}
};
$promises = [];
foreach ($this->addresses as $address) {
$promises[$address] = call($requestHandler, $address);
}
$nodes = [];
/** @var Lookup\Response $response */
foreach (yield $promises as $response) {
foreach ($response->producers as $producer) {
$nodes[$producer->toTcpUri()] = $producer;
}
}
return array_values($nodes);
});
}
public function stop(): void
{
$this->producers = [];
$this->consumers = [];
$this->running = [];
$this->topicWatchers = [];
$this->logger->info('Lookup stopped.');
}
/**
* @psalm-suppress InvalidPropertyAssignmentValue
*/
public function subscribe(string $topic, string $channel, callable $onMessage, ClientConfig $config = null): void
{
if (null !== ($this->running[$topic][$channel] ?? null)) {
throw new \InvalidArgumentException('Subscription already exists.');
}
$this->running[$topic][$channel] = true;
asyncCall(function () use ($topic, $channel, $onMessage, $config): \Generator {
while (true) {
if (null === ($this->running[$topic][$channel] ?? null)) {
return;
}
/** @phpstan-ignore-next-line */
$producers = $this->producers[$topic] ??= new Deferred();
if ($producers instanceof Deferred) {
/** @var array<string, Lookup\Producer> $producers */
$producers = yield $producers->promise();
}
foreach (array_diff_key($this->consumers, $producers) as $address => $producer) {
unset($this->consumers[$address]);
}
foreach ($producers as $address => $producer) {
if (null !== ($this->consumers[$address][$topic][$channel] ?? null)) {
continue;
}
$this->keepConnection(
Consumer::create(
$address,
$topic,
$channel,
$onMessage,
$config,
$this->logger,
),
);
}
yield delay($this->config->pollingInterval);
}
});
$this->watch($topic);
$this->logger->info('Subscribed.', compact('topic', 'channel'));
}
public function unsubscribe(string $topic, string $channel): void
{
if (null === ($this->running[$topic][$channel] ?? null)) {
$this->logger->debug('Not subscribed.', compact('topic', 'channel'));
return;
}
unset($this->running[$topic][$channel]);
if ([] === $this->running[$topic]) {
unset($this->running[$topic]);
}
$this->logger->info('Unsubscribed.', compact('topic', 'channel'));
}
private function keepConnection(Consumer $consumer): void
{
$this->consumers[$consumer->address][$consumer->topic][$consumer->channel] = $consumer;
asyncCall(function () use ($consumer): \Generator {
while (null !== ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
try {
yield $consumer->connect();
} catch (DnsException $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
unset(
$this->consumers[$consumer->address],
$this->producers[$consumer->topic][$consumer->address],
);
return;
} catch (\Throwable $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
yield delay($this->config->pollingInterval);
continue;
}
while (true) {
/** @phpstan-ignore-next-line */
if (null === ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
$consumer->close();
return;
}
if (!$consumer->isConnected()) {
break;
}
yield delay(500);
}
}
});
}
private function watch(string $topic): void
{
if (\array_key_exists($topic, $this->topicWatchers)) {
return;
}
$this->topicWatchers[$topic] = true;
asyncCall(function () use ($topic): \Generator {
$cancellationToken = new NullCancellationToken();
$requestHandler = function (string $uri) use ($topic, $cancellationToken): \Generator {
$this->logger->debug('Lookup', compact('topic'));
/** @var Response $response */
$response = yield $this->httpClient->request(new Request($uri.'/lookup?topic='.$topic), $cancellationToken);
try {
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
} catch (LookupException $e) {
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
return null;
}
};
while (\array_key_exists($topic, $this->running)) {
$promises = [];
foreach ($this->addresses as $address) {
$promises[$address] = call($requestHandler, $address);
}
/** @var Lookup\Response[] $responses */
$responses = yield $promises;
$producers = [];
foreach ($responses as $response) {
foreach ($response->producers as $producer) {
$producers[$producer->toTcpUri()] = $producer;
}
}
/** @phpstan-ignore-next-line */
if (($deferred = ($this->producers[$topic] ?? null)) instanceof Deferred) {
$deferred->resolve($producers);
}
$this->producers[$topic] = $producers;
yield delay($this->config->pollingInterval);
}
unset($this->topicWatchers[$topic]);
});
}
}

View File

@@ -1,39 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Lookup;
final class Producer
{
public function __construct(
public string $broadcastAddress,
public string $hostname,
public string $remoteAddress,
public int $tcpPort,
public int $httpPort,
public string $version,
public array $tombstones,
public array $topics,
) {
}
public static function fromArray(array $array): self
{
return new self(
$array['broadcast_address'],
$array['hostname'],
$array['remote_address'],
$array['tcp_port'],
$array['http_port'],
$array['version'],
$array['tombstones'] ?? [],
$array['topics'] ?? [],
);
}
public function toTcpUri(): string
{
return sprintf('%s:%s', $this->broadcastAddress, $this->tcpPort);
}
}

View File

@@ -1,34 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Lookup;
use Nsq\Exception\LookupException;
final class Response
{
/**
* @param string[] $channels
* @param Producer[] $producers
*/
public function __construct(
public array $channels,
public array $producers,
) {
}
public static function fromJson(string $json): self
{
$array = json_decode($json, true, flags: JSON_THROW_ON_ERROR);
if (\array_key_exists('message', $array)) {
throw new LookupException($array['message']);
}
return new self(
$array['channels'] ?? [],
array_map([Producer::class, 'fromArray'], $array['producers']),
);
}
}

View File

@@ -1,81 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq;
use Amp\Promise;
use Nsq\Exception\MessageException;
final class Message
{
private bool $processed = false;
public function __construct(
public string $id,
public string $body,
public int $timestamp,
public int $attempts,
private Consumer $consumer,
) {
}
public static function compose(Frame\Message $message, Consumer $consumer): self
{
return new self(
$message->id,
$message->body,
$message->timestamp,
$message->attempts,
$consumer,
);
}
public function isProcessed(): bool
{
return $this->processed;
}
/**
* @psalm-return Promise<bool>
*/
public function finish(): Promise
{
$this->markAsProcessedOrFail();
return $this->consumer->fin($this->id);
}
/**
* @psalm-param positive-int|0 $timeout
*
* @psalm-return Promise<bool>
*/
public function requeue(int $timeout): Promise
{
$this->markAsProcessedOrFail();
return $this->consumer->req($this->id, $timeout);
}
/**
* @psalm-return Promise<bool>
*/
public function touch(): Promise
{
if ($this->processed) {
throw MessageException::processed($this);
}
return $this->consumer->touch($this->id);
}
private function markAsProcessedOrFail(): void
{
if ($this->processed) {
throw MessageException::processed($this);
}
$this->processed = true;
}
}

View File

@@ -1,47 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq;
use Nsq\Exception\NsqException;
class Parser
{
private const SIZE = 4;
private const TYPE = 4;
private const MESSAGE_HEADER_SIZE =
8 + // timestamp
2 + // attempts
16 + // ID
4; // Frame type
public static function parse(Buffer $buffer): ?Frame
{
if ($buffer->size() < self::SIZE) {
return null;
}
$size = $buffer->readInt32();
if ($buffer->size() < $size + self::SIZE) {
return null;
}
$buffer->discard(self::SIZE);
$type = $buffer->consumeInt32();
return match ($type) {
Frame::TYPE_RESPONSE => new Frame\Response($buffer->consume($size - self::TYPE)),
Frame::TYPE_ERROR => new Frame\Error($buffer->consume($size - self::TYPE)),
Frame::TYPE_MESSAGE => new Frame\Message(
timestamp: $buffer->consumeTimestamp(),
attempts: $buffer->consumeAttempts(),
id: $buffer->consumeMessageID(),
body: $buffer->consume($size - self::MESSAGE_HEADER_SIZE),
),
default => throw new NsqException(sprintf('Unexpected frame type: "%s"', $type)),
};
}
}

View File

@@ -5,126 +5,55 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Amp\Promise; use Amp\Promise;
use Amp\Success; use PHPinnacle\Buffer\ByteBuffer;
use Nsq\Config\ClientConfig;
use Nsq\Exception\NsqException;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use function Amp\asyncCall;
use function Amp\call; use function Amp\call;
/**
* @psalm-suppress PropertyNotSetInConstructor
*/
final class Producer extends Connection final class Producer extends Connection
{ {
public function __construct( /**
string $address, * @return Promise<void>
ClientConfig $clientConfig, */
LoggerInterface $logger, public function pub(string $topic, string $body): Promise
) {
parent::__construct(
$address,
$clientConfig,
$logger,
);
$context = compact('address');
$this->onConnect(function () use ($context): void {
$this->logger->debug('Producer connected.', $context);
});
$this->onClose(function () use ($context): void {
$this->logger->debug('Producer disconnected.', $context);
});
$this->logger->debug('Producer created.', $context);
}
public static function create(
string $address,
ClientConfig $clientConfig = null,
LoggerInterface $logger = null,
): self {
return new self(
$address,
$clientConfig ?? new ClientConfig(),
$logger ?? new NullLogger(),
);
}
public function connect(): Promise
{ {
if ($this->isConnected()) { return call(function () use ($topic, $body): \Generator {
return call(static function (): void { yield $this->command('PUB', $topic, $body);
}); yield $this->checkIsOK();
}
return call(function (): \Generator {
yield parent::connect();
$buffer = new Buffer();
asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->read()) {
$buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->write(Command::nop());
}
// Ok received
break;
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
default:
throw new NsqException('Unreachable statement.');
}
}
}
$this->close(false);
});
}); });
} }
/** /**
* @param array<int, string>|string $body * @psalm-param array<int, mixed> $bodies
* *
* @psalm-param positive-int|0 $delay * @return Promise<void>
*
* @psalm-return Promise<bool>
*/ */
public function publish(string $topic, string | array $body, int $delay = null): Promise public function mpub(string $topic, array $bodies): Promise
{ {
if (!$this->isConnected()) { return call(function () use ($topic, $bodies): \Generator {
return new Success(false); $buffer = new ByteBuffer();
$buffer->appendUint32(\count($bodies));
foreach ($bodies as $body) {
$buffer->appendUint32(\strlen($body));
$buffer->append($body);
} }
return call( yield $this->command('MPUB', $topic, $buffer->flush());
function (iterable $commands): \Generator { yield $this->checkIsOK();
try { });
foreach ($commands as $command) {
yield $this->write($command);
} }
return true; /**
} catch (\Throwable) { * @return Promise<void>
return false; */
} public function dpub(string $topic, string $body, int $delay): Promise
}, {
(static function () use ($topic, $body, $delay): \Generator { return call(function () use ($topic, $body, $delay): \Generator {
if (\is_array($body) && null === $delay) { yield $this->command('DPUB', [$topic, $delay], $body);
yield Command::mpub($topic, $body); yield $this->checkIsOK();
} elseif (null !== $delay) { });
foreach ((array) $body as $content) {
yield Command::dpub($topic, $content, $delay);
}
} else {
yield Command::pub($topic, $body);
}
})(),
);
} }
} }

22
src/Protocol/Error.php Normal file
View File

@@ -0,0 +1,22 @@
<?php
declare(strict_types=1);
namespace Nsq\Protocol;
use Nsq\Bytes;
/**
* @psalm-immutable
*/
final class Error extends Frame
{
public ErrorType $type;
public function __construct(public string $rawData)
{
parent::__construct(\strlen($this->rawData) + Bytes::BYTES_TYPE);
$this->type = new ErrorType(explode(' ', $this->rawData)[0]);
}
}

View File

@@ -2,7 +2,7 @@
declare(strict_types=1); declare(strict_types=1);
namespace Nsq; namespace Nsq\Protocol;
/** /**
* @psalm-immutable * @psalm-immutable
@@ -88,12 +88,13 @@ final class ErrorType
*/ */
public const E_UNAUTHORIZED = true; public const E_UNAUTHORIZED = true;
public static function terminable(Frame\Error $error): bool /**
* A boolean indicating whether or not an [Error] with this type terminates the connection or not.
*/
public bool $terminateConnection;
public function __construct(public string $type)
{ {
$type = explode(' ', $error->data)[0]; $this->terminateConnection = \constant('self::'.$this->type) ?? self::E_INVALID;
$constant = 'self::'.$type;
return \defined($constant) ? \constant($constant) : self::E_INVALID;
} }
} }

16
src/Protocol/Frame.php Normal file
View File

@@ -0,0 +1,16 @@
<?php
declare(strict_types=1);
namespace Nsq\Protocol;
abstract class Frame
{
public function __construct(
/**
* @psalm-readonly
*/
public int $length,
) {
}
}

101
src/Protocol/Message.php Normal file
View File

@@ -0,0 +1,101 @@
<?php
declare(strict_types=1);
namespace Nsq\Protocol;
use Amp\Failure;
use Amp\Promise;
use Nsq\Bytes;
use Nsq\Consumer;
use Nsq\Exception\MessageAlreadyFinished;
final class Message extends Frame
{
/**
* @psalm-readonly
*/
public int $timestamp;
/**
* @psalm-readonly
*/
public int $attempts;
/**
* @psalm-readonly
*/
public string $id;
/**
* @psalm-readonly
*/
public string $body;
private bool $finished = false;
private Consumer $consumer;
public function __construct(int $timestamp, int $attempts, string $id, string $body, Consumer $consumer)
{
parent::__construct(
Bytes::BYTES_TYPE
+ Bytes::BYTES_TIMESTAMP
+ Bytes::BYTES_ATTEMPTS
+ Bytes::BYTES_ID
+ \strlen($body)
);
$this->timestamp = $timestamp;
$this->attempts = $attempts;
$this->id = $id;
$this->body = $body;
$this->consumer = $consumer;
}
public function isFinished(): bool
{
return $this->finished;
}
/**
* @return Promise<void>
*/
public function finish(): Promise
{
if ($this->finished) {
return new Failure(MessageAlreadyFinished::finish($this));
}
$this->finished = true;
return $this->consumer->fin($this->id);
}
/**
* @return Promise<void>
*/
public function requeue(int $timeout): Promise
{
if ($this->finished) {
return new Failure(MessageAlreadyFinished::requeue($this));
}
$this->finished = true;
return $this->consumer->req($this->id, $timeout);
}
/**
* @return Promise<void>
*/
public function touch(): Promise
{
if ($this->finished) {
return new Failure(MessageAlreadyFinished::touch($this));
}
return $this->consumer->touch($this->id);
}
}

39
src/Protocol/Response.php Normal file
View File

@@ -0,0 +1,39 @@
<?php
declare(strict_types=1);
namespace Nsq\Protocol;
use Nsq\Bytes;
/**
* @psalm-immutable
*/
final class Response extends Frame
{
public const OK = 'OK';
public const HEARTBEAT = '_heartbeat_';
public function __construct(public string $msg)
{
parent::__construct(\strlen($this->msg) + Bytes::BYTES_TYPE);
}
public function isOk(): bool
{
return self::OK === $this->msg;
}
public function isHeartBeat(): bool
{
return self::HEARTBEAT === $this->msg;
}
/**
* @return array<mixed, mixed>
*/
public function toArray(): array
{
return json_decode($this->msg, true, flags: JSON_THROW_ON_ERROR);
}
}

View File

@@ -1,26 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq;
use Amp\ByteStream\ClosedException;
use Amp\Promise;
interface Stream
{
/**
* @psalm-return Promise<null|string>
*/
public function read(): Promise;
/**
* @psalm-return Promise<void>
*/
public function write(string $data): Promise;
/**
* @throws ClosedException
*/
public function close(): void;
}

View File

@@ -1,104 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Stream;
use Amp\Promise;
use Nsq\Buffer;
use Nsq\Exception\StreamException;
use Nsq\Stream;
use function Amp\call;
class GzipStream implements Stream
{
private ?\InflateContext $inflate = null;
private ?\DeflateContext $deflate = null;
private Buffer $buffer;
public function __construct(private Stream $stream, private int $level, string $bytes = '')
{
/** @var false|\InflateContext $inflate */
$inflate = @inflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
/** @var \DeflateContext|false $deflate */
$deflate = @deflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
if (false === $inflate) {
throw new StreamException('Failed initializing inflate context');
}
if (false === $deflate) {
throw new StreamException('Failed initializing deflate context');
}
$this->inflate = $inflate;
$this->deflate = $deflate;
$this->buffer = new Buffer($bytes);
}
/**
* {@inheritdoc}
*/
public function read(): Promise
{
return call(function (): \Generator {
if (null === $this->inflate) {
return null;
}
if ($this->buffer->empty()) {
$chunk = yield $this->stream->read();
if (null !== $chunk) {
$this->buffer->append($chunk);
}
}
$data = $this->buffer->flush();
if ('' === $data) {
return null;
}
/** @psalm-suppress UndefinedFunction,InvalidArgument */
$decompressed = inflate_add($this->inflate, $data, ZLIB_SYNC_FLUSH);
if (false === $decompressed) {
throw new StreamException('Failed adding data to deflate context');
}
return $decompressed;
});
}
/**
* {@inheritdoc}
*/
public function write(string $data): Promise
{
if (null === $this->deflate) {
throw new StreamException('The stream has already been closed');
}
/** @psalm-suppress UndefinedFunction,InvalidArgument */
$compressed = deflate_add($this->deflate, $data, ZLIB_SYNC_FLUSH);
if (false === $compressed) {
throw new StreamException('Failed adding data to deflate context');
}
return $this->stream->write($compressed);
}
/**
* {@inheritDoc}
*/
public function close(): void
{
$this->stream->close();
$this->inflate = null;
$this->deflate = null;
}
}

View File

@@ -0,0 +1,57 @@
<?php
declare(strict_types=1);
namespace Nsq\Stream;
use Amp\ByteStream\InputStream;
use Amp\Promise;
use Nsq\Bytes;
use Nsq\Exception\NotConnected;
use PHPinnacle\Buffer\ByteBuffer;
use function Amp\call;
final class NsqInputStream implements InputStream
{
private ByteBuffer $buffer;
public function __construct(
private InputStream $inputStream,
) {
$this->buffer = new ByteBuffer();
}
/**
* {@inheritDoc}
*/
public function read(): Promise
{
return call(function (): \Generator {
$buffer = $this->buffer;
while ($buffer->size() < Bytes::BYTES_SIZE) {
$bytes = yield $this->inputStream->read();
if (null === $bytes) {
throw new NotConnected();
}
$buffer->append($bytes);
}
$size = $buffer->consumeUint32();
while ($buffer->size() < $size) {
$bytes = yield $this->inputStream->read();
if (null === $bytes) {
throw new NotConnected();
}
$buffer->append($bytes);
}
return $buffer->consume($size);
});
}
}

View File

@@ -4,34 +4,39 @@ declare(strict_types=1);
namespace Nsq\Stream; namespace Nsq\Stream;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\Failure; use Amp\Failure;
use Amp\Promise; use Amp\Promise;
use Amp\Success; use Nsq\Exception\NotConnected;
use Nsq\Exception\NsqException;
use Nsq\Stream;
final class NullStream implements Stream final class NullStream implements InputStream, OutputStream
{ {
/** /**
* {@inheritdoc} * {@inheritDoc}
*/ */
public function read(): Promise public function read(): Promise
{ {
return new Success(null); return new Failure(new NotConnected());
} }
/** /**
* {@inheritdoc} * {@inheritDoc}
*
* @return Promise<void>
*/ */
public function write(string $data): Promise public function write(string $data): Promise
{ {
return new Failure(new NsqException('Connection closed.')); return new Failure(new NotConnected());
} }
/** /**
* {@inheritdoc} * {@inheritDoc}
*
* @return Promise<void>
*/ */
public function close(): void public function end(string $finalData = ''): Promise
{ {
return new Failure(new NotConnected());
} }
} }

View File

@@ -0,0 +1,106 @@
<?php
declare(strict_types=1);
namespace Nsq\Stream;
use Amp\ByteStream\InputStream;
use Amp\Promise;
use Nsq\Exception\NotConnected;
use PHPinnacle\Buffer\ByteBuffer;
use Psr\Log\LoggerInterface;
use function Amp\call;
final class SnappyInputStream implements InputStream
{
private ByteBuffer $buffer;
public function __construct(
private InputStream $inputStream,
private LoggerInterface $logger,
) {
if (!\function_exists('snappy_uncompress')) {
throw new \LogicException('Snappy extension not installed.');
}
$this->buffer = new ByteBuffer();
}
/**
* {@inheritDoc}
*/
public function read(): Promise
{
return call(function (): \Generator {
$buffer = $this->buffer;
while ($buffer->size() < 4) {
$bytes = yield $this->inputStream->read();
if (null === $bytes) {
throw new NotConnected();
}
$buffer->append($bytes);
}
/** @phpstan-ignore-next-line */
$chunkType = unpack('V', $buffer->consume(4))[1];
$size = $chunkType >> 8;
$chunkType &= 0xff;
$this->logger->debug('Snappy receive chunk [{chunk}], size [{size}]', [
'chunk' => $chunkType,
'size' => $size,
]);
while ($buffer->size() < $size) {
$bytes = yield $this->inputStream->read();
if (null === $bytes) {
throw new NotConnected();
}
$buffer->append($bytes);
}
switch ($chunkType) {
case 0xff:
$this->logger->debug('Snappy identifier chunk');
$buffer->discard(6); // discard identifier body
break;
case 0x00: // 'compressed',
$this->logger->debug('Snappy compressed chunk');
$data = $buffer
->discard(4) // discard checksum
->consume($size)
;
$this->logger->debug('Snappy compressed data [{data}]', ['data' => $data]);
return snappy_uncompress($data);
case 0x01: // 'uncompressed',
$this->logger->debug('Snappy uncompressed chunk');
$data = $buffer
->discard(4) // discard checksum
->consume($size)
;
$this->logger->debug('Snappy uncompressed data [{data}]', ['data' => $data]);
return $data;
case 0xfe:// 'padding',
$this->logger->debug('Snappy padding chunk');
$buffer->discard($size); // TODO ?
}
return $this->read();
});
}
}

View File

@@ -0,0 +1,74 @@
<?php
declare(strict_types=1);
namespace Nsq\Stream;
use Amp\ByteStream\OutputStream;
use Amp\Promise;
use PHPinnacle\Buffer\ByteBuffer;
final class SnappyOutputStream implements OutputStream
{
private ByteBuffer $buffer;
public function __construct(
private OutputStream $outputStream,
) {
if (!\function_exists('snappy_compress')) {
throw new \LogicException('Snappy extension not installed.');
}
$this->buffer = new ByteBuffer();
}
/**
* {@inheritDoc}
*
* @return Promise<void>
*/
public function write(string $data): Promise
{
$identifierFrame = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
$compressedFrame = 0x00;
$uncompressedFrame = 0x01; // 11
$maxChunkLength = 65536;
$buffer = $this->buffer;
foreach ($identifierFrame as $bite) {
$buffer->appendUint8($bite);
}
foreach (str_split($data, $maxChunkLength) as $chunk) {
$compressedChunk = snappy_compress($chunk);
[$chunk, $chunkType] = \strlen($compressedChunk) <= 0.875 * \strlen($data)
? [$compressedChunk, $compressedFrame]
: [$data, $uncompressedFrame];
/** @var string $checksum */
$checksum = hash('crc32c', $data, true);
/** @phpstan-ignore-next-line */
$checksum = unpack('N', $checksum)[1];
$maskedChecksum = (($checksum >> 15) | ($checksum << 17)) + 0xa282ead8 & 0xffffffff;
$size = (\strlen($chunk) + 4) << 8;
$buffer->append(pack('V', $chunkType + $size));
$buffer->append(pack('V', $maskedChecksum));
$buffer->append($chunk);
}
return $this->outputStream->write($buffer->flush());
}
/**
* {@inheritDoc}
*
* @return Promise<void>
*/
public function end(string $finalData = ''): Promise
{
return $this->outputStream->end($finalData);
}
}

View File

@@ -1,122 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Stream;
use Amp\Promise;
use Nsq\Buffer;
use Nsq\Exception\SnappyException;
use Nsq\Stream;
use function Amp\call;
class SnappyStream implements Stream
{
private const IDENTIFIER = [0xFF, 0x06, 0x00, 0x00, 0x73, 0x4E, 0x61, 0x50, 0x70, 0x59];
private const SIZE_HEADER = 4;
private const SIZE_CHECKSUM = 4;
private const SIZE_CHUNK = 65536;
private const TYPE_IDENTIFIER = 0xFF;
private const TYPE_COMPRESSED = 0x00;
private const TYPE_UNCOMPRESSED = 0x01;
private const TYPE_PADDING = 0xFE;
private Buffer $buffer;
public function __construct(private Stream $stream, string $bytes = '')
{
if (!\function_exists('snappy_uncompress') || !\function_exists('snappy_compress')) {
throw SnappyException::notInstalled();
}
$this->buffer = new Buffer($bytes);
}
/**
* {@inheritdoc}
*/
public function read(): Promise
{
return call(function (): \Generator {
if ($this->buffer->size() < self::SIZE_HEADER && null !== ($chunk = yield $this->stream->read())) {
$this->buffer->append($chunk);
}
$type = $this->buffer->readUInt32LE();
$size = $type >> 8;
$type &= 0xFF;
while ($this->buffer->size() < $size && null !== ($chunk = yield $this->stream->read())) {
$this->buffer->append($chunk);
}
switch ($type) {
case self::TYPE_IDENTIFIER:
$this->buffer->discard($size);
return $this->read();
case self::TYPE_COMPRESSED:
$this->buffer->discard(self::SIZE_CHECKSUM);
/** @psalm-suppress UndefinedFunction */
return snappy_uncompress($this->buffer->consume($size - self::SIZE_HEADER));
case self::TYPE_UNCOMPRESSED:
$this->buffer->discard(self::SIZE_CHECKSUM);
return $this->buffer->consume($size - self::SIZE_HEADER);
case self::TYPE_PADDING:
return $this->read();
default:
throw SnappyException::invalidHeader();
}
});
}
/**
* {@inheritdoc}
*/
public function write(string $data): Promise
{
return call(function () use ($data): Promise {
/** @var string $result */
$result = pack('CCCCCCCCCC', ...self::IDENTIFIER);
foreach (str_split($data, self::SIZE_CHUNK) as $chunk) {
$result .= $this->compress($chunk);
}
return $this->stream->write($result);
});
}
public function close(): void
{
$this->stream->close();
}
private function compress(string $uncompressed): string
{
/** @psalm-suppress UndefinedFunction */
$compressed = snappy_compress($uncompressed);
\assert(\is_string($compressed));
[$type, $data] = \strlen($compressed) <= 0.875 * \strlen($uncompressed)
? [self::TYPE_COMPRESSED, $compressed]
: [self::TYPE_UNCOMPRESSED, $uncompressed];
/** @psalm-suppress PossiblyFalseArgument */
$unpacked = unpack('N', hash('crc32c', $uncompressed, true));
\assert(\is_array($unpacked));
$checksum = $unpacked[1];
$checksum = (($checksum >> 15) | ($checksum << 17)) + 0xA282EAD8 & 0xFFFFFFFF;
$size = (\strlen($data) + 4) << 8;
/** @psalm-suppress PossiblyFalseOperand */
return pack('VV', $type + $size, $checksum).$data;
}
}

View File

@@ -1,82 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Stream;
use Amp\Promise;
use Amp\Socket\ClientTlsContext;
use Amp\Socket\ConnectContext;
use Amp\Socket\EncryptableSocket;
use Nsq\Stream;
use function Amp\call;
use function Amp\Socket\connect;
class SocketStream implements Stream
{
public function __construct(private EncryptableSocket $socket)
{
}
/**
* @psalm-return Promise<self>
*/
public static function connect(string $uri, int $timeout = 0, int $attempts = 0, bool $noDelay = false): Promise
{
return call(function () use ($uri, $timeout, $attempts, $noDelay): \Generator {
$context = new ConnectContext();
if ($timeout > 0) {
$context = $context->withConnectTimeout($timeout);
}
if ($attempts > 0) {
$context = $context->withMaxAttempts($attempts);
}
if ($noDelay) {
$context = $context->withTcpNoDelay();
}
$context = $context->withTlsContext(
(new ClientTlsContext(''))
->withoutPeerVerification(),
);
return new self(yield connect($uri, $context));
});
}
/**
* @psalm-return Promise<null|string>
*/
public function read(): Promise
{
return $this->socket->read();
}
/**
* @psalm-return Promise<void>
*/
public function write(string $data): Promise
{
return $this->socket->write($data);
}
/**
* {@inheritDoc}
*/
public function close(): void
{
$this->socket->close();
}
/**
* @psalm-return Promise<void>
*/
public function setupTls(): Promise
{
return $this->socket->setupTls();
}
}

View File

@@ -7,11 +7,6 @@ use PHPUnit\Framework\TestCase;
final class ClientConfigTest extends TestCase final class ClientConfigTest extends TestCase
{ {
public function testNegotiationPayload(): void
{
self::assertJson((new ClientConfig())->asNegotiationPayload());
}
public function testInvalidCompression(): void public function testInvalidCompression(): void
{ {
$this->expectException(InvalidArgumentException::class); $this->expectException(InvalidArgumentException::class);
@@ -19,42 +14,4 @@ final class ClientConfigTest extends TestCase
new ClientConfig(deflate: true, snappy: true); new ClientConfig(deflate: true, snappy: true);
} }
/**
* @dataProvider array
*/
public function testFromArray(array $data, array $expected): void
{
self::assertSame($expected, get_object_vars(ClientConfig::fromArray($data)));
}
public function array(): Generator
{
$default = get_object_vars(new ClientConfig());
yield 'Empty array' => [[], $default];
yield 'With wrong keys' => [['bla' => 'bla'], $default];
$custom = [
'authSecret' => 'SomeSecret',
'connectTimeout' => 100,
'maxAttempts' => 10,
'tcpNoDelay' => true,
'rdyCount' => 1,
'featureNegotiation' => true,
'clientId' => 'SomeGorgeousClientId',
'deflate' => true,
'deflateLevel' => 1,
'heartbeatInterval' => 31111,
'hostname' => gethostname(),
'msgTimeout' => 59999,
'sampleRate' => 25,
'tls' => true,
'snappy' => false,
'userAgent' => 'nsqphp/test',
];
yield 'Full filled' => [$custom, $custom];
}
} }

View File

@@ -1,28 +0,0 @@
<?php
declare(strict_types=1);
use Nsq\ErrorType;
use Nsq\Frame\Error;
use PHPUnit\Framework\TestCase;
final class ErrorTypeTest extends TestCase
{
/**
* @dataProvider data
*/
public function testConstructor(Error $frame, bool $isConnectionTerminated): void
{
self::assertSame($isConnectionTerminated, ErrorType::terminable($frame));
}
/**
* @return \Generator<int, array{0: Error, 1: bool}>
*/
public function data(): Generator
{
yield [new Error('E_BAD_BODY'), true];
yield [new Error('bla_bla'), true];
yield [new Error('E_REQ_FAILED'), false];
}
}

View File

@@ -2,12 +2,12 @@
declare(strict_types=1); declare(strict_types=1);
use Amp\Loop;
use Amp\Success; use Amp\Success;
use Nsq\Consumer; use Nsq\Consumer;
use Nsq\Exception\MessageException; use Nsq\Exception\MessageAlreadyFinished;
use Nsq\Message; use Nsq\Protocol\Message;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use function Amp\Promise\wait;
final class MessageTest extends TestCase final class MessageTest extends TestCase
{ {
@@ -16,12 +16,16 @@ final class MessageTest extends TestCase
*/ */
public function testDoubleFinish(Message $message): void public function testDoubleFinish(Message $message): void
{ {
$this->expectException(MessageException::class); self::assertFalse($message->isFinished());
Loop::run(function () use ($message): Generator { wait($message->finish());
yield $message->finish();
yield $message->finish(); self::assertTrue($message->isFinished());
});
$this->expectException(MessageAlreadyFinished::class);
$this->expectExceptionMessage('Can\'t finish message as it already finished.');
wait($message->finish());
} }
/** /**
@@ -29,12 +33,16 @@ final class MessageTest extends TestCase
*/ */
public function testDoubleRequeue(Message $message): void public function testDoubleRequeue(Message $message): void
{ {
$this->expectException(MessageException::class); self::assertFalse($message->isFinished());
Loop::run(function () use ($message): Generator { wait($message->requeue(1));
yield $message->requeue(1);
yield $message->requeue(5); self::assertTrue($message->isFinished());
});
$this->expectException(MessageAlreadyFinished::class);
$this->expectExceptionMessage('Can\'t requeue message as it already finished.');
wait($message->requeue(5));
} }
/** /**
@@ -42,12 +50,14 @@ final class MessageTest extends TestCase
*/ */
public function testTouchAfterFinish(Message $message): void public function testTouchAfterFinish(Message $message): void
{ {
$this->expectException(MessageException::class); self::assertFalse($message->isFinished());
Loop::run(function () use ($message): Generator { wait($message->finish());
yield $message->finish();
yield $message->touch(); $this->expectException(MessageAlreadyFinished::class);
}); $this->expectExceptionMessage('Can\'t touch message as it already finished.');
wait($message->touch());
} }
/** /**
@@ -55,12 +65,11 @@ final class MessageTest extends TestCase
*/ */
public function messages(): Generator public function messages(): Generator
{ {
/** @phpstan-ignore-next-line */
$consumer = $this->createMock(Consumer::class); $consumer = $this->createMock(Consumer::class);
$consumer->method('fin')->willReturn(new Success()); $consumer->method('fin')->willReturn(new Success());
$consumer->method('touch')->willReturn(new Success()); $consumer->method('touch')->willReturn(new Success());
$consumer->method('req')->willReturn(new Success()); $consumer->method('req')->willReturn(new Success());
yield [new Message('id', 'body', 0, 0, $consumer)]; yield [new Message(0, 0, 'id', 'body', $consumer)];
} }
} }

View File

@@ -24,6 +24,7 @@ final class NsqTest extends TestCase
new ClientConfig( new ClientConfig(
heartbeatInterval: 3000, heartbeatInterval: 3000,
snappy: false, snappy: false,
readTimeout: 1,
), ),
]; ];
@@ -31,6 +32,7 @@ final class NsqTest extends TestCase
new ClientConfig( new ClientConfig(
heartbeatInterval: 3000, heartbeatInterval: 3000,
snappy: true, snappy: true,
readTimeout: 1,
), ),
]; ];
} }

View File

@@ -1,95 +0,0 @@
<?php
declare(strict_types=1);
use Symfony\Component\Filesystem\Filesystem;
use Symfony\Component\Process\Process;
final class Nsqd
{
public Process $process;
public string $address;
private static Filesystem $fs;
public function __construct(
public readonly string $dataPath,
public readonly int $httpPort,
public readonly int $tcpPort,
) {
self::$fs ??= new Filesystem();
self::$fs->mkdir($this->dataPath);
$nsqd = new Process([
'./nsqd',
sprintf('-data-path=%s', $this->dataPath),
sprintf('-http-address=0.0.0.0:%s', $this->httpPort),
sprintf('-tcp-address=0.0.0.0:%s', $this->tcpPort),
'-log-level=debug',
], dirname(__DIR__).'/bin');
$nsqd->start();
while (false === @fsockopen('localhost', $this->tcpPort)) {
if (!$nsqd->isRunning()) {
throw new RuntimeException($nsqd->getErrorOutput());
}
usleep(10000);
}
$this->process = $nsqd;
$this->address = sprintf('tcp://localhost:%s', $this->tcpPort);
}
public static function create(): self
{
do {
$dir = sprintf('/tmp/%s', bin2hex(random_bytes(5)));
} while (is_dir($dir));
return new self(
$dir,
findFreePort(),
findFreePort(),
);
}
public function tail(string $topic, string $channel, int $messages): Process
{
$tail = new Process(
[
'./nsq_tail',
sprintf('-nsqd-tcp-address=localhost:%s', $this->tcpPort),
sprintf('-topic=%s', $topic),
sprintf('-channel=%s', $channel),
sprintf('-n=%s', $messages),
'-print-topic',
],
dirname(__DIR__).'/bin',
timeout: 10,
);
$tail->start();
return $tail;
}
public function __destruct()
{
$this->process->stop();
self::$fs->remove($this->dataPath);
}
}
function findFreePort(): int
{
$sock = socket_create_listen(0);
assert($sock instanceof \Socket);
socket_getsockname($sock, $addr, $port);
socket_close($sock);
return $port;
}

View File

@@ -2,97 +2,33 @@
declare(strict_types=1); declare(strict_types=1);
use Amp\Loop; use Nsq\Exception\NsqError;
use Nsq\Exception\ServerException;
use Nsq\Producer; use Nsq\Producer;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use function Amp\Promise\wait; use function Amp\Promise\wait;
final class ProducerTest extends TestCase final class ProducerTest extends TestCase
{ {
/**
* @dataProvider bodies
*/
public function testPublish(string $body): void
{
$nsqd = Nsqd::create();
$tail = $nsqd->tail('test', 'test', 1);
$producer = Producer::create($nsqd->address);
wait($producer->connect());
self::assertTrue(wait($producer->publish('test', $body)));
self::assertSame(0, $tail->wait());
self::assertSame("test | {$body}", trim($tail->getOutput()));
}
/**
* @dataProvider bodies
*/
public function testPublishMultiple(string $body): void
{
$nsqd = Nsqd::create();
$tail = $nsqd->tail('test', 'test', 2);
$producer = Producer::create($nsqd->address);
wait($producer->connect());
self::assertTrue(wait($producer->publish('test', [$body, $body])));
self::assertSame(0, $tail->wait());
self::assertSame("test | {$body}\ntest | {$body}", trim($tail->getOutput()));
}
/**
* @dataProvider bodies
*/
public function testPublishDeferred(string $body): void
{
$nsqd = Nsqd::create();
$tail = $nsqd->tail('test', 'test', 1);
$producer = Producer::create($nsqd->address);
wait($producer->connect());
self::assertTrue(wait($producer->publish('test', $body, 1)));
self::assertSame(0, $tail->wait());
self::assertSame("test | {$body}", trim($tail->getOutput()));
}
/** /**
* @dataProvider pubFails * @dataProvider pubFails
*/ */
public function testPubFail(string $topic, string $body, string $exceptionMessage): void public function testPubFail(string $topic, string $body, string $exceptionMessage): void
{ {
$nsqd = Nsqd::create(); $this->expectException(NsqError::class);
$this->expectException(ServerException::class);
$this->expectExceptionMessage($exceptionMessage); $this->expectExceptionMessage($exceptionMessage);
$producer = Producer::create($nsqd->address); $producer = new Producer('tcp://localhost:4150');
Loop::run(static function () use ($producer, $topic, $body): Generator { wait($producer->connect());
yield $producer->connect(); wait($producer->pub($topic, $body));
yield $producer->publish($topic, $body);
});
} }
/**
* @return Generator<string, array>
*/
public function pubFails(): Generator public function pubFails(): Generator
{ {
yield 'Empty body' => ['test', '', 'E_BAD_MESSAGE PUB invalid message body size 0']; yield 'Empty body' => ['test', '', 'E_BAD_MESSAGE PUB invalid message body size 0'];
yield 'Invalid topic' => ['test$%^&', '', 'E_BAD_TOPIC PUB topic name "test$%^&" is not valid']; yield 'Invalid topic' => ['test$%^&', '', 'E_BAD_TOPIC PUB topic name "test$%^&" is not valid'];
} }
public function bodies(): Generator
{
yield 'Simple Body' => ['Simple Body'];
yield 'Body with special chars' => ['test$%^&'];
}
} }

View File

@@ -0,0 +1,30 @@
<?php
declare(strict_types=1);
namespace Protocol;
use Nsq\Protocol\ErrorType;
use PHPUnit\Framework\TestCase;
final class ErrorTypeTest extends TestCase
{
/**
* @dataProvider data
*/
public function testConstructor(string $type, bool $isConnectionTerminated): void
{
$errorType = new ErrorType($type);
self::assertSame($isConnectionTerminated, $errorType->terminateConnection);
}
/**
* @return \Generator<string, array<int, bool|string>>
*/
public function data(): \Generator
{
foreach ((new \ReflectionClass(ErrorType::class))->getConstants() as $constant => $isTerminated) {
yield $constant => [$constant, $isTerminated];
}
}
}