Compare commits
1 Commits
350f08c2c1
...
snappy
Author | SHA1 | Date | |
---|---|---|---|
87fe939018 |
21
.gitattributes
vendored
21
.gitattributes
vendored
@ -1,21 +0,0 @@
|
||||
# Exclude build/test files from archive
|
||||
/.editorconfig export-ignore
|
||||
/.gitattributes export-ignore
|
||||
/.github export-ignore
|
||||
/.gitignore export-ignore
|
||||
/.php_cs export-ignore
|
||||
/.php_cs.dist export-ignore
|
||||
/.psalm export-ignore
|
||||
/docs export-ignore
|
||||
/examples export-ignore
|
||||
/infection.json export-ignore
|
||||
/infection.json.dist export-ignore
|
||||
/phpstan.neon export-ignore
|
||||
/phpunit.xml export-ignore
|
||||
/phpunit.xml.dist export-ignore
|
||||
/psalm.xml export-ignore
|
||||
/tests export-ignore
|
||||
|
||||
# Configure diff output for .php and .phar files.
|
||||
*.php diff=php
|
||||
*.phar -diff
|
53
.github/workflows/ci.yaml
vendored
53
.github/workflows/ci.yaml
vendored
@ -14,7 +14,7 @@ jobs:
|
||||
os:
|
||||
- ubuntu-latest
|
||||
php:
|
||||
- '8.1'
|
||||
- '8.0'
|
||||
dependencies:
|
||||
- lowest
|
||||
- highest
|
||||
@ -33,7 +33,6 @@ jobs:
|
||||
with:
|
||||
php-version: ${{ matrix.php }}
|
||||
coverage: pcov
|
||||
extensions: kjdev/php-ext-snappy@0.2.1
|
||||
env:
|
||||
update: true
|
||||
|
||||
@ -77,7 +76,7 @@ jobs:
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.1'
|
||||
php-version: '8.0'
|
||||
env:
|
||||
update: true
|
||||
|
||||
@ -108,8 +107,7 @@ jobs:
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.1'
|
||||
extensions: snappy-kjdev/php-ext-snappy@0.2.1
|
||||
php-version: '8.0'
|
||||
env:
|
||||
update: true
|
||||
|
||||
@ -140,8 +138,7 @@ jobs:
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.1'
|
||||
extensions: snappy-kjdev/php-ext-snappy@0.2.1
|
||||
php-version: '8.0'
|
||||
env:
|
||||
update: true
|
||||
|
||||
@ -161,3 +158,45 @@ jobs:
|
||||
|
||||
- name: Run script
|
||||
run: vendor/bin/psalm --output-format=github
|
||||
|
||||
infection:
|
||||
name: Infection
|
||||
runs-on: ubuntu-latest
|
||||
services:
|
||||
nsqd:
|
||||
image: nsqio/nsq:v1.2.0
|
||||
options: --entrypoint /nsqd
|
||||
ports:
|
||||
- 4150:4150
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup PHP
|
||||
uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: '8.0'
|
||||
coverage: pcov
|
||||
env:
|
||||
update: true
|
||||
|
||||
- name: Get composer cache directory
|
||||
id: composer-cache
|
||||
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.composer-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
|
||||
restore-keys: ${{ runner.os }}-composer-
|
||||
|
||||
- name: Install dependencies
|
||||
run: composer update --no-progress --no-interaction --prefer-dist
|
||||
|
||||
- name: Run script
|
||||
env:
|
||||
STRYKER_DASHBOARD_API_KEY: ${{ secrets.STRYKER_DASHBOARD_API_KEY }}
|
||||
run: |
|
||||
git fetch --depth=1 origin $GITHUB_BASE_REF
|
||||
php vendor/bin/infection -j2 --git-diff-filter=A --git-diff-base=origin/$GITHUB_BASE_REF --logger-github --ignore-msi-with-no-mutations --only-covered
|
||||
|
2
.gitignore
vendored
2
.gitignore
vendored
@ -1,6 +1,6 @@
|
||||
/vendor/
|
||||
/composer.lock
|
||||
|
||||
/.php-cs-fixer.cache
|
||||
/.php_cs.cache
|
||||
/.phpunit.result.cache
|
||||
/infection.log
|
||||
|
@ -1,45 +0,0 @@
|
||||
<?php
|
||||
|
||||
$finder = PhpCsFixer\Finder::create()
|
||||
->in(__DIR__)
|
||||
->exclude('vendor');
|
||||
|
||||
return (new PhpCsFixer\Config())
|
||||
->setRiskyAllowed(true)
|
||||
->setRules([
|
||||
'@PhpCsFixer' => true,
|
||||
'@PhpCsFixer:risky' => true,
|
||||
'@PSR12' => true,
|
||||
'@PSR12:risky' => true,
|
||||
'blank_line_before_statement' => [
|
||||
'statements' => [
|
||||
'continue',
|
||||
'do',
|
||||
'exit',
|
||||
'goto',
|
||||
'if',
|
||||
'return',
|
||||
'switch',
|
||||
'throw',
|
||||
'try',
|
||||
],
|
||||
],
|
||||
'declare_strict_types' => true,
|
||||
'global_namespace_import' => [
|
||||
'import_classes' => false,
|
||||
'import_constants' => false,
|
||||
'import_functions' => false,
|
||||
],
|
||||
'php_unit_internal_class' => false,
|
||||
'php_unit_test_case_static_method_calls' => ['call_type' => 'self'],
|
||||
'php_unit_test_class_requires_covers' => false,
|
||||
'phpdoc_to_comment' => false,
|
||||
'yoda_style' => true,
|
||||
'trailing_comma_in_multiline' => [
|
||||
'after_heredoc' => true,
|
||||
'elements' => ['arrays', 'arguments', 'parameters'],
|
||||
],
|
||||
'types_spaces' => ['space' => 'single'],
|
||||
])
|
||||
->setFinder($finder)
|
||||
->setCacheFile(__DIR__.'/.php-cs-fixer.cache');
|
23
.php_cs.dist
Normal file
23
.php_cs.dist
Normal file
@ -0,0 +1,23 @@
|
||||
#!/usr/bin/env php
|
||||
<?php
|
||||
|
||||
return (new PhpCsFixer\Config())
|
||||
->setRiskyAllowed(true)
|
||||
->setRules([
|
||||
'@PhpCsFixer' => true,
|
||||
'@PhpCsFixer:risky' => true,
|
||||
'@PSR12' => true,
|
||||
'@PSR12:risky' => true,
|
||||
'declare_strict_types' => true,
|
||||
'php_unit_internal_class' => false,
|
||||
'php_unit_test_class_requires_covers' => false,
|
||||
'yoda_style' => true,
|
||||
'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'],
|
||||
'blank_line_before_statement' => [
|
||||
'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try']
|
||||
],
|
||||
])
|
||||
->setFinder(
|
||||
PhpCsFixer\Finder::create()
|
||||
->in(__DIR__)
|
||||
);
|
20
Makefile
20
Makefile
@ -1,20 +0,0 @@
|
||||
|
||||
all: install composer-validate php-cs-fixer psalm phpstan phpunit
|
||||
|
||||
install:
|
||||
composer install
|
||||
|
||||
psalm:
|
||||
php vendor/bin/psalm
|
||||
|
||||
phpstan:
|
||||
php vendor/bin/phpstan analyse
|
||||
|
||||
phpunit:
|
||||
php vendor/bin/phpunit
|
||||
|
||||
php-cs-fixer:
|
||||
php vendor/bin/php-cs-fixer fix
|
||||
|
||||
composer-validate:
|
||||
composer validate
|
70
README.md
70
README.md
@ -1,6 +1,6 @@
|
||||
# Nsq PHP
|
||||
|
||||
<img src="https://github.com/nsqphp/nsqphp/raw/main/docs/logo.png" alt="" align="left" width="150">
|
||||
<img src="https://github.com/nsqphp/nsqphp/raw/main/logo.png" alt="" align="left" width="150">
|
||||
|
||||
PHP Client for [NSQ](https://nsq.io/).
|
||||
|
||||
@ -31,75 +31,67 @@ Features
|
||||
- [x] PUB
|
||||
- [x] SUB
|
||||
- [X] Feature Negotiation
|
||||
- [X] Discovery
|
||||
- [ ] Discovery
|
||||
- [ ] Backoff
|
||||
- [X] TLS
|
||||
- [X] Deflate
|
||||
- [X] Snappy
|
||||
- [ ] TLS
|
||||
- [ ] Snappy
|
||||
- [X] Sampling
|
||||
- [X] AUTH
|
||||
|
||||
Usage
|
||||
-----
|
||||
|
||||
### Producer
|
||||
### Publish
|
||||
|
||||
```php
|
||||
use Nsq\Producer;
|
||||
|
||||
$producer = Producer::create(address: 'tcp://nsqd:4150');
|
||||
$producer = new Producer(address: 'tcp://nsqd:4150');
|
||||
|
||||
// Publish a message to a topic
|
||||
$producer->publish('topic', 'Simple message');
|
||||
$producer->pub('topic', 'Simple message');
|
||||
|
||||
// Publish multiple messages to a topic (atomically)
|
||||
$producer->publish('topic', [
|
||||
$producer->mpub('topic', [
|
||||
'Message one',
|
||||
'Message two',
|
||||
]);
|
||||
|
||||
// Publish a deferred message to a topic
|
||||
$producer->publish('topic', 'Deferred message', delay: 5000);
|
||||
$producer->dpub('topic', 'Deferred message', delay: 5000);
|
||||
```
|
||||
|
||||
### Consumer
|
||||
### Subscription
|
||||
|
||||
```php
|
||||
use Nsq\Consumer;
|
||||
use Nsq\Message;
|
||||
use Nsq\Subscriber;
|
||||
|
||||
$consumer = Consumer::create(
|
||||
address: 'tcp://nsqd:4150',
|
||||
topic: 'topic',
|
||||
channel: 'channel',
|
||||
onMessage: static function (Message $message): Generator {
|
||||
yield $message->touch(); // Reset the timeout for an in-flight message
|
||||
yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
|
||||
yield $message->finish(); // Finish a message (indicate successful processing)
|
||||
},
|
||||
);
|
||||
```
|
||||
$consumer = new Consumer('tcp://nsqd:4150');
|
||||
$subscriber = new Subscriber($consumer);
|
||||
|
||||
### Lookup
|
||||
$generator = $subscriber->subscribe('topic', 'channel');
|
||||
foreach ($generator as $message) {
|
||||
if ($message instanceof Message) {
|
||||
$payload = $message->body;
|
||||
|
||||
```php
|
||||
use Nsq\Lookup;
|
||||
use Nsq\Message;
|
||||
// handle message
|
||||
|
||||
$lookup = new Lookup('http://nsqlookupd0:4161');
|
||||
$lookup = new Lookup(['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161']);
|
||||
$message->touch(); // Reset the timeout for an in-flight message
|
||||
$message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
|
||||
$message->finish(); // Finish a message (indicate successful processing)
|
||||
}
|
||||
|
||||
// In case of nothing received during timeout generator will return NULL
|
||||
// Here we can do something between messages, like pcntl_signal_dispatch()
|
||||
|
||||
// We can also communicate with generator through send
|
||||
// for example:
|
||||
|
||||
$callable = static function (Message $message): Generator {
|
||||
yield $message->touch(); // Reset the timeout for an in-flight message
|
||||
yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
|
||||
yield $message->finish(); // Finish a message (indicate successful processing)
|
||||
};
|
||||
|
||||
$lookup->subscribe(topic: 'topic', channel: 'channel', onMessage: $callable);
|
||||
$lookup->subscribe(topic: 'anotherTopic', channel: 'channel', onMessage: $callable);
|
||||
|
||||
$lookup->unsubscribe(topic: 'local', channel: 'channel');
|
||||
$lookup->stop(); // unsubscribe all
|
||||
// Gracefully close connection (loop will be ended)
|
||||
$generator->send(Subscriber::STOP);
|
||||
}
|
||||
```
|
||||
|
||||
### Integrations
|
||||
|
@ -11,49 +11,38 @@
|
||||
}
|
||||
],
|
||||
"require": {
|
||||
"php": "^8.1",
|
||||
"php": "^8.0.1",
|
||||
"ext-json": "*",
|
||||
"amphp/http-client": "^4.6",
|
||||
"amphp/socket": "^1.1",
|
||||
"clue/socket-raw": "^1.5",
|
||||
"composer/semver": "^3.2",
|
||||
"phpinnacle/buffer": "^1.2",
|
||||
"psr/log": "^3.0"
|
||||
"psr/log": "^1.1"
|
||||
},
|
||||
"require-dev": {
|
||||
"amphp/log": "^1.1",
|
||||
"dg/bypass-finals": "^1.3",
|
||||
"ergebnis/composer-normalize": "^2.15",
|
||||
"friendsofphp/php-cs-fixer": "^3.4",
|
||||
"ergebnis/composer-normalize": "9999999-dev",
|
||||
"friendsofphp/php-cs-fixer": "^2.18",
|
||||
"infection/infection": "^0.20.2",
|
||||
"nyholm/nsa": "^1.2",
|
||||
"phpstan/phpstan": "^1.8",
|
||||
"phpstan/phpstan-phpunit": "^1.1",
|
||||
"phpstan/phpstan-strict-rules": "^1.3",
|
||||
"phpstan/phpstan": "^0.12.68",
|
||||
"phpstan/phpstan-phpunit": "^0.12.17",
|
||||
"phpstan/phpstan-strict-rules": "^0.12.9",
|
||||
"phpunit/phpunit": "^9.5",
|
||||
"symfony/var-dumper": "^6.1",
|
||||
"vimeo/psalm": "^4.4"
|
||||
},
|
||||
"config": {
|
||||
"sort-packages": true,
|
||||
"allow-plugins": {
|
||||
"ergebnis/composer-normalize": true,
|
||||
"infection/extension-installer": true
|
||||
}
|
||||
"sort-packages": true
|
||||
},
|
||||
"autoload": {
|
||||
"psr-4": {
|
||||
"Nsq\\": "src/"
|
||||
}
|
||||
},
|
||||
"autoload-dev": {
|
||||
"files": [
|
||||
"vendor/symfony/var-dumper/Resources/functions/dump.php"
|
||||
]
|
||||
},
|
||||
"minimum-stability": "dev",
|
||||
"prefer-stable": true,
|
||||
"scripts": {
|
||||
"cs": [
|
||||
"vendor/bin/php-cs-fixer fix --using-cache=no"
|
||||
"vendor/bin/php-cs-fixer fix"
|
||||
],
|
||||
"cs-check": [
|
||||
"vendor/bin/php-cs-fixer fix --verbose --diff --dry-run"
|
||||
@ -70,7 +59,7 @@
|
||||
"vendor/bin/psalm"
|
||||
],
|
||||
"test": [
|
||||
"@norm",
|
||||
"@norm-check",
|
||||
"@cs",
|
||||
"@phpstan",
|
||||
"@psalm",
|
||||
|
@ -2,56 +2,14 @@ version: '3.7'
|
||||
|
||||
services:
|
||||
nsqd:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
labels:
|
||||
ru.grachevko.dhu: 'nsqd'
|
||||
command: >-
|
||||
nsqd
|
||||
--log-level debug
|
||||
--lookupd-tcp-address nsqlookupd0:4160
|
||||
--lookupd-tcp-address nsqlookupd1:4160
|
||||
--lookupd-tcp-address nsqlookupd2:4160
|
||||
|
||||
nsqlookupd0:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
labels:
|
||||
ru.grachevko.dhu: 'nsqlookupd0'
|
||||
command: /nsqlookupd -log-level debug
|
||||
|
||||
nsqlookupd1:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
labels:
|
||||
ru.grachevko.dhu: 'nsqlookupd1'
|
||||
command: /nsqlookupd -log-level debug
|
||||
|
||||
nsqlookupd2:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
labels:
|
||||
ru.grachevko.dhu: 'nsqlookupd2'
|
||||
command: /nsqlookupd -log-level debug
|
||||
image: nsqio/nsq:v1.2.0
|
||||
command: /nsqd
|
||||
ports:
|
||||
- 4150:4150
|
||||
- 4151:4151
|
||||
|
||||
nsqadmin:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
labels:
|
||||
ru.grachevko.dhu: 'nsqadmin'
|
||||
command:
|
||||
- nsqadmin
|
||||
- --http-address=0.0.0.0:4171
|
||||
- --lookupd-http-address=nsqlookupd0:4161
|
||||
- --lookupd-http-address=nsqlookupd1:4161
|
||||
- --lookupd-http-address=nsqlookupd2:4161
|
||||
depends_on:
|
||||
- nsqlookupd0
|
||||
- nsqlookupd1
|
||||
- nsqlookupd2
|
||||
|
||||
tail:
|
||||
image: nsqio/nsq:v${NSQ_VERSION}
|
||||
command: >-
|
||||
nsq_tail
|
||||
--channel nsq_tail
|
||||
--topic local
|
||||
--lookupd-http-address nsqlookupd1:4161
|
||||
depends_on:
|
||||
- nsqd
|
||||
- nsqlookupd1
|
||||
image: nsqio/nsq:v1.2.0
|
||||
command: /nsqadmin --nsqd-http-address=nsqd:4151 --http-address=0.0.0.0:4171
|
||||
ports:
|
||||
- 4171:4171
|
||||
|
@ -1,47 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
require dirname(__DIR__).'/../vendor/autoload.php';
|
||||
|
||||
use Amp\ByteStream;
|
||||
use Amp\Log\ConsoleFormatter;
|
||||
use Amp\Log\StreamHandler;
|
||||
use Amp\Loop;
|
||||
use Monolog\Logger;
|
||||
use Monolog\Processor\PsrLogMessageProcessor;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Config\LookupConfig;
|
||||
use Nsq\Lookup;
|
||||
use Nsq\Message;
|
||||
|
||||
Loop::run(static function () {
|
||||
$handler = new StreamHandler(ByteStream\getStdout());
|
||||
$handler->setFormatter(new ConsoleFormatter());
|
||||
$logger = new Logger('consumer', [$handler], [new PsrLogMessageProcessor()]);
|
||||
|
||||
$callable = static function (Message $message) {
|
||||
yield $message->finish();
|
||||
};
|
||||
|
||||
$clientConfig = new ClientConfig();
|
||||
|
||||
$lookupConfig = new LookupConfig();
|
||||
|
||||
$watcherId = Loop::repeat(5000, function () {
|
||||
yield Amp\Dns\resolver()->reloadConfig();
|
||||
});
|
||||
|
||||
$lookup = Lookup::create(
|
||||
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
|
||||
$lookupConfig,
|
||||
$logger,
|
||||
);
|
||||
|
||||
$lookup->subscribe('local', 'local', $callable, $clientConfig);
|
||||
|
||||
Loop::delay(10000, function () use ($lookup, $watcherId) {
|
||||
$lookup->stop();
|
||||
Loop::cancel($watcherId);
|
||||
});
|
||||
});
|
@ -1,102 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
require dirname(__DIR__).'/../vendor/autoload.php';
|
||||
|
||||
use Amp\ByteStream;
|
||||
use Amp\Log\ConsoleFormatter;
|
||||
use Amp\Log\StreamHandler;
|
||||
use Amp\Loop;
|
||||
use Monolog\Logger;
|
||||
use Monolog\Processor\PsrLogMessageProcessor;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Config\LookupConfig;
|
||||
use Nsq\Lookup;
|
||||
use Nsq\Producer;
|
||||
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\delay;
|
||||
|
||||
Loop::run(static function () {
|
||||
$handler = new StreamHandler(ByteStream\getStdout());
|
||||
$handler->setFormatter(new ConsoleFormatter());
|
||||
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
|
||||
|
||||
$clientConfig = new ClientConfig();
|
||||
|
||||
/** @var Producer[] $producers */
|
||||
$producers = [];
|
||||
|
||||
$lookupConfig = new LookupConfig();
|
||||
|
||||
$lookup = Lookup::create(
|
||||
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
|
||||
$lookupConfig,
|
||||
$logger,
|
||||
);
|
||||
|
||||
$isRunning = true;
|
||||
|
||||
asyncCall(static function () use ($lookup, $clientConfig, $logger, &$producers, &$isRunning) {
|
||||
while ($isRunning) {
|
||||
/** @var Lookup\Producer[] $nodes */
|
||||
$nodes = yield $lookup->nodes();
|
||||
|
||||
foreach ($nodes as $node) {
|
||||
$address = $node->toTcpUri();
|
||||
|
||||
if (array_key_exists($address, $producers)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
asyncCall(function () use ($address, $clientConfig, $logger, &$producers) {
|
||||
yield ($producers[$address] = Producer::create($address, $clientConfig, $logger))
|
||||
->onClose(function () use (&$producers, $address) {
|
||||
unset($producers[$address]);
|
||||
})
|
||||
->connect()
|
||||
;
|
||||
});
|
||||
}
|
||||
|
||||
yield delay(5000);
|
||||
|
||||
yield Amp\Dns\resolver()->reloadConfig(); // for reload /etc/hosts
|
||||
}
|
||||
});
|
||||
|
||||
Loop::delay(5000, function () use (&$isRunning, $logger) {
|
||||
$logger->info('Stopping producer.');
|
||||
|
||||
$isRunning = false;
|
||||
});
|
||||
|
||||
$counter = 0;
|
||||
while (true) {
|
||||
if (!$isRunning) {
|
||||
foreach ($producers as $producer) {
|
||||
$producer->close();
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
if ([] === $producers) {
|
||||
yield delay(200);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
$index = array_rand($producers);
|
||||
$producer = $producers[$index];
|
||||
|
||||
if (!$producer->isConnected()) {
|
||||
yield delay(100);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
yield $producer->publish('local', 'This is message of count '.$counter++);
|
||||
}
|
||||
});
|
@ -1,44 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
require __DIR__.'/../vendor/autoload.php';
|
||||
|
||||
use Amp\ByteStream;
|
||||
use Amp\Log\ConsoleFormatter;
|
||||
use Amp\Log\StreamHandler;
|
||||
use Amp\Loop;
|
||||
use Amp\Promise;
|
||||
use Monolog\Logger;
|
||||
use Monolog\Processor\PsrLogMessageProcessor;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Consumer;
|
||||
use Nsq\Message;
|
||||
|
||||
use function Amp\call;
|
||||
|
||||
Loop::run(static function () {
|
||||
$handler = new StreamHandler(ByteStream\getStdout());
|
||||
$handler->setFormatter(new ConsoleFormatter());
|
||||
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
|
||||
|
||||
$consumer = new Consumer(
|
||||
'tcp://localhost:4150',
|
||||
topic: 'local',
|
||||
channel: 'local',
|
||||
onMessage: static function (Message $message) use ($logger): Promise {
|
||||
return call(function () use ($message, $logger): Generator {
|
||||
$logger->info('Received: {body}', ['body' => $message->body]);
|
||||
|
||||
yield $message->finish();
|
||||
});
|
||||
},
|
||||
clientConfig: new ClientConfig(
|
||||
deflate: false,
|
||||
snappy: true,
|
||||
),
|
||||
logger: $logger,
|
||||
);
|
||||
|
||||
yield $consumer->connect();
|
||||
});
|
@ -1,36 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
require __DIR__.'/../vendor/autoload.php';
|
||||
|
||||
use Amp\ByteStream;
|
||||
use Amp\Log\ConsoleFormatter;
|
||||
use Amp\Log\StreamHandler;
|
||||
use Amp\Loop;
|
||||
use Monolog\Logger;
|
||||
use Monolog\Processor\PsrLogMessageProcessor;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Producer;
|
||||
|
||||
Loop::run(static function () {
|
||||
$handler = new StreamHandler(ByteStream\getStdout());
|
||||
$handler->setFormatter(new ConsoleFormatter());
|
||||
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
|
||||
|
||||
$producer = new Producer(
|
||||
'tcp://localhost:4150',
|
||||
clientConfig: new ClientConfig(
|
||||
deflate: false,
|
||||
heartbeatInterval: 5000,
|
||||
snappy: true,
|
||||
),
|
||||
logger: $logger,
|
||||
);
|
||||
|
||||
yield $producer->connect();
|
||||
|
||||
while (true) {
|
||||
yield $producer->publish(topic: 'local', body: array_fill(0, 200, 'Message body!'));
|
||||
}
|
||||
});
|
Before Width: | Height: | Size: 98 KiB After Width: | Height: | Size: 98 KiB |
@ -8,9 +8,3 @@ parameters:
|
||||
paths:
|
||||
- src
|
||||
- tests
|
||||
ignoreErrors:
|
||||
-
|
||||
message: '#no value type specified in iterable type array#'
|
||||
paths:
|
||||
- %currentWorkingDirectory%/src
|
||||
- %currentWorkingDirectory%/tests
|
||||
|
@ -1,5 +1,6 @@
|
||||
<?xml version="1.0"?>
|
||||
<psalm
|
||||
allowPhpStormGenerics="true"
|
||||
ignoreInternalFunctionFalseReturn="false"
|
||||
ignoreInternalFunctionNullReturn="false"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
|
@ -1,37 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use PHPinnacle\Buffer\ByteBuffer;
|
||||
|
||||
/**
|
||||
* @psalm-suppress
|
||||
*/
|
||||
final class Buffer extends ByteBuffer
|
||||
{
|
||||
public function readUInt32LE(): int
|
||||
{
|
||||
$unpacked = unpack('V', $this->consume(4));
|
||||
|
||||
\assert(\is_array($unpacked) && \array_key_exists(1, $unpacked));
|
||||
|
||||
return $unpacked[1];
|
||||
}
|
||||
|
||||
public function consumeTimestamp(): int
|
||||
{
|
||||
return $this->consumeUint64();
|
||||
}
|
||||
|
||||
public function consumeAttempts(): int
|
||||
{
|
||||
return $this->consumeUint16();
|
||||
}
|
||||
|
||||
public function consumeMessageID(): string
|
||||
{
|
||||
return $this->consume(16);
|
||||
}
|
||||
}
|
111
src/Command.php
111
src/Command.php
@ -1,111 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use PHPinnacle\Buffer\ByteBuffer;
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
final class Command
|
||||
{
|
||||
public static function magic(): string
|
||||
{
|
||||
return ' V2';
|
||||
}
|
||||
|
||||
public static function identify(string $data): string
|
||||
{
|
||||
return self::pack('IDENTIFY', data: $data);
|
||||
}
|
||||
|
||||
public static function auth(?string $authSecret): string
|
||||
{
|
||||
return self::pack('AUTH', data: $authSecret);
|
||||
}
|
||||
|
||||
public static function nop(): string
|
||||
{
|
||||
return self::pack('NOP');
|
||||
}
|
||||
|
||||
public static function cls(): string
|
||||
{
|
||||
return self::pack('CLS');
|
||||
}
|
||||
|
||||
public static function rdy(int $count): string
|
||||
{
|
||||
return self::pack('RDY', (string) $count);
|
||||
}
|
||||
|
||||
public static function fin(string $id): string
|
||||
{
|
||||
return self::pack('FIN', $id);
|
||||
}
|
||||
|
||||
public static function req(string $id, int $timeout): string
|
||||
{
|
||||
return self::pack('REQ', [$id, $timeout]);
|
||||
}
|
||||
|
||||
public static function touch(string $id): string
|
||||
{
|
||||
return self::pack('TOUCH', $id);
|
||||
}
|
||||
|
||||
public static function pub(string $topic, string $body): string
|
||||
{
|
||||
return self::pack('PUB', $topic, $body);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<int, string> $bodies
|
||||
*/
|
||||
public static function mpub(string $topic, array $bodies): string
|
||||
{
|
||||
static $buffer;
|
||||
$buffer ??= new ByteBuffer();
|
||||
|
||||
$buffer->appendUint32(\count($bodies));
|
||||
|
||||
foreach ($bodies as $body) {
|
||||
$buffer->appendUint32(\strlen($body));
|
||||
$buffer->append($body);
|
||||
}
|
||||
|
||||
return self::pack('MPUB', $topic, $buffer->flush());
|
||||
}
|
||||
|
||||
public static function dpub(string $topic, string $body, int $delay): string
|
||||
{
|
||||
return self::pack('DPUB', [$topic, $delay], $body);
|
||||
}
|
||||
|
||||
public static function sub(string $topic, string $channel): string
|
||||
{
|
||||
return self::pack('SUB', [$topic, $channel]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<int, scalar>|string $params
|
||||
*/
|
||||
private static function pack(string $command, array | string $params = [], string $data = null): string
|
||||
{
|
||||
static $buffer;
|
||||
$buffer ??= new Buffer();
|
||||
|
||||
$command = implode(' ', [$command, ...((array) $params)]);
|
||||
|
||||
$buffer->append($command.PHP_EOL);
|
||||
|
||||
if (null !== $data) {
|
||||
$buffer->appendUint32(\strlen($data));
|
||||
$buffer->append($data);
|
||||
}
|
||||
|
||||
return $buffer->flush();
|
||||
}
|
||||
}
|
@ -5,6 +5,9 @@ declare(strict_types=1);
|
||||
namespace Nsq\Config;
|
||||
|
||||
use Composer\InstalledVersions;
|
||||
use InvalidArgumentException;
|
||||
use JsonSerializable;
|
||||
use function gethostname;
|
||||
|
||||
/**
|
||||
* This class is used for configuring the clients for nsq. Immutable properties must be set when creating the object and
|
||||
@ -13,51 +16,25 @@ use Composer\InstalledVersions;
|
||||
*
|
||||
* @psalm-immutable
|
||||
*/
|
||||
final class ClientConfig
|
||||
final class ClientConfig implements JsonSerializable
|
||||
{
|
||||
/**
|
||||
* @psalm-suppress ImpureFunctionCall
|
||||
*/
|
||||
/** @psalm-suppress ImpureFunctionCall */
|
||||
public function __construct(
|
||||
/**
|
||||
/*
|
||||
* The secret used for authorization, if the server requires it. This value will be ignored if the server
|
||||
* does not require authorization.
|
||||
*/
|
||||
public ?string $authSecret = null,
|
||||
|
||||
/**
|
||||
* The timeout for establishing a connection in milliseconds.
|
||||
*/
|
||||
public int $connectTimeout = 10000,
|
||||
// The timeout for establishing a connection in seconds.
|
||||
public int $connectTimeout = 10,
|
||||
|
||||
/**
|
||||
* The max attempts for establishing a connection.
|
||||
*/
|
||||
public int $maxAttempts = 0,
|
||||
|
||||
/**
|
||||
* Use tcp_nodelay for establishing a connection.
|
||||
*/
|
||||
public bool $tcpNoDelay = false,
|
||||
public int $rdyCount = 100,
|
||||
|
||||
/**
|
||||
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
|
||||
* it will send back a JSON payload of supported features and metadata.
|
||||
*/
|
||||
public bool $featureNegotiation = true,
|
||||
|
||||
/**
|
||||
* An identifier used to disambiguate this client (i.e. something specific to the consumer).
|
||||
*/
|
||||
// An identifier used to disambiguate this client (i.e. something specific to the consumer)
|
||||
public string $clientId = '',
|
||||
|
||||
/**
|
||||
* Enable deflate compression for this connection. A client cannot enable both [snappy] and [deflate].
|
||||
*/
|
||||
// Enable deflate compression for this connection. A client cannot enable both [snappy] and [deflate].
|
||||
public bool $deflate = false,
|
||||
|
||||
/**
|
||||
/*
|
||||
* Configure the deflate compression level for this connection.
|
||||
*
|
||||
* Valid range: `1 <= deflate_level <= configured_max`
|
||||
@ -66,43 +43,42 @@ final class ClientConfig
|
||||
*/
|
||||
public int $deflateLevel = 6,
|
||||
|
||||
/**
|
||||
/*
|
||||
* Milliseconds between heartbeats.
|
||||
*
|
||||
* Valid range: `1000 <= heartbeat_interval <= configured_max` (`-1` disables heartbeats)
|
||||
*/
|
||||
public int $heartbeatInterval = 30000,
|
||||
|
||||
/**
|
||||
* The hostname where the client is deployed.
|
||||
*/
|
||||
// The hostname where the client is deployed
|
||||
public string $hostname = '',
|
||||
|
||||
/**
|
||||
* Configure the server-side message timeout in milliseconds for messages delivered to this client.
|
||||
*/
|
||||
// Configure the server-side message timeout in milliseconds for messages delivered to this client.
|
||||
public int $msgTimeout = 60000,
|
||||
|
||||
/**
|
||||
/*
|
||||
* The sample rate for incoming data to deliver a percentage of all messages received to this connection.
|
||||
* This only applies to subscribing connections. The valid range is between 0 and 99, where 0 means that all
|
||||
* data is sent (this is the default). 1 means that 1% of the data is sent.
|
||||
*/
|
||||
public int $sampleRate = 0,
|
||||
|
||||
/**
|
||||
* Enable TLS for this connection.
|
||||
/*
|
||||
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
|
||||
* it will send back a JSON payload of supported features and metadata.
|
||||
*/
|
||||
public bool $featureNegotiation = true,
|
||||
|
||||
// Enable TLS for this connection
|
||||
public bool $tls = false,
|
||||
|
||||
/**
|
||||
* Enable snappy compression for this connection. A client cannot enable both [snappy] and [deflate].
|
||||
*/
|
||||
// Enable snappy compression for this connection. A client cannot enable both [snappy] and [deflate].
|
||||
public bool $snappy = false,
|
||||
|
||||
/**
|
||||
* A string identifying the agent for this client in the spirit of HTTP.
|
||||
*/
|
||||
// The read timeout for connection sockets and for awaiting responses from nsq.
|
||||
public int $readTimeout = 5,
|
||||
|
||||
// A string identifying the agent for this client in the spirit of HTTP.
|
||||
public string $userAgent = '',
|
||||
) {
|
||||
$this->featureNegotiation = true; // Always enabled
|
||||
@ -116,18 +92,16 @@ final class ClientConfig
|
||||
}
|
||||
|
||||
if ($this->snappy && $this->deflate) {
|
||||
throw new \InvalidArgumentException('Client cannot enable both [snappy] and [deflate]');
|
||||
throw new InvalidArgumentException('Client cannot enable both [snappy] and [deflate]');
|
||||
}
|
||||
}
|
||||
|
||||
public static function fromArray(array $array): self
|
||||
/**
|
||||
* @phpstan-ignore-next-line
|
||||
*/
|
||||
public function jsonSerialize(): array
|
||||
{
|
||||
return new self(...array_intersect_key($array, get_class_vars(self::class)));
|
||||
}
|
||||
|
||||
public function asNegotiationPayload(): string
|
||||
{
|
||||
$data = [
|
||||
return [
|
||||
'client_id' => $this->clientId,
|
||||
'deflate' => $this->deflate,
|
||||
'deflate_level' => $this->deflateLevel,
|
||||
@ -140,7 +114,5 @@ final class ClientConfig
|
||||
'tls_v1' => $this->tls,
|
||||
'user_agent' => $this->userAgent,
|
||||
];
|
||||
|
||||
return json_encode($data, JSON_THROW_ON_ERROR);
|
||||
}
|
||||
}
|
||||
|
@ -8,82 +8,58 @@ namespace Nsq\Config;
|
||||
* The configuration object that holds the config status for a single Connection.
|
||||
*
|
||||
* @psalm-immutable
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
final class ServerConfig
|
||||
final class ConnectionConfig
|
||||
{
|
||||
public function __construct(
|
||||
/**
|
||||
* Whether or not authorization is required by nsqd.
|
||||
*/
|
||||
// Whether or not authorization is required by nsqd.
|
||||
public bool $authRequired,
|
||||
|
||||
/**
|
||||
* Whether deflate compression is enabled for this connection or not.
|
||||
*/
|
||||
// Whether deflate compression is enabled for this connection or not.
|
||||
public bool $deflate,
|
||||
|
||||
/**
|
||||
* The deflate level. This value can be ignored if [deflate] is `false`.
|
||||
*/
|
||||
// The deflate level. This value can be ignored if [deflate] is `false`.
|
||||
public int $deflateLevel,
|
||||
|
||||
/**
|
||||
* The maximum deflate level supported by the server.
|
||||
*/
|
||||
// The maximum deflate level supported by the server.
|
||||
public int $maxDeflateLevel,
|
||||
|
||||
/**
|
||||
* The maximum value for message timeout.
|
||||
*/
|
||||
// The maximum value for message timeout.
|
||||
public int $maxMsgTimeout,
|
||||
|
||||
/**
|
||||
/*
|
||||
* Each nsqd is configurable with a max-rdy-count. If the consumer sends a RDY count that is outside
|
||||
* of the acceptable range its connection will be forcefully closed.
|
||||
*/
|
||||
public int $maxRdyCount,
|
||||
|
||||
/**
|
||||
* The effective message timeout.
|
||||
*/
|
||||
// The effective message timeout.
|
||||
public int $msgTimeout,
|
||||
|
||||
/**
|
||||
* The size in bytes of the buffer nsqd will use when writing to this client.
|
||||
*/
|
||||
// The size in bytes of the buffer nsqd will use when writing to this client.
|
||||
public int $outputBufferSize,
|
||||
|
||||
/**
|
||||
* The timeout after which any data that nsqd has buffered will be flushed to this client.
|
||||
*/
|
||||
// The timeout after which any data that nsqd has buffered will be flushed to this client.
|
||||
public int $outputBufferTimeout,
|
||||
|
||||
/**
|
||||
/*
|
||||
* The sample rate for incoming data to deliver a percentage of all messages received to this connection.
|
||||
* This only applies to subscribing connections. The valid range is between 0 and 99, where 0 means that all
|
||||
* data is sent (this is the default). 1 means that 1% of the data is sent.
|
||||
*/
|
||||
public int $sampleRate,
|
||||
|
||||
/**
|
||||
* Whether snappy compression is enabled for this connection or not.
|
||||
*/
|
||||
// Whether snappy compression is enabled for this connection or not.
|
||||
public bool $snappy,
|
||||
|
||||
/**
|
||||
* Whether TLS is enabled for this connection or not.
|
||||
*/
|
||||
// Whether TLS is enabled for this connection or not.
|
||||
public bool $tls,
|
||||
|
||||
/**
|
||||
* The nsqd version.
|
||||
*/
|
||||
// The nsqd version.
|
||||
public string $version,
|
||||
) {
|
||||
}
|
||||
|
||||
/**
|
||||
* @phpstan-ignore-next-line
|
||||
*/
|
||||
public static function fromArray(array $array): self
|
||||
{
|
||||
return new self(
|
@ -1,13 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Config;
|
||||
|
||||
final class LookupConfig
|
||||
{
|
||||
public function __construct(
|
||||
public int $pollingInterval = 10000,
|
||||
) {
|
||||
}
|
||||
}
|
@ -4,266 +4,340 @@ declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\ByteStream\ClosedException;
|
||||
use Amp\Failure;
|
||||
use Amp\Promise;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Config\ServerConfig;
|
||||
use Nsq\Config\ConnectionConfig;
|
||||
use Nsq\Exception\AuthenticationRequired;
|
||||
use Nsq\Exception\NsqException;
|
||||
use Nsq\Frame\Response;
|
||||
use Nsq\Stream\GzipStream;
|
||||
use Nsq\Stream\NullStream;
|
||||
use Nsq\Stream\SnappyStream;
|
||||
use Nsq\Stream\SocketStream;
|
||||
use Nsq\Exception\ConnectionFail;
|
||||
use Nsq\Exception\UnexpectedResponse;
|
||||
use Nsq\Reconnect\ExponentialStrategy;
|
||||
use Nsq\Reconnect\ReconnectStrategy;
|
||||
use PHPinnacle\Buffer\ByteBuffer;
|
||||
use Psr\Log\LoggerAwareTrait;
|
||||
use Psr\Log\LoggerInterface;
|
||||
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\call;
|
||||
use Psr\Log\NullLogger;
|
||||
use Socket\Raw\Exception;
|
||||
use Socket\Raw\Factory;
|
||||
use Socket\Raw\Socket;
|
||||
use function addcslashes;
|
||||
use function hash;
|
||||
use function http_build_query;
|
||||
use function implode;
|
||||
use function json_encode;
|
||||
use function pack;
|
||||
use function snappy_compress;
|
||||
use function unpack;
|
||||
use const JSON_FORCE_OBJECT;
|
||||
use const JSON_THROW_ON_ERROR;
|
||||
use const PHP_EOL;
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*
|
||||
* @property ConnectionConfig $connectionConfig
|
||||
*/
|
||||
abstract class Connection
|
||||
{
|
||||
private Stream $stream;
|
||||
use LoggerAwareTrait;
|
||||
|
||||
/**
|
||||
* @var callable
|
||||
*/
|
||||
private $onConnectCallback;
|
||||
private string $address;
|
||||
|
||||
/**
|
||||
* @var callable
|
||||
*/
|
||||
private $onCloseCallback;
|
||||
private ?Socket $socket = null;
|
||||
|
||||
private ReconnectStrategy $reconnect;
|
||||
|
||||
private ClientConfig $clientConfig;
|
||||
|
||||
private ?ConnectionConfig $connectionConfig = null;
|
||||
|
||||
public function __construct(
|
||||
/**
|
||||
* @readonly
|
||||
*/
|
||||
public string $address,
|
||||
protected ClientConfig $clientConfig,
|
||||
protected LoggerInterface $logger,
|
||||
string $address,
|
||||
ClientConfig $clientConfig = null,
|
||||
ReconnectStrategy $reconnectStrategy = null,
|
||||
LoggerInterface $logger = null,
|
||||
) {
|
||||
$this->stream = new NullStream();
|
||||
$this->onConnectCallback = static function (): void {
|
||||
};
|
||||
$this->onCloseCallback = static function (): void {
|
||||
};
|
||||
$this->address = $address;
|
||||
|
||||
$this->logger = $logger ?? new NullLogger();
|
||||
$this->reconnect = $reconnectStrategy ?? new ExponentialStrategy(logger: $this->logger);
|
||||
$this->clientConfig = $clientConfig ?? new ClientConfig();
|
||||
}
|
||||
|
||||
public function __destruct()
|
||||
public function connect(): void
|
||||
{
|
||||
$this->close(false);
|
||||
}
|
||||
$this->reconnect->connect(function (): void {
|
||||
try {
|
||||
$this->socket = (new Factory())->createClient($this->address);
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->logger->error('Connecting to {address} failed.', ['address' => $this->address]);
|
||||
|
||||
public function isConnected(): bool
|
||||
{
|
||||
return !$this->stream instanceof NullStream;
|
||||
}
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<void>
|
||||
*/
|
||||
public function connect(): Promise
|
||||
{
|
||||
return call(function (): \Generator {
|
||||
$buffer = new Buffer();
|
||||
$this->socket->write(' V2');
|
||||
|
||||
/** @var SocketStream $stream */
|
||||
$stream = yield SocketStream::connect(
|
||||
$this->address,
|
||||
$this->clientConfig->connectTimeout,
|
||||
$this->clientConfig->maxAttempts,
|
||||
$this->clientConfig->tcpNoDelay,
|
||||
);
|
||||
$body = json_encode($this->clientConfig, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
|
||||
|
||||
yield $stream->write(Command::magic());
|
||||
yield $stream->write(Command::identify($this->clientConfig->asNegotiationPayload()));
|
||||
$response = $this->command('IDENTIFY', data: $body)->response();
|
||||
|
||||
/** @var Response $response */
|
||||
$response = yield $this->response($stream, $buffer);
|
||||
$serverConfig = ServerConfig::fromArray($response->toArray());
|
||||
$this->connectionConfig = ConnectionConfig::fromArray($response->toArray());
|
||||
|
||||
if ($serverConfig->tls) {
|
||||
yield $stream->setupTls();
|
||||
|
||||
/** @var Response $response */
|
||||
$response = yield $this->response($stream, $buffer);
|
||||
|
||||
if (!$response->isOk()) {
|
||||
throw new NsqException();
|
||||
}
|
||||
if ($this->connectionConfig->snappy || $this->connectionConfig->deflate) {
|
||||
$this->response()->okOrFail();
|
||||
}
|
||||
|
||||
if ($serverConfig->snappy) {
|
||||
$stream = new SnappyStream($stream, $buffer->flush());
|
||||
|
||||
/** @var Response $response */
|
||||
$response = yield $this->response($stream, $buffer);
|
||||
|
||||
if (!$response->isOk()) {
|
||||
throw new NsqException();
|
||||
}
|
||||
}
|
||||
|
||||
if ($serverConfig->deflate) {
|
||||
$stream = new GzipStream($stream, $serverConfig->deflateLevel, $buffer->flush());
|
||||
|
||||
/** @var Response $response */
|
||||
$response = yield $this->response($stream, $buffer);
|
||||
|
||||
if (!$response->isOk()) {
|
||||
throw new NsqException();
|
||||
}
|
||||
}
|
||||
|
||||
if ($serverConfig->authRequired) {
|
||||
if ($this->connectionConfig->authRequired) {
|
||||
if (null === $this->clientConfig->authSecret) {
|
||||
throw new AuthenticationRequired();
|
||||
throw new AuthenticationRequired('NSQ requires authorization, set ClientConfig::$authSecret before connecting');
|
||||
}
|
||||
|
||||
yield $stream->write(Command::auth($this->clientConfig->authSecret));
|
||||
$authResponse = $this->command('AUTH', data: $this->clientConfig->authSecret)->response()->toArray();
|
||||
|
||||
/** @var Response $response */
|
||||
$response = yield $this->response($stream, $buffer);
|
||||
|
||||
$this->logger->info('Authorization response: '.http_build_query($response->toArray()));
|
||||
$this->logger->info('Authorization response: '.http_build_query($authResponse));
|
||||
}
|
||||
|
||||
$this->stream = $stream;
|
||||
|
||||
($this->onConnectCallback)();
|
||||
});
|
||||
}
|
||||
|
||||
public function close(bool $graceful = true): void
|
||||
/**
|
||||
* Cleanly close your connection (no more messages are sent).
|
||||
*/
|
||||
public function disconnect(): void
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return;
|
||||
}
|
||||
|
||||
$logger = $this->logger;
|
||||
[$stream, $this->stream] = [$this->stream, new NullStream()];
|
||||
|
||||
if ($graceful) {
|
||||
$this->logger->debug('Graceful disconnect.', [
|
||||
'class' => static::class,
|
||||
'address' => $this->address,
|
||||
]);
|
||||
|
||||
asyncCall(static function () use ($stream, $logger): \Generator {
|
||||
try {
|
||||
yield $stream->write(Command::cls());
|
||||
} catch (\Throwable $e) {
|
||||
$logger->warning($e->getMessage(), ['exception' => $e]);
|
||||
}
|
||||
|
||||
$stream->close();
|
||||
});
|
||||
|
||||
if (null === $this->socket) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
$stream->close();
|
||||
} catch (ClosedException) {
|
||||
$this->socket->write('CLS'.PHP_EOL);
|
||||
$this->socket->close();
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->logger->debug($e->getMessage(), ['exception' => $e]);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
|
||||
($this->onCloseCallback)();
|
||||
$this->socket = null;
|
||||
$this->connectionConfig = null;
|
||||
}
|
||||
|
||||
public function onConnect(callable $callback): static
|
||||
public function isReady(): bool
|
||||
{
|
||||
$previous = $this->onConnectCallback;
|
||||
$this->onConnectCallback = static function () use ($previous, $callback): void {
|
||||
$previous();
|
||||
$callback();
|
||||
};
|
||||
return null !== $this->socket;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<int, int|string>|string $params
|
||||
*/
|
||||
protected function command(string $command, array | string $params = [], string $data = null): self
|
||||
{
|
||||
$socket = $this->socket();
|
||||
|
||||
$buffer = [] === $params ? $command : implode(' ', [$command, ...((array) $params)]);
|
||||
$buffer .= PHP_EOL;
|
||||
|
||||
if (null !== $data) {
|
||||
$buffer .= pack('N', \strlen($data));
|
||||
$buffer .= $data;
|
||||
}
|
||||
|
||||
$this->logger->debug('Prepare send uncompressed buffer: {bytes}', ['bytes' => addcslashes($buffer, PHP_EOL)]);
|
||||
|
||||
if ($this->connectionConfig?->snappy) {
|
||||
$identifierFrame = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
|
||||
$compressedFrame = 0x00;
|
||||
$uncompressedFrame = 0x01;
|
||||
|
||||
$chunk = snappy_compress($buffer);
|
||||
[$chunk, $compressFrame] = match (\strlen($chunk) < \strlen($buffer)) {
|
||||
true => [$chunk, $compressedFrame],
|
||||
false => [$buffer, $uncompressedFrame],
|
||||
};
|
||||
|
||||
$size = \strlen($chunk) + 4;
|
||||
|
||||
$buffer = new ByteBuffer();
|
||||
foreach ([...$identifierFrame, $compressFrame, $size, $size >> 8, $size >> 16] as $byte) {
|
||||
$buffer->appendUint8($byte);
|
||||
}
|
||||
|
||||
$crc32c = hash('crc32c', $data, true);
|
||||
$crc32c = unpack('V', $crc32c)[1];
|
||||
|
||||
$unsignedRightShift = static function ($a, $b) {
|
||||
if ($b >= 32 || $b < -32) {
|
||||
$m = (int) ($b / 32);
|
||||
$b -= ($m * 32);
|
||||
}
|
||||
|
||||
if ($b < 0) {
|
||||
$b = 32 + $b;
|
||||
}
|
||||
|
||||
if (0 === $b) {
|
||||
return (($a >> 1) & 0x7fffffff) * 2 + (($a >> $b) & 1);
|
||||
}
|
||||
|
||||
if ($a < 0) {
|
||||
$a >>= 1;
|
||||
$a &= 2147483647;
|
||||
$a |= 0x40000000;
|
||||
$a >>= ($b - 1);
|
||||
} else {
|
||||
$a >>= $b;
|
||||
}
|
||||
|
||||
return $a;
|
||||
};
|
||||
$checksum = $unsignedRightShift((($crc32c >> 15) | ($crc32c << 17)) + 0xa282ead8, 0);
|
||||
|
||||
$buffer->appendUint32($checksum);
|
||||
$buffer->append($chunk);
|
||||
|
||||
$buffer = $buffer->bytes();
|
||||
}
|
||||
|
||||
$this->logger->debug('Prepare send compressed buffer: {bytes}', ['bytes' => addcslashes($buffer, PHP_EOL)]);
|
||||
|
||||
try {
|
||||
$socket->write($buffer);
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->disconnect();
|
||||
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function onClose(callable $callback): static
|
||||
public function hasMessage(float $timeout = 0): bool
|
||||
{
|
||||
$previous = $this->onCloseCallback;
|
||||
$this->onCloseCallback = static function () use ($previous, $callback): void {
|
||||
$previous();
|
||||
$callback();
|
||||
};
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<null|string>
|
||||
*/
|
||||
protected function read(): Promise
|
||||
{
|
||||
return call(function (): \Generator {
|
||||
try {
|
||||
return yield $this->stream->read();
|
||||
} catch (\Throwable $e) {
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
$this->close(false);
|
||||
|
||||
return new Failure($e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<void>
|
||||
*/
|
||||
protected function write(string $data): Promise
|
||||
{
|
||||
return call(function () use ($data): \Generator {
|
||||
try {
|
||||
return yield $this->stream->write($data);
|
||||
} catch (\Throwable $e) {
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
$this->close(false);
|
||||
|
||||
return new Failure($e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected function handleError(Frame\Error $error): void
|
||||
{
|
||||
$this->logger->error($error->data);
|
||||
|
||||
if (ErrorType::terminable($error)) {
|
||||
$this->close();
|
||||
|
||||
throw $error->toException();
|
||||
try {
|
||||
return false !== $this->socket()->selectRead($timeout);
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->disconnect();
|
||||
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<Frame\Response>
|
||||
*/
|
||||
private function response(Stream $stream, Buffer $buffer): Promise
|
||||
public function receive(float $timeout = null): ?Response
|
||||
{
|
||||
return call(function () use ($stream, $buffer): \Generator {
|
||||
while (true) {
|
||||
$response = Parser::parse($buffer);
|
||||
$socket = $this->socket();
|
||||
|
||||
$timeout ??= $this->clientConfig->readTimeout;
|
||||
$deadline = microtime(true) + $timeout;
|
||||
|
||||
if (!$this->hasMessage($timeout)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
$size = $socket->read(Bytes::BYTES_SIZE);
|
||||
|
||||
if ('' === $size) {
|
||||
$this->disconnect();
|
||||
|
||||
throw new ConnectionFail('Probably connection lost');
|
||||
}
|
||||
|
||||
if ($this->connectionConfig?->snappy) {
|
||||
$buffer = new ByteBuffer();
|
||||
$snappyBuffer = new ByteBuffer($size);
|
||||
while (true) {
|
||||
$typeByte = \ord($snappyBuffer->consume(1));
|
||||
|
||||
$size = \ord($snappyBuffer->consume(1)) + (\ord($snappyBuffer->consume(1)) << 8) + (\ord($snappyBuffer->consume(1)) << 16);
|
||||
$type = match ($typeByte) {
|
||||
0xff => 'identifier',
|
||||
0x00 => 'compressed',
|
||||
0x01 => 'uncompressed',
|
||||
0xfe => 'padding',
|
||||
};
|
||||
|
||||
$this->logger->debug('Received snappy chunk: {type}, size: {size}', [
|
||||
'type' => $type,
|
||||
'size' => $size,
|
||||
]);
|
||||
|
||||
switch ($typeByte) {
|
||||
case 0xff: // 'identifier',
|
||||
$socket->read($size);
|
||||
$snappyBuffer->append($socket->read(4));
|
||||
|
||||
continue 2;
|
||||
case 0x00: // 'compressed',
|
||||
case 0x01: // 'uncompressed',
|
||||
$uncompressed = $socket->read($size);
|
||||
|
||||
$this->logger->debug('Received uncompressed bytes: {bytes}', ['bytes' => $uncompressed]);
|
||||
$buffer->append($uncompressed);
|
||||
$buffer->consume(4); // slice snappy prefix
|
||||
$buffer->consumeUint32(); // slice size
|
||||
|
||||
break 2;
|
||||
case 0xfe:// 'padding',
|
||||
}
|
||||
}
|
||||
} else {
|
||||
$this->logger->debug('Size bytes received: "{bytes}"', ['bytes' => $size]);
|
||||
|
||||
$buffer = new ByteBuffer($size);
|
||||
|
||||
$size = $buffer->consumeUint32();
|
||||
|
||||
do {
|
||||
$chunk = $socket->read($size);
|
||||
|
||||
if (null === $response && null !== ($chunk = yield $stream->read())) {
|
||||
$buffer->append($chunk);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!$response instanceof Frame\Response) {
|
||||
throw new NsqException();
|
||||
}
|
||||
|
||||
return $response;
|
||||
$size -= \strlen($chunk);
|
||||
} while (0 < $size);
|
||||
}
|
||||
});
|
||||
|
||||
$this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL));
|
||||
|
||||
$response = new Response($buffer);
|
||||
|
||||
if ($response->isHeartBeat()) {
|
||||
$this->command('NOP');
|
||||
|
||||
return $this->receive(
|
||||
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
|
||||
);
|
||||
}
|
||||
}
|
||||
// @codeCoverageIgnoreStart
|
||||
catch (Exception $e) {
|
||||
$this->disconnect();
|
||||
|
||||
throw ConnectionFail::fromThrowable($e);
|
||||
}
|
||||
// @codeCoverageIgnoreEnd
|
||||
|
||||
return $response;
|
||||
}
|
||||
|
||||
protected function response(): Response
|
||||
{
|
||||
return $this->receive() ?? throw UnexpectedResponse::null();
|
||||
}
|
||||
|
||||
private function socket(): Socket
|
||||
{
|
||||
if (null === $this->socket) {
|
||||
$this->connect();
|
||||
}
|
||||
|
||||
return $this->socket ?? throw new ConnectionFail('This connection is closed, create new one.');
|
||||
}
|
||||
}
|
||||
|
223
src/Consumer.php
223
src/Consumer.php
@ -4,245 +4,60 @@ declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\Failure;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Exception\ConsumerException;
|
||||
use Nsq\Frame\Response;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\NullLogger;
|
||||
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\call;
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
final class Consumer extends Connection
|
||||
{
|
||||
private int $rdy = 0;
|
||||
|
||||
/**
|
||||
* @var callable
|
||||
* Subscribe to a topic/channel.
|
||||
*/
|
||||
private $onMessage;
|
||||
|
||||
public function __construct(
|
||||
string $address,
|
||||
/**
|
||||
* @readonly
|
||||
*/
|
||||
public string $topic,
|
||||
/**
|
||||
* @readonly
|
||||
*/
|
||||
public string $channel,
|
||||
callable $onMessage,
|
||||
ClientConfig $clientConfig,
|
||||
LoggerInterface $logger,
|
||||
) {
|
||||
parent::__construct(
|
||||
$address,
|
||||
$clientConfig,
|
||||
$logger,
|
||||
);
|
||||
|
||||
$this->onMessage = $onMessage;
|
||||
|
||||
$context = compact('address', 'topic', 'channel');
|
||||
$this->onConnect(function () use ($context): void {
|
||||
$this->logger->debug('Consumer connected.', $context);
|
||||
});
|
||||
$this->onClose(function () use ($context): void {
|
||||
$this->logger->debug('Consumer disconnected.', $context);
|
||||
});
|
||||
$this->logger->debug('Consumer created.', $context);
|
||||
}
|
||||
|
||||
public static function create(
|
||||
string $address,
|
||||
string $topic,
|
||||
string $channel,
|
||||
callable $onMessage,
|
||||
?ClientConfig $clientConfig = null,
|
||||
?LoggerInterface $logger = null,
|
||||
): self {
|
||||
return new self(
|
||||
$address,
|
||||
$topic,
|
||||
$channel,
|
||||
$onMessage,
|
||||
$clientConfig ?? new ClientConfig(),
|
||||
$logger ?? new NullLogger(),
|
||||
);
|
||||
}
|
||||
|
||||
public function connect(): Promise
|
||||
public function sub(string $topic, string $channel): void
|
||||
{
|
||||
if ($this->isConnected()) {
|
||||
return call(static function (): void {
|
||||
});
|
||||
}
|
||||
|
||||
return call(function (): \Generator {
|
||||
yield parent::connect();
|
||||
|
||||
$buffer = new Buffer();
|
||||
|
||||
asyncCall(function () use ($buffer): \Generator {
|
||||
yield $this->write(Command::sub($this->topic, $this->channel));
|
||||
|
||||
if (null !== ($chunk = yield $this->read())) {
|
||||
$buffer->append($chunk);
|
||||
}
|
||||
|
||||
/** @var Response $response */
|
||||
$response = Parser::parse($buffer);
|
||||
|
||||
if (!$response->isOk()) {
|
||||
return new Failure(new ConsumerException('Fail subscription.'));
|
||||
}
|
||||
|
||||
yield $this->rdy(1);
|
||||
|
||||
/** @phpstan-ignore-next-line */
|
||||
asyncCall(function () use ($buffer): \Generator {
|
||||
while (null !== $chunk = yield $this->read()) {
|
||||
$buffer->append($chunk);
|
||||
|
||||
while ($frame = Parser::parse($buffer)) {
|
||||
switch (true) {
|
||||
case $frame instanceof Frame\Response:
|
||||
if ($frame->isHeartBeat()) {
|
||||
yield $this->write(Command::nop());
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
throw ConsumerException::response($frame);
|
||||
case $frame instanceof Frame\Error:
|
||||
$this->handleError($frame);
|
||||
|
||||
break;
|
||||
case $frame instanceof Frame\Message:
|
||||
asyncCall($this->onMessage, Message::compose($frame, $this));
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
if ($this->rdy !== $this->clientConfig->rdyCount) {
|
||||
yield $this->rdy($this->clientConfig->rdyCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$this->close(false);
|
||||
});
|
||||
});
|
||||
});
|
||||
$this->command('SUB', [$topic, $channel])->response()->okOrFail();
|
||||
}
|
||||
|
||||
/**
|
||||
* Update RDY state (indicate you are ready to receive N messages).
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
*/
|
||||
public function rdy(int $count): Promise
|
||||
public function rdy(int $count): void
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return new Success(false);
|
||||
if ($this->rdy === $count) {
|
||||
return;
|
||||
}
|
||||
|
||||
if ($this->rdy === $count) {
|
||||
return new Success(true);
|
||||
}
|
||||
$this->command('RDY', (string) $count);
|
||||
|
||||
$this->rdy = $count;
|
||||
|
||||
return call(function () use ($count): \Generator {
|
||||
try {
|
||||
yield $this->write(Command::rdy($count));
|
||||
|
||||
return true;
|
||||
} catch (\Throwable) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Finish a message (indicate successful processing).
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
public function fin(string $id): Promise
|
||||
public function fin(string $id): void
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return new Success(false);
|
||||
}
|
||||
$this->command('FIN', $id);
|
||||
|
||||
return call(function () use ($id): \Generator {
|
||||
try {
|
||||
yield $this->write(Command::fin($id));
|
||||
|
||||
return true;
|
||||
} catch (\Throwable) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
--$this->rdy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Re-queue a message (indicate failure to process) The re-queued message is placed at the tail of the queue,
|
||||
* equivalent to having just published it, but for various implementation specific reasons that behavior should not
|
||||
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
|
||||
* behaves identically to an explicit REQ.
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
*
|
||||
* @internal
|
||||
* Re-queue a message (indicate failure to process)
|
||||
* The re-queued message is placed at the tail of the queue, equivalent to having just published it,
|
||||
* but for various implementation specific reasons that behavior should not be explicitly relied upon and may change in the future.
|
||||
* Similarly, a message that is in-flight and times out behaves identically to an explicit REQ.
|
||||
*/
|
||||
public function req(string $id, int $timeout): Promise
|
||||
public function req(string $id, int $timeout): void
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return new Success(false);
|
||||
}
|
||||
$this->command('REQ', [$id, $timeout]);
|
||||
|
||||
return call(function () use ($id, $timeout): \Generator {
|
||||
try {
|
||||
yield $this->write(Command::req($id, $timeout));
|
||||
|
||||
return true;
|
||||
} catch (\Throwable) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
--$this->rdy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the timeout for an in-flight message.
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
public function touch(string $id): Promise
|
||||
public function touch(string $id): void
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return new Success(false);
|
||||
}
|
||||
|
||||
return call(function () use ($id): \Generator {
|
||||
try {
|
||||
yield $this->write(Command::touch($id));
|
||||
|
||||
return true;
|
||||
} catch (\Throwable) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
$this->command('TOUCH', $id);
|
||||
}
|
||||
}
|
||||
|
@ -1,99 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
/**
|
||||
* @psalm-immutable
|
||||
*/
|
||||
final class ErrorType
|
||||
{
|
||||
/**
|
||||
* A generic error type without any more hints.
|
||||
*/
|
||||
public const E_INVALID = true;
|
||||
/**
|
||||
* This error might be returned during multiple occasions. It can be returned for IDENTIFY, AUTH or MPUB messages.
|
||||
* It is caused for payloads that do not meet certain requirements. For IDENTIFY and AUTH, this is usually a bug in
|
||||
* the library and should be reported. For MPUB, this error can occur if the payload is larger than the maximum
|
||||
* payload size specified in the nsqd config.
|
||||
*/
|
||||
public const E_BAD_BODY = true;
|
||||
/**
|
||||
* This error indicates that the topic sent to nsqd is not valid.
|
||||
*/
|
||||
public const E_BAD_TOPIC = true;
|
||||
/**
|
||||
* This error indicates that the channel sent to nsqd is not valid.
|
||||
*/
|
||||
public const E_BAD_CHANNEL = true;
|
||||
/**
|
||||
* This error is returned by nsqd if the message in the payload of a publishing operation does not meet the
|
||||
* requirements of the server. This might be caused by too big payloads being sent to nsqd. You should consider
|
||||
* adding a limit to the payload size or increasing it in the nsqd config.
|
||||
*/
|
||||
public const E_BAD_MESSAGE = true;
|
||||
/**
|
||||
* This error may happen if a error condition is met after validating the input on the nsqd side. This is usually a
|
||||
* temporary error and can be caused by topics being added, deleted or cleared.
|
||||
*/
|
||||
public const E_PUB_FAILED = true;
|
||||
/**
|
||||
* This error may happen if a error condition is met after validating the input on the nsqd side. This is usually a
|
||||
* temporary error and can be caused by topics being added, deleted or cleared.
|
||||
*/
|
||||
public const E_MPUB_FAILED = true;
|
||||
/**
|
||||
* This error may happen if a error condition is met after validating the input on the nsqd side. This is usually a
|
||||
* temporary error and can be caused by topics being added, deleted or cleared.
|
||||
*/
|
||||
public const E_DPUB_FAILED = true;
|
||||
/**
|
||||
* This error may happen if a error condition is met after validating the input on the nsqd side. This can
|
||||
* happen in particular for messages that are no longer queued on the server side.
|
||||
*/
|
||||
public const E_FIN_FAILED = false;
|
||||
/**
|
||||
* This error may happen if a error condition is met after validating the input on the nsqd side. This can
|
||||
* happen in particular for messages that are no longer queued on the server side.
|
||||
*/
|
||||
public const E_REQ_FAILED = false;
|
||||
/**
|
||||
* This error may happen if a error condition is met after validating the input on the nsqd side. This can
|
||||
* happen in particular for messages that are no longer queued on the server side.
|
||||
*/
|
||||
public const E_TOUCH_FAILED = false;
|
||||
/**
|
||||
* This error indicates that the authorization of the client failed on the server side. This might be related
|
||||
* to connection issues to the authorization server. Depending on the authorization server implementation, this
|
||||
* might also indicate that the given auth secret in the [ClientConfig] is not known on the server or the server
|
||||
* denied authentication with the current connection properties (i.e. TLS status and IP).
|
||||
*/
|
||||
public const E_AUTH_FAILED = true;
|
||||
/**
|
||||
* This error happens if something breaks on the nsqd side while performing the authorization. This might be
|
||||
* caused by bugs in nsqd, the authorization server or network issues.
|
||||
*/
|
||||
public const E_AUTH_ERROR = true;
|
||||
/**
|
||||
* This error is sent by nsqd if the client attempts an authentication, but the server does not support it. This
|
||||
* should never happen using this library as authorization requests are only sent if the server supports it.
|
||||
* It is safe to expect that this error is never thrown.
|
||||
*/
|
||||
public const E_AUTH_DISABLED = true;
|
||||
/**
|
||||
* This error indicates that the client related to the authorization secret set in the [ClientConfig] is not
|
||||
* allowed to do the operation it tried to do.
|
||||
*/
|
||||
public const E_UNAUTHORIZED = true;
|
||||
|
||||
public static function terminable(Frame\Error $error): bool
|
||||
{
|
||||
$type = explode(' ', $error->data)[0];
|
||||
|
||||
$constant = 'self::'.$type;
|
||||
|
||||
return \defined($constant) ? \constant($constant) : self::E_INVALID;
|
||||
}
|
||||
}
|
@ -4,10 +4,8 @@ declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
final class AuthenticationRequired extends NsqException
|
||||
use RuntimeException;
|
||||
|
||||
final class AuthenticationRequired extends RuntimeException implements NsqException
|
||||
{
|
||||
public function __construct()
|
||||
{
|
||||
parent::__construct('NSQ requires authorization, set ClientConfig::$authSecret before connecting');
|
||||
}
|
||||
}
|
||||
|
19
src/Exception/ConnectionFail.php
Normal file
19
src/Exception/ConnectionFail.php
Normal file
@ -0,0 +1,19 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
use RuntimeException;
|
||||
use Throwable;
|
||||
|
||||
final class ConnectionFail extends RuntimeException implements NsqException
|
||||
{
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
public static function fromThrowable(Throwable $throwable): self
|
||||
{
|
||||
return new self($throwable->getMessage(), (int) $throwable->getCode(), $throwable);
|
||||
}
|
||||
}
|
@ -1,15 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
use Nsq\Frame\Response;
|
||||
|
||||
final class ConsumerException extends NsqException
|
||||
{
|
||||
public static function response(Response $response): self
|
||||
{
|
||||
return new self(sprintf('Consumer receive response "%s" from nsq, which not expected. ', $response->data));
|
||||
}
|
||||
}
|
@ -1,18 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
use Psr\Log\LogLevel;
|
||||
|
||||
final class LookupException extends NsqException
|
||||
{
|
||||
public function level(): string
|
||||
{
|
||||
return match ($this->getMessage()) {
|
||||
'TOPIC_NOT_FOUND' => LogLevel::DEBUG,
|
||||
default => LogLevel::WARNING,
|
||||
};
|
||||
}
|
||||
}
|
26
src/Exception/MessageAlreadyFinished.php
Normal file
26
src/Exception/MessageAlreadyFinished.php
Normal file
@ -0,0 +1,26 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
use Nsq\Message;
|
||||
use RuntimeException;
|
||||
|
||||
final class MessageAlreadyFinished extends RuntimeException implements NsqException
|
||||
{
|
||||
public static function finish(Message $message): self
|
||||
{
|
||||
return new self('Can\'t finish message as it already finished.');
|
||||
}
|
||||
|
||||
public static function requeue(Message $message): self
|
||||
{
|
||||
return new self('Can\'t requeue message as it already finished.');
|
||||
}
|
||||
|
||||
public static function touch(Message $message): self
|
||||
{
|
||||
return new self('Can\'t touch message as it already finished.');
|
||||
}
|
||||
}
|
@ -1,15 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
use Nsq\Message;
|
||||
|
||||
final class MessageException extends NsqException
|
||||
{
|
||||
public static function processed(Message $message): self
|
||||
{
|
||||
return new self(sprintf('Message "%s" already processed.', $message->id));
|
||||
}
|
||||
}
|
11
src/Exception/NsqError.php
Normal file
11
src/Exception/NsqError.php
Normal file
@ -0,0 +1,11 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
use RuntimeException;
|
||||
|
||||
final class NsqError extends RuntimeException implements NsqException
|
||||
{
|
||||
}
|
@ -4,6 +4,8 @@ declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
class NsqException extends \RuntimeException
|
||||
use Throwable;
|
||||
|
||||
interface NsqException extends Throwable
|
||||
{
|
||||
}
|
||||
|
@ -1,9 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
final class ServerException extends NsqException
|
||||
{
|
||||
}
|
@ -1,18 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
final class SnappyException extends NsqException
|
||||
{
|
||||
public static function notInstalled(): self
|
||||
{
|
||||
return new self('Snappy extension not installed.');
|
||||
}
|
||||
|
||||
public static function invalidHeader(): self
|
||||
{
|
||||
return new self('Invalid snappy protocol header.');
|
||||
}
|
||||
}
|
@ -1,9 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
final class StreamException extends NsqException
|
||||
{
|
||||
}
|
18
src/Exception/UnexpectedResponse.php
Normal file
18
src/Exception/UnexpectedResponse.php
Normal file
@ -0,0 +1,18 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Exception;
|
||||
|
||||
use RuntimeException;
|
||||
|
||||
final class UnexpectedResponse extends RuntimeException implements NsqException
|
||||
{
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
public static function null(): self
|
||||
{
|
||||
return new self('Response was expected, but null received.');
|
||||
}
|
||||
}
|
@ -1,31 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
abstract class Frame
|
||||
{
|
||||
public const TYPE_RESPONSE = 0;
|
||||
public const TYPE_ERROR = 1;
|
||||
public const TYPE_MESSAGE = 2;
|
||||
|
||||
public function __construct(public int $type)
|
||||
{
|
||||
}
|
||||
|
||||
public function response(): bool
|
||||
{
|
||||
return self::TYPE_RESPONSE === $this->type;
|
||||
}
|
||||
|
||||
public function error(): bool
|
||||
{
|
||||
return self::TYPE_ERROR === $this->type;
|
||||
}
|
||||
|
||||
public function message(): bool
|
||||
{
|
||||
return self::TYPE_MESSAGE === $this->type;
|
||||
}
|
||||
}
|
@ -1,24 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Frame;
|
||||
|
||||
use Nsq\Exception\ServerException;
|
||||
use Nsq\Frame;
|
||||
|
||||
/**
|
||||
* @psalm-immutable
|
||||
*/
|
||||
final class Error extends Frame
|
||||
{
|
||||
public function __construct(public string $data)
|
||||
{
|
||||
parent::__construct(self::TYPE_ERROR);
|
||||
}
|
||||
|
||||
public function toException(): ServerException
|
||||
{
|
||||
return new ServerException($this->data);
|
||||
}
|
||||
}
|
@ -1,19 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Frame;
|
||||
|
||||
use Nsq\Frame;
|
||||
|
||||
final class Message extends Frame
|
||||
{
|
||||
public function __construct(
|
||||
public int $timestamp,
|
||||
public int $attempts,
|
||||
public string $id,
|
||||
public string $body,
|
||||
) {
|
||||
parent::__construct(self::TYPE_MESSAGE);
|
||||
}
|
||||
}
|
@ -1,39 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Frame;
|
||||
|
||||
use Nsq\Frame;
|
||||
|
||||
/**
|
||||
* @psalm-immutable
|
||||
*/
|
||||
final class Response extends Frame
|
||||
{
|
||||
public const OK = 'OK';
|
||||
public const HEARTBEAT = '_heartbeat_';
|
||||
|
||||
public function __construct(public string $data)
|
||||
{
|
||||
parent::__construct(self::TYPE_RESPONSE);
|
||||
}
|
||||
|
||||
public function isOk(): bool
|
||||
{
|
||||
return self::OK === $this->data;
|
||||
}
|
||||
|
||||
public function isHeartBeat(): bool
|
||||
{
|
||||
return self::HEARTBEAT === $this->data;
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return array<mixed, mixed>
|
||||
*/
|
||||
public function toArray(): array
|
||||
{
|
||||
return json_decode($this->data, true, flags: JSON_THROW_ON_ERROR);
|
||||
}
|
||||
}
|
274
src/Lookup.php
274
src/Lookup.php
@ -1,274 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\Deferred;
|
||||
use Amp\Dns\DnsException;
|
||||
use Amp\Http\Client\DelegateHttpClient;
|
||||
use Amp\Http\Client\HttpClientBuilder;
|
||||
use Amp\Http\Client\Request;
|
||||
use Amp\Http\Client\Response;
|
||||
use Amp\NullCancellationToken;
|
||||
use Amp\Promise;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Config\LookupConfig;
|
||||
use Nsq\Exception\LookupException;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\NullLogger;
|
||||
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\call;
|
||||
use function Amp\delay;
|
||||
|
||||
final class Lookup
|
||||
{
|
||||
/**
|
||||
* @psalm-var array<string, array<string, Lookup\Producer>>
|
||||
*/
|
||||
private array $producers = [];
|
||||
|
||||
private array $consumers = [];
|
||||
|
||||
private array $running = [];
|
||||
|
||||
private array $topicWatchers = [];
|
||||
|
||||
public function __construct(
|
||||
private array $addresses,
|
||||
private LookupConfig $config,
|
||||
private LoggerInterface $logger,
|
||||
private DelegateHttpClient $httpClient,
|
||||
) {
|
||||
}
|
||||
|
||||
public static function create(
|
||||
string | array $address,
|
||||
LookupConfig $config = null,
|
||||
LoggerInterface $logger = null,
|
||||
DelegateHttpClient $httpClient = null,
|
||||
): self {
|
||||
return new self(
|
||||
(array) $address,
|
||||
$config ?? new LookupConfig(),
|
||||
$logger ?? new NullLogger(),
|
||||
$httpClient ?? HttpClientBuilder::buildDefault(),
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<Lookup\Producer[]>
|
||||
*/
|
||||
public function nodes(): Promise
|
||||
{
|
||||
return call(function (): \Generator {
|
||||
$requestHandler = function (string $uri): \Generator {
|
||||
/** @var Response $response */
|
||||
$response = yield $this->httpClient->request(new Request($uri.'/nodes'), new NullCancellationToken());
|
||||
|
||||
try {
|
||||
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
|
||||
} catch (LookupException $e) {
|
||||
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
|
||||
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
$promises = [];
|
||||
foreach ($this->addresses as $address) {
|
||||
$promises[$address] = call($requestHandler, $address);
|
||||
}
|
||||
|
||||
$nodes = [];
|
||||
/** @var Lookup\Response $response */
|
||||
foreach (yield $promises as $response) {
|
||||
foreach ($response->producers as $producer) {
|
||||
$nodes[$producer->toTcpUri()] = $producer;
|
||||
}
|
||||
}
|
||||
|
||||
return array_values($nodes);
|
||||
});
|
||||
}
|
||||
|
||||
public function stop(): void
|
||||
{
|
||||
$this->producers = [];
|
||||
$this->consumers = [];
|
||||
$this->running = [];
|
||||
$this->topicWatchers = [];
|
||||
|
||||
$this->logger->info('Lookup stopped.');
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-suppress InvalidPropertyAssignmentValue
|
||||
*/
|
||||
public function subscribe(string $topic, string $channel, callable $onMessage, ClientConfig $config = null): void
|
||||
{
|
||||
if (null !== ($this->running[$topic][$channel] ?? null)) {
|
||||
throw new \InvalidArgumentException('Subscription already exists.');
|
||||
}
|
||||
|
||||
$this->running[$topic][$channel] = true;
|
||||
|
||||
asyncCall(function () use ($topic, $channel, $onMessage, $config): \Generator {
|
||||
while (true) {
|
||||
if (null === ($this->running[$topic][$channel] ?? null)) {
|
||||
return;
|
||||
}
|
||||
|
||||
/** @phpstan-ignore-next-line */
|
||||
$producers = $this->producers[$topic] ??= new Deferred();
|
||||
|
||||
if ($producers instanceof Deferred) {
|
||||
/** @var array<string, Lookup\Producer> $producers */
|
||||
$producers = yield $producers->promise();
|
||||
}
|
||||
|
||||
foreach (array_diff_key($this->consumers, $producers) as $address => $producer) {
|
||||
unset($this->consumers[$address]);
|
||||
}
|
||||
|
||||
foreach ($producers as $address => $producer) {
|
||||
if (null !== ($this->consumers[$address][$topic][$channel] ?? null)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$this->keepConnection(
|
||||
Consumer::create(
|
||||
$address,
|
||||
$topic,
|
||||
$channel,
|
||||
$onMessage,
|
||||
$config,
|
||||
$this->logger,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
yield delay($this->config->pollingInterval);
|
||||
}
|
||||
});
|
||||
|
||||
$this->watch($topic);
|
||||
|
||||
$this->logger->info('Subscribed.', compact('topic', 'channel'));
|
||||
}
|
||||
|
||||
public function unsubscribe(string $topic, string $channel): void
|
||||
{
|
||||
if (null === ($this->running[$topic][$channel] ?? null)) {
|
||||
$this->logger->debug('Not subscribed.', compact('topic', 'channel'));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
unset($this->running[$topic][$channel]);
|
||||
|
||||
if ([] === $this->running[$topic]) {
|
||||
unset($this->running[$topic]);
|
||||
}
|
||||
|
||||
$this->logger->info('Unsubscribed.', compact('topic', 'channel'));
|
||||
}
|
||||
|
||||
private function keepConnection(Consumer $consumer): void
|
||||
{
|
||||
$this->consumers[$consumer->address][$consumer->topic][$consumer->channel] = $consumer;
|
||||
|
||||
asyncCall(function () use ($consumer): \Generator {
|
||||
while (null !== ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
|
||||
try {
|
||||
yield $consumer->connect();
|
||||
} catch (DnsException $e) {
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
unset(
|
||||
$this->consumers[$consumer->address],
|
||||
$this->producers[$consumer->topic][$consumer->address],
|
||||
);
|
||||
|
||||
return;
|
||||
} catch (\Throwable $e) {
|
||||
$this->logger->error($e->getMessage(), ['exception' => $e]);
|
||||
|
||||
yield delay($this->config->pollingInterval);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
/** @phpstan-ignore-next-line */
|
||||
if (null === ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
|
||||
$consumer->close();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (!$consumer->isConnected()) {
|
||||
break;
|
||||
}
|
||||
|
||||
yield delay(500);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private function watch(string $topic): void
|
||||
{
|
||||
if (\array_key_exists($topic, $this->topicWatchers)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->topicWatchers[$topic] = true;
|
||||
|
||||
asyncCall(function () use ($topic): \Generator {
|
||||
$cancellationToken = new NullCancellationToken();
|
||||
$requestHandler = function (string $uri) use ($topic, $cancellationToken): \Generator {
|
||||
$this->logger->debug('Lookup', compact('topic'));
|
||||
|
||||
/** @var Response $response */
|
||||
$response = yield $this->httpClient->request(new Request($uri.'/lookup?topic='.$topic), $cancellationToken);
|
||||
|
||||
try {
|
||||
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
|
||||
} catch (LookupException $e) {
|
||||
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
|
||||
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
while (\array_key_exists($topic, $this->running)) {
|
||||
$promises = [];
|
||||
foreach ($this->addresses as $address) {
|
||||
$promises[$address] = call($requestHandler, $address);
|
||||
}
|
||||
|
||||
/** @var Lookup\Response[] $responses */
|
||||
$responses = yield $promises;
|
||||
|
||||
$producers = [];
|
||||
foreach ($responses as $response) {
|
||||
foreach ($response->producers as $producer) {
|
||||
$producers[$producer->toTcpUri()] = $producer;
|
||||
}
|
||||
}
|
||||
|
||||
/** @phpstan-ignore-next-line */
|
||||
if (($deferred = ($this->producers[$topic] ?? null)) instanceof Deferred) {
|
||||
$deferred->resolve($producers);
|
||||
}
|
||||
$this->producers[$topic] = $producers;
|
||||
|
||||
yield delay($this->config->pollingInterval);
|
||||
}
|
||||
|
||||
unset($this->topicWatchers[$topic]);
|
||||
});
|
||||
}
|
||||
}
|
@ -1,39 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Lookup;
|
||||
|
||||
final class Producer
|
||||
{
|
||||
public function __construct(
|
||||
public string $broadcastAddress,
|
||||
public string $hostname,
|
||||
public string $remoteAddress,
|
||||
public int $tcpPort,
|
||||
public int $httpPort,
|
||||
public string $version,
|
||||
public array $tombstones,
|
||||
public array $topics,
|
||||
) {
|
||||
}
|
||||
|
||||
public static function fromArray(array $array): self
|
||||
{
|
||||
return new self(
|
||||
$array['broadcast_address'],
|
||||
$array['hostname'],
|
||||
$array['remote_address'],
|
||||
$array['tcp_port'],
|
||||
$array['http_port'],
|
||||
$array['version'],
|
||||
$array['tombstones'] ?? [],
|
||||
$array['topics'] ?? [],
|
||||
);
|
||||
}
|
||||
|
||||
public function toTcpUri(): string
|
||||
{
|
||||
return sprintf('%s:%s', $this->broadcastAddress, $this->tcpPort);
|
||||
}
|
||||
}
|
@ -1,34 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Lookup;
|
||||
|
||||
use Nsq\Exception\LookupException;
|
||||
|
||||
final class Response
|
||||
{
|
||||
/**
|
||||
* @param string[] $channels
|
||||
* @param Producer[] $producers
|
||||
*/
|
||||
public function __construct(
|
||||
public array $channels,
|
||||
public array $producers,
|
||||
) {
|
||||
}
|
||||
|
||||
public static function fromJson(string $json): self
|
||||
{
|
||||
$array = json_decode($json, true, flags: JSON_THROW_ON_ERROR);
|
||||
|
||||
if (\array_key_exists('message', $array)) {
|
||||
throw new LookupException($array['message']);
|
||||
}
|
||||
|
||||
return new self(
|
||||
$array['channels'] ?? [],
|
||||
array_map([Producer::class, 'fromArray'], $array['producers']),
|
||||
);
|
||||
}
|
||||
}
|
103
src/Message.php
103
src/Message.php
@ -4,78 +4,75 @@ declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\Promise;
|
||||
use Nsq\Exception\MessageException;
|
||||
use Nsq\Exception\MessageAlreadyFinished;
|
||||
|
||||
final class Message
|
||||
{
|
||||
private bool $processed = false;
|
||||
|
||||
public function __construct(
|
||||
public string $id,
|
||||
public string $body,
|
||||
public int $timestamp,
|
||||
public int $attempts,
|
||||
private Consumer $consumer,
|
||||
) {
|
||||
}
|
||||
|
||||
public static function compose(Frame\Message $message, Consumer $consumer): self
|
||||
{
|
||||
return new self(
|
||||
$message->id,
|
||||
$message->body,
|
||||
$message->timestamp,
|
||||
$message->attempts,
|
||||
$consumer,
|
||||
);
|
||||
}
|
||||
|
||||
public function isProcessed(): bool
|
||||
{
|
||||
return $this->processed;
|
||||
}
|
||||
/**
|
||||
* @psalm-readonly
|
||||
*/
|
||||
public int $timestamp;
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<bool>
|
||||
* @psalm-readonly
|
||||
*/
|
||||
public function finish(): Promise
|
||||
{
|
||||
$this->markAsProcessedOrFail();
|
||||
|
||||
return $this->consumer->fin($this->id);
|
||||
}
|
||||
public int $attempts;
|
||||
|
||||
/**
|
||||
* @psalm-param positive-int|0 $timeout
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
* @psalm-readonly
|
||||
*/
|
||||
public function requeue(int $timeout): Promise
|
||||
{
|
||||
$this->markAsProcessedOrFail();
|
||||
|
||||
return $this->consumer->req($this->id, $timeout);
|
||||
}
|
||||
public string $id;
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<bool>
|
||||
* @psalm-readonly
|
||||
*/
|
||||
public function touch(): Promise
|
||||
public string $body;
|
||||
|
||||
private bool $finished = false;
|
||||
|
||||
private Consumer $consumer;
|
||||
|
||||
public function __construct(int $timestamp, int $attempts, string $id, string $body, Consumer $consumer)
|
||||
{
|
||||
if ($this->processed) {
|
||||
throw MessageException::processed($this);
|
||||
$this->timestamp = $timestamp;
|
||||
$this->attempts = $attempts;
|
||||
$this->id = $id;
|
||||
$this->body = $body;
|
||||
|
||||
$this->consumer = $consumer;
|
||||
}
|
||||
|
||||
public function isFinished(): bool
|
||||
{
|
||||
return $this->finished;
|
||||
}
|
||||
|
||||
public function finish(): void
|
||||
{
|
||||
if ($this->finished) {
|
||||
throw MessageAlreadyFinished::finish($this);
|
||||
}
|
||||
|
||||
return $this->consumer->touch($this->id);
|
||||
$this->consumer->fin($this->id);
|
||||
$this->finished = true;
|
||||
}
|
||||
|
||||
private function markAsProcessedOrFail(): void
|
||||
public function requeue(int $timeout): void
|
||||
{
|
||||
if ($this->processed) {
|
||||
throw MessageException::processed($this);
|
||||
if ($this->finished) {
|
||||
throw MessageAlreadyFinished::requeue($this);
|
||||
}
|
||||
|
||||
$this->processed = true;
|
||||
$this->consumer->req($this->id, $timeout);
|
||||
$this->finished = true;
|
||||
}
|
||||
|
||||
public function touch(): void
|
||||
{
|
||||
if ($this->finished) {
|
||||
throw MessageAlreadyFinished::touch($this);
|
||||
}
|
||||
|
||||
$this->consumer->touch($this->id);
|
||||
}
|
||||
}
|
||||
|
@ -1,47 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Nsq\Exception\NsqException;
|
||||
|
||||
class Parser
|
||||
{
|
||||
private const SIZE = 4;
|
||||
private const TYPE = 4;
|
||||
private const MESSAGE_HEADER_SIZE =
|
||||
8 + // timestamp
|
||||
2 + // attempts
|
||||
16 + // ID
|
||||
4; // Frame type
|
||||
|
||||
public static function parse(Buffer $buffer): ?Frame
|
||||
{
|
||||
if ($buffer->size() < self::SIZE) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$size = $buffer->readInt32();
|
||||
|
||||
if ($buffer->size() < $size + self::SIZE) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$buffer->discard(self::SIZE);
|
||||
|
||||
$type = $buffer->consumeInt32();
|
||||
|
||||
return match ($type) {
|
||||
Frame::TYPE_RESPONSE => new Frame\Response($buffer->consume($size - self::TYPE)),
|
||||
Frame::TYPE_ERROR => new Frame\Error($buffer->consume($size - self::TYPE)),
|
||||
Frame::TYPE_MESSAGE => new Frame\Message(
|
||||
timestamp: $buffer->consumeTimestamp(),
|
||||
attempts: $buffer->consumeAttempts(),
|
||||
id: $buffer->consumeMessageID(),
|
||||
body: $buffer->consume($size - self::MESSAGE_HEADER_SIZE),
|
||||
),
|
||||
default => throw new NsqException(sprintf('Unexpected frame type: "%s"', $type)),
|
||||
};
|
||||
}
|
||||
}
|
134
src/Producer.php
134
src/Producer.php
@ -4,127 +4,41 @@ declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Exception\NsqException;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\NullLogger;
|
||||
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\call;
|
||||
use function array_map;
|
||||
use function implode;
|
||||
use function pack;
|
||||
|
||||
final class Producer extends Connection
|
||||
{
|
||||
public function __construct(
|
||||
string $address,
|
||||
ClientConfig $clientConfig,
|
||||
LoggerInterface $logger,
|
||||
) {
|
||||
parent::__construct(
|
||||
$address,
|
||||
$clientConfig,
|
||||
$logger,
|
||||
);
|
||||
|
||||
$context = compact('address');
|
||||
$this->onConnect(function () use ($context): void {
|
||||
$this->logger->debug('Producer connected.', $context);
|
||||
});
|
||||
$this->onClose(function () use ($context): void {
|
||||
$this->logger->debug('Producer disconnected.', $context);
|
||||
});
|
||||
$this->logger->debug('Producer created.', $context);
|
||||
}
|
||||
|
||||
public static function create(
|
||||
string $address,
|
||||
ClientConfig $clientConfig = null,
|
||||
LoggerInterface $logger = null,
|
||||
): self {
|
||||
return new self(
|
||||
$address,
|
||||
$clientConfig ?? new ClientConfig(),
|
||||
$logger ?? new NullLogger(),
|
||||
);
|
||||
}
|
||||
|
||||
public function connect(): Promise
|
||||
/**
|
||||
* @psalm-suppress PossiblyFalseOperand
|
||||
*/
|
||||
public function pub(string $topic, string $body): void
|
||||
{
|
||||
if ($this->isConnected()) {
|
||||
return call(static function (): void {
|
||||
});
|
||||
}
|
||||
|
||||
return call(function (): \Generator {
|
||||
yield parent::connect();
|
||||
|
||||
$buffer = new Buffer();
|
||||
|
||||
asyncCall(function () use ($buffer): \Generator {
|
||||
while (null !== $chunk = yield $this->read()) {
|
||||
$buffer->append($chunk);
|
||||
|
||||
while ($frame = Parser::parse($buffer)) {
|
||||
switch (true) {
|
||||
case $frame instanceof Frame\Response:
|
||||
if ($frame->isHeartBeat()) {
|
||||
yield $this->write(Command::nop());
|
||||
}
|
||||
|
||||
// Ok received
|
||||
break;
|
||||
case $frame instanceof Frame\Error:
|
||||
$this->handleError($frame);
|
||||
|
||||
break;
|
||||
default:
|
||||
throw new NsqException('Unreachable statement.');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$this->close(false);
|
||||
});
|
||||
});
|
||||
$this->command('PUB', $topic, $body)->response()->okOrFail();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<int, string>|string $body
|
||||
* @psalm-param array<mixed, mixed> $bodies
|
||||
*
|
||||
* @psalm-param positive-int|0 $delay
|
||||
*
|
||||
* @psalm-return Promise<bool>
|
||||
* @psalm-suppress PossiblyFalseOperand
|
||||
*/
|
||||
public function publish(string $topic, string | array $body, int $delay = null): Promise
|
||||
public function mpub(string $topic, array $bodies): void
|
||||
{
|
||||
if (!$this->isConnected()) {
|
||||
return new Success(false);
|
||||
}
|
||||
$num = pack('N', \count($bodies));
|
||||
|
||||
return call(
|
||||
function (iterable $commands): \Generator {
|
||||
try {
|
||||
foreach ($commands as $command) {
|
||||
yield $this->write($command);
|
||||
}
|
||||
$mb = implode('', array_map(static function ($body): string {
|
||||
return pack('N', \strlen($body)).$body;
|
||||
}, $bodies));
|
||||
|
||||
return true;
|
||||
} catch (\Throwable) {
|
||||
return false;
|
||||
}
|
||||
},
|
||||
(static function () use ($topic, $body, $delay): \Generator {
|
||||
if (\is_array($body) && null === $delay) {
|
||||
yield Command::mpub($topic, $body);
|
||||
} elseif (null !== $delay) {
|
||||
foreach ((array) $body as $content) {
|
||||
yield Command::dpub($topic, $content, $delay);
|
||||
}
|
||||
} else {
|
||||
yield Command::pub($topic, $body);
|
||||
}
|
||||
})(),
|
||||
);
|
||||
$this->command('MPUB', $topic, $num.$mb)->response()->okOrFail();
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-suppress PossiblyFalseOperand
|
||||
*/
|
||||
public function dpub(string $topic, string $body, int $delay): void
|
||||
{
|
||||
$this->command('DPUB', [$topic, $delay], $body)->response()->okOrFail();
|
||||
}
|
||||
}
|
||||
|
63
src/Reconnect/ExponentialStrategy.php
Normal file
63
src/Reconnect/ExponentialStrategy.php
Normal file
@ -0,0 +1,63 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Reconnect;
|
||||
|
||||
use Nsq\Exception\ConnectionFail;
|
||||
use Psr\Log\LoggerAwareTrait;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\NullLogger;
|
||||
use Throwable;
|
||||
|
||||
final class ExponentialStrategy implements ReconnectStrategy
|
||||
{
|
||||
use LoggerAwareTrait;
|
||||
|
||||
private int $delay;
|
||||
|
||||
private int $nextTryAfter;
|
||||
|
||||
private int $attempt = 0;
|
||||
|
||||
private TimeProvider $timeProvider;
|
||||
|
||||
public function __construct(
|
||||
private int $minDelay = 8,
|
||||
private int $maxDelay = 32,
|
||||
TimeProvider $timeProvider = null,
|
||||
LoggerInterface $logger = null,
|
||||
) {
|
||||
$this->delay = 0;
|
||||
$this->timeProvider = $timeProvider ?? new RealTimeProvider();
|
||||
$this->nextTryAfter = $this->timeProvider->time();
|
||||
$this->logger = $logger ?? new NullLogger();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public function connect(callable $callable): void
|
||||
{
|
||||
$currentTime = $this->timeProvider->time();
|
||||
|
||||
if ($currentTime < $this->nextTryAfter) {
|
||||
throw new ConnectionFail('Time to reconnect has not yet come');
|
||||
}
|
||||
|
||||
try {
|
||||
$callable();
|
||||
} catch (Throwable $e) {
|
||||
$nextDelay = 0 === $this->delay ? $this->minDelay : $this->delay * 2;
|
||||
$this->delay = $nextDelay > $this->maxDelay ? $this->maxDelay : $nextDelay;
|
||||
$this->nextTryAfter = $currentTime + $this->delay;
|
||||
|
||||
$this->logger->warning('Reconnect #{attempt} after {delay}s', ['attempt' => ++$this->attempt, 'delay' => $this->delay]);
|
||||
|
||||
throw $e;
|
||||
}
|
||||
|
||||
$this->delay = 0;
|
||||
$this->attempt = 0;
|
||||
}
|
||||
}
|
13
src/Reconnect/RealTimeProvider.php
Normal file
13
src/Reconnect/RealTimeProvider.php
Normal file
@ -0,0 +1,13 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Reconnect;
|
||||
|
||||
final class RealTimeProvider implements TimeProvider
|
||||
{
|
||||
public function time(): int
|
||||
{
|
||||
return time();
|
||||
}
|
||||
}
|
15
src/Reconnect/ReconnectStrategy.php
Normal file
15
src/Reconnect/ReconnectStrategy.php
Normal file
@ -0,0 +1,15 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Reconnect;
|
||||
|
||||
use Nsq\Exception\ConnectionFail;
|
||||
|
||||
interface ReconnectStrategy
|
||||
{
|
||||
/**
|
||||
* @throws ConnectionFail
|
||||
*/
|
||||
public function connect(callable $callable): void;
|
||||
}
|
10
src/Reconnect/TimeProvider.php
Normal file
10
src/Reconnect/TimeProvider.php
Normal file
@ -0,0 +1,10 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Reconnect;
|
||||
|
||||
interface TimeProvider
|
||||
{
|
||||
public function time(): int;
|
||||
}
|
87
src/Response.php
Normal file
87
src/Response.php
Normal file
@ -0,0 +1,87 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Nsq\Exception\NsqError;
|
||||
use Nsq\Exception\UnexpectedResponse;
|
||||
use PHPinnacle\Buffer\ByteBuffer;
|
||||
use function json_decode;
|
||||
use function sprintf;
|
||||
use const JSON_THROW_ON_ERROR;
|
||||
|
||||
final class Response
|
||||
{
|
||||
private const OK = 'OK';
|
||||
private const HEARTBEAT = '_heartbeat_';
|
||||
private const TYPE_RESPONSE = 0;
|
||||
private const TYPE_ERROR = 1;
|
||||
private const TYPE_MESSAGE = 2;
|
||||
|
||||
private int $type;
|
||||
|
||||
private ByteBuffer $buffer;
|
||||
|
||||
public function __construct(ByteBuffer $buffer)
|
||||
{
|
||||
$this->type = $buffer->consumeUint32();
|
||||
$this->buffer = $buffer;
|
||||
}
|
||||
|
||||
public function okOrFail(): void
|
||||
{
|
||||
if (self::TYPE_ERROR === $this->type) {
|
||||
throw new NsqError($this->buffer->bytes());
|
||||
}
|
||||
|
||||
if (self::TYPE_RESPONSE !== $this->type) {
|
||||
// @codeCoverageIgnoreStart
|
||||
throw new UnexpectedResponse(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type));
|
||||
// @codeCoverageIgnoreEnd
|
||||
}
|
||||
|
||||
if (self::OK !== $this->buffer->bytes()) {
|
||||
// @codeCoverageIgnoreStart
|
||||
throw new UnexpectedResponse(sprintf('OK response expected, but "%s" received.', $this->buffer->bytes()));
|
||||
// @codeCoverageIgnoreEnd
|
||||
}
|
||||
}
|
||||
|
||||
public function isHeartBeat(): bool
|
||||
{
|
||||
return self::TYPE_RESPONSE === $this->type && self::HEARTBEAT === $this->buffer->bytes();
|
||||
}
|
||||
|
||||
/**
|
||||
* @phpstan-ignore-next-line
|
||||
*/
|
||||
public function toArray(): array
|
||||
{
|
||||
if (self::TYPE_RESPONSE !== $this->type) {
|
||||
// @codeCoverageIgnoreStart
|
||||
throw new UnexpectedResponse(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type));
|
||||
// @codeCoverageIgnoreEnd
|
||||
}
|
||||
|
||||
return json_decode($this->buffer->bytes(), true, flags: JSON_THROW_ON_ERROR);
|
||||
}
|
||||
|
||||
public function toMessage(Consumer $reader): Message
|
||||
{
|
||||
if (self::TYPE_MESSAGE !== $this->type) {
|
||||
// @codeCoverageIgnoreStart
|
||||
throw new UnexpectedResponse(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type));
|
||||
// @codeCoverageIgnoreEnd
|
||||
}
|
||||
|
||||
$buffer = new ByteBuffer($this->buffer->bytes());
|
||||
|
||||
$timestamp = $buffer->consumeInt64();
|
||||
$attempts = $buffer->consumeUint16();
|
||||
$id = $buffer->consume(Bytes::BYTES_ID);
|
||||
$body = $buffer->flush();
|
||||
|
||||
return new Message($timestamp, $attempts, $id, $body, $reader);
|
||||
}
|
||||
}
|
@ -1,26 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\ByteStream\ClosedException;
|
||||
use Amp\Promise;
|
||||
|
||||
interface Stream
|
||||
{
|
||||
/**
|
||||
* @psalm-return Promise<null|string>
|
||||
*/
|
||||
public function read(): Promise;
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<void>
|
||||
*/
|
||||
public function write(string $data): Promise;
|
||||
|
||||
/**
|
||||
* @throws ClosedException
|
||||
*/
|
||||
public function close(): void;
|
||||
}
|
@ -1,104 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Stream;
|
||||
|
||||
use Amp\Promise;
|
||||
use Nsq\Buffer;
|
||||
use Nsq\Exception\StreamException;
|
||||
use Nsq\Stream;
|
||||
|
||||
use function Amp\call;
|
||||
|
||||
class GzipStream implements Stream
|
||||
{
|
||||
private ?\InflateContext $inflate = null;
|
||||
|
||||
private ?\DeflateContext $deflate = null;
|
||||
|
||||
private Buffer $buffer;
|
||||
|
||||
public function __construct(private Stream $stream, private int $level, string $bytes = '')
|
||||
{
|
||||
/** @var false|\InflateContext $inflate */
|
||||
$inflate = @inflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
|
||||
/** @var \DeflateContext|false $deflate */
|
||||
$deflate = @deflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
|
||||
|
||||
if (false === $inflate) {
|
||||
throw new StreamException('Failed initializing inflate context');
|
||||
}
|
||||
|
||||
if (false === $deflate) {
|
||||
throw new StreamException('Failed initializing deflate context');
|
||||
}
|
||||
|
||||
$this->inflate = $inflate;
|
||||
$this->deflate = $deflate;
|
||||
$this->buffer = new Buffer($bytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function read(): Promise
|
||||
{
|
||||
return call(function (): \Generator {
|
||||
if (null === $this->inflate) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if ($this->buffer->empty()) {
|
||||
$chunk = yield $this->stream->read();
|
||||
|
||||
if (null !== $chunk) {
|
||||
$this->buffer->append($chunk);
|
||||
}
|
||||
}
|
||||
|
||||
$data = $this->buffer->flush();
|
||||
|
||||
if ('' === $data) {
|
||||
return null;
|
||||
}
|
||||
/** @psalm-suppress UndefinedFunction,InvalidArgument */
|
||||
$decompressed = inflate_add($this->inflate, $data, ZLIB_SYNC_FLUSH);
|
||||
|
||||
if (false === $decompressed) {
|
||||
throw new StreamException('Failed adding data to deflate context');
|
||||
}
|
||||
|
||||
return $decompressed;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function write(string $data): Promise
|
||||
{
|
||||
if (null === $this->deflate) {
|
||||
throw new StreamException('The stream has already been closed');
|
||||
}
|
||||
|
||||
/** @psalm-suppress UndefinedFunction,InvalidArgument */
|
||||
$compressed = deflate_add($this->deflate, $data, ZLIB_SYNC_FLUSH);
|
||||
|
||||
if (false === $compressed) {
|
||||
throw new StreamException('Failed adding data to deflate context');
|
||||
}
|
||||
|
||||
return $this->stream->write($compressed);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public function close(): void
|
||||
{
|
||||
$this->stream->close();
|
||||
$this->inflate = null;
|
||||
$this->deflate = null;
|
||||
}
|
||||
}
|
@ -1,37 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Stream;
|
||||
|
||||
use Amp\Failure;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
use Nsq\Exception\NsqException;
|
||||
use Nsq\Stream;
|
||||
|
||||
final class NullStream implements Stream
|
||||
{
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function read(): Promise
|
||||
{
|
||||
return new Success(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function write(string $data): Promise
|
||||
{
|
||||
return new Failure(new NsqException('Connection closed.'));
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function close(): void
|
||||
{
|
||||
}
|
||||
}
|
@ -1,122 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Stream;
|
||||
|
||||
use Amp\Promise;
|
||||
use Nsq\Buffer;
|
||||
use Nsq\Exception\SnappyException;
|
||||
use Nsq\Stream;
|
||||
|
||||
use function Amp\call;
|
||||
|
||||
class SnappyStream implements Stream
|
||||
{
|
||||
private const IDENTIFIER = [0xFF, 0x06, 0x00, 0x00, 0x73, 0x4E, 0x61, 0x50, 0x70, 0x59];
|
||||
private const SIZE_HEADER = 4;
|
||||
private const SIZE_CHECKSUM = 4;
|
||||
private const SIZE_CHUNK = 65536;
|
||||
private const TYPE_IDENTIFIER = 0xFF;
|
||||
private const TYPE_COMPRESSED = 0x00;
|
||||
private const TYPE_UNCOMPRESSED = 0x01;
|
||||
private const TYPE_PADDING = 0xFE;
|
||||
|
||||
private Buffer $buffer;
|
||||
|
||||
public function __construct(private Stream $stream, string $bytes = '')
|
||||
{
|
||||
if (!\function_exists('snappy_uncompress') || !\function_exists('snappy_compress')) {
|
||||
throw SnappyException::notInstalled();
|
||||
}
|
||||
|
||||
$this->buffer = new Buffer($bytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function read(): Promise
|
||||
{
|
||||
return call(function (): \Generator {
|
||||
if ($this->buffer->size() < self::SIZE_HEADER && null !== ($chunk = yield $this->stream->read())) {
|
||||
$this->buffer->append($chunk);
|
||||
}
|
||||
|
||||
$type = $this->buffer->readUInt32LE();
|
||||
|
||||
$size = $type >> 8;
|
||||
$type &= 0xFF;
|
||||
|
||||
while ($this->buffer->size() < $size && null !== ($chunk = yield $this->stream->read())) {
|
||||
$this->buffer->append($chunk);
|
||||
}
|
||||
|
||||
switch ($type) {
|
||||
case self::TYPE_IDENTIFIER:
|
||||
$this->buffer->discard($size);
|
||||
|
||||
return $this->read();
|
||||
case self::TYPE_COMPRESSED:
|
||||
$this->buffer->discard(self::SIZE_CHECKSUM);
|
||||
|
||||
/** @psalm-suppress UndefinedFunction */
|
||||
return snappy_uncompress($this->buffer->consume($size - self::SIZE_HEADER));
|
||||
case self::TYPE_UNCOMPRESSED:
|
||||
$this->buffer->discard(self::SIZE_CHECKSUM);
|
||||
|
||||
return $this->buffer->consume($size - self::SIZE_HEADER);
|
||||
case self::TYPE_PADDING:
|
||||
return $this->read();
|
||||
default:
|
||||
throw SnappyException::invalidHeader();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function write(string $data): Promise
|
||||
{
|
||||
return call(function () use ($data): Promise {
|
||||
/** @var string $result */
|
||||
$result = pack('CCCCCCCCCC', ...self::IDENTIFIER);
|
||||
|
||||
foreach (str_split($data, self::SIZE_CHUNK) as $chunk) {
|
||||
$result .= $this->compress($chunk);
|
||||
}
|
||||
|
||||
return $this->stream->write($result);
|
||||
});
|
||||
}
|
||||
|
||||
public function close(): void
|
||||
{
|
||||
$this->stream->close();
|
||||
}
|
||||
|
||||
private function compress(string $uncompressed): string
|
||||
{
|
||||
/** @psalm-suppress UndefinedFunction */
|
||||
$compressed = snappy_compress($uncompressed);
|
||||
|
||||
\assert(\is_string($compressed));
|
||||
|
||||
[$type, $data] = \strlen($compressed) <= 0.875 * \strlen($uncompressed)
|
||||
? [self::TYPE_COMPRESSED, $compressed]
|
||||
: [self::TYPE_UNCOMPRESSED, $uncompressed];
|
||||
|
||||
/** @psalm-suppress PossiblyFalseArgument */
|
||||
$unpacked = unpack('N', hash('crc32c', $uncompressed, true));
|
||||
\assert(\is_array($unpacked));
|
||||
|
||||
$checksum = $unpacked[1];
|
||||
$checksum = (($checksum >> 15) | ($checksum << 17)) + 0xA282EAD8 & 0xFFFFFFFF;
|
||||
|
||||
$size = (\strlen($data) + 4) << 8;
|
||||
|
||||
/** @psalm-suppress PossiblyFalseOperand */
|
||||
return pack('VV', $type + $size, $checksum).$data;
|
||||
}
|
||||
}
|
@ -1,82 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\Stream;
|
||||
|
||||
use Amp\Promise;
|
||||
use Amp\Socket\ClientTlsContext;
|
||||
use Amp\Socket\ConnectContext;
|
||||
use Amp\Socket\EncryptableSocket;
|
||||
use Nsq\Stream;
|
||||
|
||||
use function Amp\call;
|
||||
use function Amp\Socket\connect;
|
||||
|
||||
class SocketStream implements Stream
|
||||
{
|
||||
public function __construct(private EncryptableSocket $socket)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<self>
|
||||
*/
|
||||
public static function connect(string $uri, int $timeout = 0, int $attempts = 0, bool $noDelay = false): Promise
|
||||
{
|
||||
return call(function () use ($uri, $timeout, $attempts, $noDelay): \Generator {
|
||||
$context = new ConnectContext();
|
||||
|
||||
if ($timeout > 0) {
|
||||
$context = $context->withConnectTimeout($timeout);
|
||||
}
|
||||
|
||||
if ($attempts > 0) {
|
||||
$context = $context->withMaxAttempts($attempts);
|
||||
}
|
||||
|
||||
if ($noDelay) {
|
||||
$context = $context->withTcpNoDelay();
|
||||
}
|
||||
|
||||
$context = $context->withTlsContext(
|
||||
(new ClientTlsContext(''))
|
||||
->withoutPeerVerification(),
|
||||
);
|
||||
|
||||
return new self(yield connect($uri, $context));
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<null|string>
|
||||
*/
|
||||
public function read(): Promise
|
||||
{
|
||||
return $this->socket->read();
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<void>
|
||||
*/
|
||||
public function write(string $data): Promise
|
||||
{
|
||||
return $this->socket->write($data);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public function close(): void
|
||||
{
|
||||
$this->socket->close();
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Promise<void>
|
||||
*/
|
||||
public function setupTls(): Promise
|
||||
{
|
||||
return $this->socket->setupTls();
|
||||
}
|
||||
}
|
39
src/Subscriber.php
Normal file
39
src/Subscriber.php
Normal file
@ -0,0 +1,39 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Generator;
|
||||
|
||||
final class Subscriber
|
||||
{
|
||||
public const STOP = 0;
|
||||
|
||||
private Consumer $reader;
|
||||
|
||||
public function __construct(Consumer $reader)
|
||||
{
|
||||
$this->reader = $reader;
|
||||
}
|
||||
|
||||
/**
|
||||
* @psalm-return Generator<int, Message|float|null, int|float|null, void>
|
||||
*/
|
||||
public function subscribe(string $topic, string $channel): Generator
|
||||
{
|
||||
$this->reader->sub($topic, $channel);
|
||||
|
||||
while (true) {
|
||||
$this->reader->rdy(1);
|
||||
|
||||
$command = yield $this->reader->receive()?->toMessage($this->reader);
|
||||
|
||||
if (self::STOP === $command) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
$this->reader->disconnect();
|
||||
}
|
||||
}
|
@ -7,11 +7,6 @@ use PHPUnit\Framework\TestCase;
|
||||
|
||||
final class ClientConfigTest extends TestCase
|
||||
{
|
||||
public function testNegotiationPayload(): void
|
||||
{
|
||||
self::assertJson((new ClientConfig())->asNegotiationPayload());
|
||||
}
|
||||
|
||||
public function testInvalidCompression(): void
|
||||
{
|
||||
$this->expectException(InvalidArgumentException::class);
|
||||
@ -19,42 +14,4 @@ final class ClientConfigTest extends TestCase
|
||||
|
||||
new ClientConfig(deflate: true, snappy: true);
|
||||
}
|
||||
|
||||
/**
|
||||
* @dataProvider array
|
||||
*/
|
||||
public function testFromArray(array $data, array $expected): void
|
||||
{
|
||||
self::assertSame($expected, get_object_vars(ClientConfig::fromArray($data)));
|
||||
}
|
||||
|
||||
public function array(): Generator
|
||||
{
|
||||
$default = get_object_vars(new ClientConfig());
|
||||
|
||||
yield 'Empty array' => [[], $default];
|
||||
|
||||
yield 'With wrong keys' => [['bla' => 'bla'], $default];
|
||||
|
||||
$custom = [
|
||||
'authSecret' => 'SomeSecret',
|
||||
'connectTimeout' => 100,
|
||||
'maxAttempts' => 10,
|
||||
'tcpNoDelay' => true,
|
||||
'rdyCount' => 1,
|
||||
'featureNegotiation' => true,
|
||||
'clientId' => 'SomeGorgeousClientId',
|
||||
'deflate' => true,
|
||||
'deflateLevel' => 1,
|
||||
'heartbeatInterval' => 31111,
|
||||
'hostname' => gethostname(),
|
||||
'msgTimeout' => 59999,
|
||||
'sampleRate' => 25,
|
||||
'tls' => true,
|
||||
'snappy' => false,
|
||||
'userAgent' => 'nsqphp/test',
|
||||
];
|
||||
|
||||
yield 'Full filled' => [$custom, $custom];
|
||||
}
|
||||
}
|
||||
|
@ -1,28 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
use Nsq\ErrorType;
|
||||
use Nsq\Frame\Error;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
final class ErrorTypeTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @dataProvider data
|
||||
*/
|
||||
public function testConstructor(Error $frame, bool $isConnectionTerminated): void
|
||||
{
|
||||
self::assertSame($isConnectionTerminated, ErrorType::terminable($frame));
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \Generator<int, array{0: Error, 1: bool}>
|
||||
*/
|
||||
public function data(): Generator
|
||||
{
|
||||
yield [new Error('E_BAD_BODY'), true];
|
||||
yield [new Error('bla_bla'), true];
|
||||
yield [new Error('E_REQ_FAILED'), false];
|
||||
}
|
||||
}
|
90
tests/ExponentialStrategyTest.php
Normal file
90
tests/ExponentialStrategyTest.php
Normal file
@ -0,0 +1,90 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
use Nsq\Exception\ConnectionFail;
|
||||
use Nsq\Reconnect\ExponentialStrategy;
|
||||
use Nsq\Reconnect\TimeProvider;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
final class ExponentialStrategyTest extends TestCase
|
||||
{
|
||||
public function testTimeNotYetCome(): void
|
||||
{
|
||||
$timeProvider = new FakeTimeProvider();
|
||||
$strategy = new ExponentialStrategy(
|
||||
minDelay: 8,
|
||||
maxDelay: 32,
|
||||
timeProvider: $timeProvider,
|
||||
);
|
||||
|
||||
$successConnect = static function (int $time = null) use ($strategy, $timeProvider): void {
|
||||
$timeProvider($time);
|
||||
|
||||
$strategy->connect(static function (): void {
|
||||
});
|
||||
};
|
||||
$failConnect = static function (int $time = null) use ($strategy, $timeProvider): void {
|
||||
$timeProvider($time);
|
||||
|
||||
try {
|
||||
$strategy->connect(function (): void {
|
||||
throw new ConnectionFail('Time come but failed');
|
||||
});
|
||||
} catch (ConnectionFail $e) {
|
||||
self::assertSame('Time come but failed', $e->getMessage());
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
self::fail('Expecting exception with message "Time come but failed"');
|
||||
};
|
||||
$timeNotCome = static function (int $time = null) use ($strategy, $timeProvider): void {
|
||||
$timeProvider($time);
|
||||
|
||||
try {
|
||||
$strategy->connect(function (): void {
|
||||
throw new ConnectionFail('');
|
||||
});
|
||||
} catch (ConnectionFail $e) {
|
||||
self::assertSame('Time to reconnect has not yet come', $e->getMessage());
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
self::fail('Was expecting exception with message "Time to reconnect has not yet come"');
|
||||
};
|
||||
|
||||
$failConnect(0);
|
||||
$timeNotCome(7);
|
||||
$failConnect(8);
|
||||
$timeNotCome(22);
|
||||
$timeNotCome(13);
|
||||
$failConnect(24);
|
||||
$successConnect(56);
|
||||
$failConnect();
|
||||
$timeNotCome();
|
||||
$timeNotCome(63);
|
||||
$failConnect(64);
|
||||
|
||||
$this->expectException(ConnectionFail::class);
|
||||
$this->expectExceptionMessage('Time to reconnect has not yet come');
|
||||
|
||||
$successConnect();
|
||||
}
|
||||
}
|
||||
|
||||
class FakeTimeProvider implements TimeProvider
|
||||
{
|
||||
public int $time = 0;
|
||||
|
||||
public function time(): int
|
||||
{
|
||||
return $this->time;
|
||||
}
|
||||
|
||||
public function __invoke(int $time = null): void
|
||||
{
|
||||
$this->time = $time ?? $this->time;
|
||||
}
|
||||
}
|
@ -2,10 +2,8 @@
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
use Amp\Loop;
|
||||
use Amp\Success;
|
||||
use Nsq\Consumer;
|
||||
use Nsq\Exception\MessageException;
|
||||
use Nsq\Exception\MessageAlreadyFinished;
|
||||
use Nsq\Message;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
@ -16,12 +14,16 @@ final class MessageTest extends TestCase
|
||||
*/
|
||||
public function testDoubleFinish(Message $message): void
|
||||
{
|
||||
$this->expectException(MessageException::class);
|
||||
self::assertFalse($message->isFinished());
|
||||
|
||||
Loop::run(function () use ($message): Generator {
|
||||
yield $message->finish();
|
||||
yield $message->finish();
|
||||
});
|
||||
$message->finish();
|
||||
|
||||
self::assertTrue($message->isFinished());
|
||||
|
||||
$this->expectException(MessageAlreadyFinished::class);
|
||||
$this->expectExceptionMessage('Can\'t finish message as it already finished.');
|
||||
|
||||
$message->finish();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -29,12 +31,16 @@ final class MessageTest extends TestCase
|
||||
*/
|
||||
public function testDoubleRequeue(Message $message): void
|
||||
{
|
||||
$this->expectException(MessageException::class);
|
||||
self::assertFalse($message->isFinished());
|
||||
|
||||
Loop::run(function () use ($message): Generator {
|
||||
yield $message->requeue(1);
|
||||
yield $message->requeue(5);
|
||||
});
|
||||
$message->requeue(1);
|
||||
|
||||
self::assertTrue($message->isFinished());
|
||||
|
||||
$this->expectException(MessageAlreadyFinished::class);
|
||||
$this->expectExceptionMessage('Can\'t requeue message as it already finished.');
|
||||
|
||||
$message->requeue(5);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -42,12 +48,14 @@ final class MessageTest extends TestCase
|
||||
*/
|
||||
public function testTouchAfterFinish(Message $message): void
|
||||
{
|
||||
$this->expectException(MessageException::class);
|
||||
self::assertFalse($message->isFinished());
|
||||
|
||||
Loop::run(function () use ($message): Generator {
|
||||
yield $message->finish();
|
||||
yield $message->touch();
|
||||
});
|
||||
$message->finish();
|
||||
|
||||
$this->expectException(MessageAlreadyFinished::class);
|
||||
$this->expectExceptionMessage('Can\'t touch message as it already finished.');
|
||||
|
||||
$message->touch();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -55,12 +63,6 @@ final class MessageTest extends TestCase
|
||||
*/
|
||||
public function messages(): Generator
|
||||
{
|
||||
/** @phpstan-ignore-next-line */
|
||||
$consumer = $this->createMock(Consumer::class);
|
||||
$consumer->method('fin')->willReturn(new Success());
|
||||
$consumer->method('touch')->willReturn(new Success());
|
||||
$consumer->method('req')->willReturn(new Success());
|
||||
|
||||
yield [new Message('id', 'body', 0, 0, $consumer)];
|
||||
yield [new Message(0, 0, 'id', 'body', $this->createStub(Consumer::class))];
|
||||
}
|
||||
}
|
||||
|
@ -3,35 +3,90 @@
|
||||
declare(strict_types=1);
|
||||
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Consumer;
|
||||
use Nsq\Message;
|
||||
use Nsq\Producer;
|
||||
use Nsq\Subscriber;
|
||||
use Nyholm\NSA;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
final class NsqTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @dataProvider configs
|
||||
*/
|
||||
public function test(ClientConfig $clientConfig): void
|
||||
public function test(): void
|
||||
{
|
||||
self::markTestSkipped('');
|
||||
}
|
||||
$producer = new Producer('tcp://localhost:4150');
|
||||
$producer->pub(__FUNCTION__, __FUNCTION__);
|
||||
|
||||
/**
|
||||
* @return Generator<string, array<int, ClientConfig>>
|
||||
*/
|
||||
public function configs(): Generator
|
||||
{
|
||||
yield 'default' => [
|
||||
new ClientConfig(
|
||||
heartbeatInterval: 3000,
|
||||
snappy: false,
|
||||
$consumer = new Consumer(
|
||||
address: 'tcp://localhost:4150',
|
||||
clientConfig: new ClientConfig(
|
||||
heartbeatInterval: 1000,
|
||||
readTimeout: 1,
|
||||
),
|
||||
];
|
||||
);
|
||||
$subscriber = new Subscriber($consumer);
|
||||
$generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__);
|
||||
|
||||
yield 'snappy' => [
|
||||
new ClientConfig(
|
||||
heartbeatInterval: 3000,
|
||||
snappy: true,
|
||||
),
|
||||
];
|
||||
/** @var null|Message $message */
|
||||
$message = $generator->current();
|
||||
|
||||
self::assertInstanceOf(Message::class, $message);
|
||||
self::assertSame(__FUNCTION__, $message->body);
|
||||
$message->finish();
|
||||
|
||||
$generator->next();
|
||||
self::assertNull($generator->current());
|
||||
|
||||
$producer->mpub(__FUNCTION__, [
|
||||
'First mpub message.',
|
||||
'Second mpub message.',
|
||||
]);
|
||||
|
||||
$generator->next();
|
||||
/** @var null|Message $message */
|
||||
$message = $generator->current();
|
||||
self::assertInstanceOf(Message::class, $message);
|
||||
self::assertSame('First mpub message.', $message->body);
|
||||
$message->finish();
|
||||
|
||||
$generator->next();
|
||||
/** @var null|Message $message */
|
||||
$message = $generator->current();
|
||||
self::assertInstanceOf(Message::class, $message);
|
||||
self::assertSame('Second mpub message.', $message->body);
|
||||
$message->requeue(0);
|
||||
|
||||
$generator->next();
|
||||
/** @var null|Message $message */
|
||||
$message = $generator->current();
|
||||
self::assertInstanceOf(Message::class, $message);
|
||||
self::assertSame('Second mpub message.', $message->body);
|
||||
$message->finish();
|
||||
|
||||
$producer->dpub(__FUNCTION__, 'Deferred message.', 2000);
|
||||
|
||||
$generator->next();
|
||||
/** @var null|Message $message */
|
||||
$message = $generator->current();
|
||||
self::assertNull($message);
|
||||
|
||||
NSA::setProperty(
|
||||
NSA::getProperty($consumer, 'clientConfig'),
|
||||
'readTimeout',
|
||||
10,
|
||||
);
|
||||
|
||||
$generator->next();
|
||||
|
||||
/** @var null|Message $message */
|
||||
$message = $generator->current();
|
||||
self::assertInstanceOf(Message::class, $message);
|
||||
self::assertSame('Deferred message.', $message->body);
|
||||
$message->touch();
|
||||
$message->finish();
|
||||
|
||||
self::assertTrue($consumer->isReady());
|
||||
$generator->send(Subscriber::STOP);
|
||||
self::assertFalse($consumer->isReady());
|
||||
}
|
||||
}
|
||||
|
@ -2,8 +2,7 @@
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
use Amp\Loop;
|
||||
use Nsq\Exception\ServerException;
|
||||
use Nsq\Exception\NsqError;
|
||||
use Nsq\Producer;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
@ -14,18 +13,16 @@ final class ProducerTest extends TestCase
|
||||
*/
|
||||
public function testPubFail(string $topic, string $body, string $exceptionMessage): void
|
||||
{
|
||||
$this->expectException(ServerException::class);
|
||||
$this->expectException(NsqError::class);
|
||||
$this->expectExceptionMessage($exceptionMessage);
|
||||
|
||||
$producer = Producer::create('tcp://localhost:4150');
|
||||
|
||||
Loop::run(static function () use ($producer, $topic, $body): Generator {
|
||||
yield $producer->connect();
|
||||
|
||||
yield $producer->publish($topic, $body);
|
||||
});
|
||||
$producer = new Producer('tcp://localhost:4150');
|
||||
$producer->pub($topic, $body);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Generator<string, array>
|
||||
*/
|
||||
public function pubFails(): Generator
|
||||
{
|
||||
yield 'Empty body' => ['test', '', 'E_BAD_MESSAGE PUB invalid message body size 0'];
|
||||
|
Reference in New Issue
Block a user