61 Commits

Author SHA1 Message Date
350f08c2c1 Fix and suppress static analyze errors 2022-09-11 23:07:46 +03:00
bd8d13692f Remove infection, revert dg/bypass-finals 2022-09-11 22:15:11 +03:00
9414042f57 Bump min req php ^8.1 2022-09-11 22:10:39 +03:00
c88fdb6354 Bump dependencies 2022-09-11 22:05:17 +03:00
3bf8444e06 tests: add testNegotiationPayload 2022-09-11 22:03:37 +03:00
d4b29c69db Fix some psalm/phpstan errors 2021-09-19 02:34:53 +03:00
381ba88f8d Simplify discovery producer example 2021-09-19 02:13:11 +03:00
43ab797ee0 Add Makefile 2021-09-19 02:10:09 +03:00
45048320a8 Lookup: graceful shutdown 2021-09-19 01:57:53 +03:00
be696f17b5 Lookup: reconnections 2021-09-19 01:30:30 +03:00
d0307b47e6 Fix wrong return type 2021-09-19 00:44:31 +03:00
bac144582e Use psalm-return instead of return for generics 2021-09-19 00:44:20 +03:00
5e43f9b0df Prevent multiple watchers for one topic 2021-09-19 00:42:26 +03:00
e1725ea140 Return bool from public api of Producer and Consumer to indicate of success of process and prevent throwing exception from it 2021-09-19 00:42:26 +03:00
b130b09a82 Decrease default ready count to 100 2021-09-18 23:17:53 +03:00
155e896543 Unset previous producers on lookup newer 2021-09-18 23:01:56 +03:00
2c07fb1aca Remove unnecessary Producer::run and Consumer::run 2021-09-18 22:55:44 +03:00
97b3d8206c Refactor logging 2021-09-18 22:55:44 +03:00
c8cd41777f Close connection on error while read or write 2021-09-18 22:46:03 +03:00
fcd1f256ff Add Connection::onCallback, chain callbacks onClose 2021-09-18 22:39:43 +03:00
530b03974e Graceful close connection 2021-09-18 22:17:43 +03:00
083bc44c9c Decouple Consumer and Producer from direct call Streams 2021-09-18 22:17:43 +03:00
aa3333bfba Stream::close can throw Amp\ByteStream\ClosedException 2021-09-18 22:14:56 +03:00
679573ad0a update readme 2021-09-15 01:54:35 +03:00
32f226942e Add discovery examples 2021-09-15 01:53:20 +03:00
dbe312ddf1 Lookup fix cancelling topic watcher 2021-09-15 01:35:26 +03:00
47194b30f3 Refactor Lookup 2021-09-15 01:26:47 +03:00
5bab748952 Add Lookup/Producer::toTcpUri 2021-09-15 01:26:20 +03:00
43b92e9bb9 cs fix 2021-09-15 01:26:06 +03:00
c505d62533 Add logging on Consumer disconnected 2021-09-15 01:26:00 +03:00
f3f67bedd3 add Connection::onClose 2021-09-15 01:25:47 +03:00
af4e86d219 php-cs-fixer add types_spaces rule 2021-09-15 01:25:27 +03:00
53d7813198 Add Connection::isConnected() 2021-09-13 23:49:52 +03:00
56cdda1a0d Add some logs 2021-09-13 23:49:15 +03:00
7984d09e83 Return Failure on try to write to NullStream 2021-09-13 23:48:17 +03:00
3c7686405d Dynamic log level on LookupException 2021-09-13 23:47:45 +03:00
6428a1ec33 Make Lookup/{Response,Producer} compatible to /nodes endpoint 2021-09-13 23:47:20 +03:00
65adecde3f Leave one nsq service in favor of use docker-compose scale 2021-09-13 23:46:24 +03:00
e3e83212c4 deflate 2021-09-04 01:56:49 +03:00
ca2c2ee633 Lookup (#14) 2021-09-04 01:55:34 +03:00
e9dce19e25 Fix: Message::touch must not mark message as processed 2021-07-08 18:35:31 +03:00
a913fb0907 Add rdyCount to ClientConfig 2021-07-08 17:41:59 +03:00
6c8c30a1bd Bump ergebnis/composer-normalize to 2.15 2021-06-17 18:23:56 +03:00
a08bccac45 phpstan: ignore Return type of call to method PHPUnit\Framework\TestCase::createMock() contains unresolvable type. 2021-06-17 16:28:24 +03:00
55480ab2c0 Bump friendsofphp/php-cs-fixer up to 3.0 2021-06-17 16:21:26 +03:00
4546c5085f Bump infection/infection up to 0.23 2021-06-17 16:18:55 +03:00
b6f4726002 Fix: ClientConfig::$connectionTimeout is a milliseconds 2021-06-17 15:46:51 +03:00
e3c64f6f09 tls 2021-06-09 20:08:15 +03:00
411fabb1f5 Fix: get default settings from real object 2021-06-09 18:20:46 +03:00
34847e2467 Mark ServerConfig as internal 2021-06-09 18:17:00 +03:00
6e50fa2258 Refactoring configs. Use connections settings on establishing connection. Create ClientConfig from array. 2021-06-09 18:15:45 +03:00
ca54c7ad80 Add symfony/var-dumper for dev 2021-06-09 18:13:17 +03:00
4c00fb0fd5 Message::requeue $timeout must be positive-int or zero 2021-06-09 17:08:46 +03:00
d3e1788d23 Refactoring: Simplify Message methods, add isProcessed method. 2021-06-09 16:33:18 +03:00
2fc7e37120 Producer::publish $delay must be positive-int or zero 2021-06-09 15:34:29 +03:00
92d8304a6a Remove Producer::defer 2021-03-10 22:12:09 +03:00
3e4e8c3802 Fix ErrorTypeTest::data() return phpdoc 2021-03-10 21:19:58 +03:00
2f638b9c75 Remove Reader and ConsumerInterface 2021-03-10 21:19:58 +03:00
9f004417fa Make connect methods idempotent 2021-02-28 20:26:08 +03:00
e670cb161c amphp (#11) 2021-02-26 00:59:52 +03:00
9cefa847a9 Add .gitattributes 2021-02-11 15:12:03 +03:00
63 changed files with 2150 additions and 1036 deletions

1
.env Normal file
View File

@@ -0,0 +1 @@
NSQ_VERSION=1.2.1

21
.gitattributes vendored Normal file
View File

@@ -0,0 +1,21 @@
# Exclude build/test files from archive
/.editorconfig export-ignore
/.gitattributes export-ignore
/.github export-ignore
/.gitignore export-ignore
/.php_cs export-ignore
/.php_cs.dist export-ignore
/.psalm export-ignore
/docs export-ignore
/examples export-ignore
/infection.json export-ignore
/infection.json.dist export-ignore
/phpstan.neon export-ignore
/phpunit.xml export-ignore
/phpunit.xml.dist export-ignore
/psalm.xml export-ignore
/tests export-ignore
# Configure diff output for .php and .phar files.
*.php diff=php
*.phar -diff

View File

@@ -14,7 +14,7 @@ jobs:
os:
- ubuntu-latest
php:
- '8.0'
- '8.1'
dependencies:
- lowest
- highest
@@ -33,6 +33,7 @@ jobs:
with:
php-version: ${{ matrix.php }}
coverage: pcov
extensions: kjdev/php-ext-snappy@0.2.1
env:
update: true
@@ -76,7 +77,7 @@ jobs:
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.0'
php-version: '8.1'
env:
update: true
@@ -107,7 +108,8 @@ jobs:
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.0'
php-version: '8.1'
extensions: snappy-kjdev/php-ext-snappy@0.2.1
env:
update: true
@@ -138,7 +140,8 @@ jobs:
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.0'
php-version: '8.1'
extensions: snappy-kjdev/php-ext-snappy@0.2.1
env:
update: true
@@ -158,45 +161,3 @@ jobs:
- name: Run script
run: vendor/bin/psalm --output-format=github
infection:
name: Infection
runs-on: ubuntu-latest
services:
nsqd:
image: nsqio/nsq:v1.2.0
options: --entrypoint /nsqd
ports:
- 4150:4150
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.0'
coverage: pcov
env:
update: true
- name: Get composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
run: composer update --no-progress --no-interaction --prefer-dist
- name: Run script
env:
STRYKER_DASHBOARD_API_KEY: ${{ secrets.STRYKER_DASHBOARD_API_KEY }}
run: |
git fetch --depth=1 origin $GITHUB_BASE_REF
php vendor/bin/infection -j2 --git-diff-filter=A --git-diff-base=origin/$GITHUB_BASE_REF --logger-github --ignore-msi-with-no-mutations --only-covered

2
.gitignore vendored
View File

@@ -1,6 +1,6 @@
/vendor/
/composer.lock
/.php_cs.cache
/.php-cs-fixer.cache
/.phpunit.result.cache
/infection.log

45
.php-cs-fixer.php Normal file
View File

@@ -0,0 +1,45 @@
<?php
$finder = PhpCsFixer\Finder::create()
->in(__DIR__)
->exclude('vendor');
return (new PhpCsFixer\Config())
->setRiskyAllowed(true)
->setRules([
'@PhpCsFixer' => true,
'@PhpCsFixer:risky' => true,
'@PSR12' => true,
'@PSR12:risky' => true,
'blank_line_before_statement' => [
'statements' => [
'continue',
'do',
'exit',
'goto',
'if',
'return',
'switch',
'throw',
'try',
],
],
'declare_strict_types' => true,
'global_namespace_import' => [
'import_classes' => false,
'import_constants' => false,
'import_functions' => false,
],
'php_unit_internal_class' => false,
'php_unit_test_case_static_method_calls' => ['call_type' => 'self'],
'php_unit_test_class_requires_covers' => false,
'phpdoc_to_comment' => false,
'yoda_style' => true,
'trailing_comma_in_multiline' => [
'after_heredoc' => true,
'elements' => ['arrays', 'arguments', 'parameters'],
],
'types_spaces' => ['space' => 'single'],
])
->setFinder($finder)
->setCacheFile(__DIR__.'/.php-cs-fixer.cache');

View File

@@ -1,28 +0,0 @@
#!/usr/bin/env php
<?php
return (new PhpCsFixer\Config())
->setRiskyAllowed(true)
->setRules([
'@PhpCsFixer' => true,
'@PhpCsFixer:risky' => true,
'@PSR12' => true,
'@PSR12:risky' => true,
'braces' => [
'allow_single_line_closure' => true,
],
'blank_line_before_statement' => [
'statements' => ['continue', 'do', 'die', 'exit', 'goto', 'if', 'return', 'switch', 'throw', 'try'],
],
'declare_strict_types' => true,
'global_namespace_import' => ['import_classes' => false, 'import_constants' => false, 'import_functions' => false],
'php_unit_internal_class' => false,
'php_unit_test_case_static_method_calls'=> ['call_type' => 'self'],
'php_unit_test_class_requires_covers' => false,
'phpdoc_to_comment' => false,
'yoda_style' => true,
])
->setFinder(
PhpCsFixer\Finder::create()
->in(__DIR__)
);

20
Makefile Normal file
View File

@@ -0,0 +1,20 @@
all: install composer-validate php-cs-fixer psalm phpstan phpunit
install:
composer install
psalm:
php vendor/bin/psalm
phpstan:
php vendor/bin/phpstan analyse
phpunit:
php vendor/bin/phpunit
php-cs-fixer:
php vendor/bin/php-cs-fixer fix
composer-validate:
composer validate

View File

@@ -1,6 +1,6 @@
# Nsq PHP
<img src="https://github.com/nsqphp/nsqphp/raw/main/logo.png" alt="" align="left" width="150">
<img src="https://github.com/nsqphp/nsqphp/raw/main/docs/logo.png" alt="" align="left" width="150">
PHP Client for [NSQ](https://nsq.io/).
@@ -31,10 +31,10 @@ Features
- [x] PUB
- [x] SUB
- [X] Feature Negotiation
- [ ] Discovery
- [X] Discovery
- [ ] Backoff
- [ ] TLS
- [ ] Deflate
- [X] TLS
- [X] Deflate
- [X] Snappy
- [X] Sampling
- [X] AUTH
@@ -47,53 +47,59 @@ Usage
```php
use Nsq\Producer;
$producer = new Producer(address: 'tcp://nsqd:4150');
$producer = Producer::create(address: 'tcp://nsqd:4150');
// Publish a message to a topic
$producer->pub('topic', 'Simple message');
$producer->publish('topic', 'Simple message');
// Publish multiple messages to a topic (atomically)
$producer->mpub('topic', [
$producer->publish('topic', [
'Message one',
'Message two',
]);
// Publish a deferred message to a topic
$producer->dpub('topic', 'Deferred message', delay: 5000);
$producer->publish('topic', 'Deferred message', delay: 5000);
```
### Consumer
```php
use Nsq\Consumer;
use Nsq\Protocol\Message;
use Nsq\Message;
$consumer = new Consumer(
$consumer = Consumer::create(
address: 'tcp://nsqd:4150',
topic: 'topic',
channel: 'channel',
address: 'tcp://nsqd:4150',
onMessage: static function (Message $message): Generator {
yield $message->touch(); // Reset the timeout for an in-flight message
yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
yield $message->finish(); // Finish a message (indicate successful processing)
},
);
```
// Simple blocking loop based on generator
$generator = $consumer->generator();
### Lookup
foreach ($generator as $message) {
if ($message instanceof Message) {
$payload = $message->body;
```php
use Nsq\Lookup;
use Nsq\Message;
// handle message
$lookup = new Lookup('http://nsqlookupd0:4161');
$lookup = new Lookup(['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161']);
$message->touch(); // Reset the timeout for an in-flight message
$message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
$message->finish(); // Finish a message (indicate successful processing)
}
$callable = static function (Message $message): Generator {
yield $message->touch(); // Reset the timeout for an in-flight message
yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)
yield $message->finish(); // Finish a message (indicate successful processing)
};
// In case of nothing received during timeout generator will return NULL
// Here we can do something between messages, like pcntl_signal_dispatch()
$lookup->subscribe(topic: 'topic', channel: 'channel', onMessage: $callable);
$lookup->subscribe(topic: 'anotherTopic', channel: 'channel', onMessage: $callable);
// Gracefully close connection (loop will be ended)
$generator->send(0);
}
$lookup->unsubscribe(topic: 'local', channel: 'channel');
$lookup->stop(); // unsubscribe all
```
### Integrations

View File

@@ -11,39 +11,49 @@
}
],
"require": {
"php": "^8.0.1",
"php": "^8.1",
"ext-json": "*",
"amphp/http-client": "^4.6",
"amphp/socket": "^1.1",
"composer/semver": "^3.2",
"phpinnacle/buffer": "^1.2",
"psr/log": "^1.1"
"psr/log": "^3.0"
},
"require-dev": {
"amphp/log": "^1.1",
"dg/bypass-finals": "^1.3",
"ergebnis/composer-normalize": "9999999-dev",
"friendsofphp/php-cs-fixer": "^2.18",
"infection/infection": "^0.20.2",
"ergebnis/composer-normalize": "^2.15",
"friendsofphp/php-cs-fixer": "^3.4",
"nyholm/nsa": "^1.2",
"phpstan/phpstan": "^0.12.68",
"phpstan/phpstan-phpunit": "^0.12.17",
"phpstan/phpstan-strict-rules": "^0.12.9",
"phpstan/phpstan": "^1.8",
"phpstan/phpstan-phpunit": "^1.1",
"phpstan/phpstan-strict-rules": "^1.3",
"phpunit/phpunit": "^9.5",
"symfony/var-dumper": "^6.1",
"vimeo/psalm": "^4.4"
},
"config": {
"sort-packages": true
"sort-packages": true,
"allow-plugins": {
"ergebnis/composer-normalize": true,
"infection/extension-installer": true
}
},
"autoload": {
"psr-4": {
"Nsq\\": "src/"
}
},
"autoload-dev": {
"files": [
"vendor/symfony/var-dumper/Resources/functions/dump.php"
]
},
"minimum-stability": "dev",
"prefer-stable": true,
"scripts": {
"cs": [
"vendor/bin/php-cs-fixer fix"
"vendor/bin/php-cs-fixer fix --using-cache=no"
],
"cs-check": [
"vendor/bin/php-cs-fixer fix --verbose --diff --dry-run"
@@ -60,7 +70,7 @@
"vendor/bin/psalm"
],
"test": [
"@norm-check",
"@norm",
"@cs",
"@phpstan",
"@psalm",

View File

@@ -2,19 +2,56 @@ version: '3.7'
services:
nsqd:
image: nsqio/nsq:v1.2.0
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqd'
command: /nsqd -log-level debug
# command: /nsqd
ports:
- 4150:4150
- 4151:4151
command: >-
nsqd
--log-level debug
--lookupd-tcp-address nsqlookupd0:4160
--lookupd-tcp-address nsqlookupd1:4160
--lookupd-tcp-address nsqlookupd2:4160
nsqlookupd0:
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqlookupd0'
command: /nsqlookupd -log-level debug
nsqlookupd1:
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqlookupd1'
command: /nsqlookupd -log-level debug
nsqlookupd2:
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqlookupd2'
command: /nsqlookupd -log-level debug
nsqadmin:
image: nsqio/nsq:v1.2.0
image: nsqio/nsq:v${NSQ_VERSION}
labels:
ru.grachevko.dhu: 'nsqadmin'
command: /nsqadmin --nsqd-http-address=nsqd:4151 --http-address=0.0.0.0:4171
ports:
- 4171:4171
command:
- nsqadmin
- --http-address=0.0.0.0:4171
- --lookupd-http-address=nsqlookupd0:4161
- --lookupd-http-address=nsqlookupd1:4161
- --lookupd-http-address=nsqlookupd2:4161
depends_on:
- nsqlookupd0
- nsqlookupd1
- nsqlookupd2
tail:
image: nsqio/nsq:v${NSQ_VERSION}
command: >-
nsq_tail
--channel nsq_tail
--topic local
--lookupd-http-address nsqlookupd1:4161
depends_on:
- nsqd
- nsqlookupd1

View File

Before

Width:  |  Height:  |  Size: 98 KiB

After

Width:  |  Height:  |  Size: 98 KiB

View File

@@ -0,0 +1,47 @@
<?php
declare(strict_types=1);
require dirname(__DIR__).'/../vendor/autoload.php';
use Amp\ByteStream;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Loop;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig;
use Nsq\Lookup;
use Nsq\Message;
Loop::run(static function () {
$handler = new StreamHandler(ByteStream\getStdout());
$handler->setFormatter(new ConsoleFormatter());
$logger = new Logger('consumer', [$handler], [new PsrLogMessageProcessor()]);
$callable = static function (Message $message) {
yield $message->finish();
};
$clientConfig = new ClientConfig();
$lookupConfig = new LookupConfig();
$watcherId = Loop::repeat(5000, function () {
yield Amp\Dns\resolver()->reloadConfig();
});
$lookup = Lookup::create(
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
$lookupConfig,
$logger,
);
$lookup->subscribe('local', 'local', $callable, $clientConfig);
Loop::delay(10000, function () use ($lookup, $watcherId) {
$lookup->stop();
Loop::cancel($watcherId);
});
});

View File

@@ -0,0 +1,102 @@
<?php
declare(strict_types=1);
require dirname(__DIR__).'/../vendor/autoload.php';
use Amp\ByteStream;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Loop;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig;
use Nsq\Lookup;
use Nsq\Producer;
use function Amp\asyncCall;
use function Amp\delay;
Loop::run(static function () {
$handler = new StreamHandler(ByteStream\getStdout());
$handler->setFormatter(new ConsoleFormatter());
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
$clientConfig = new ClientConfig();
/** @var Producer[] $producers */
$producers = [];
$lookupConfig = new LookupConfig();
$lookup = Lookup::create(
['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161'],
$lookupConfig,
$logger,
);
$isRunning = true;
asyncCall(static function () use ($lookup, $clientConfig, $logger, &$producers, &$isRunning) {
while ($isRunning) {
/** @var Lookup\Producer[] $nodes */
$nodes = yield $lookup->nodes();
foreach ($nodes as $node) {
$address = $node->toTcpUri();
if (array_key_exists($address, $producers)) {
continue;
}
asyncCall(function () use ($address, $clientConfig, $logger, &$producers) {
yield ($producers[$address] = Producer::create($address, $clientConfig, $logger))
->onClose(function () use (&$producers, $address) {
unset($producers[$address]);
})
->connect()
;
});
}
yield delay(5000);
yield Amp\Dns\resolver()->reloadConfig(); // for reload /etc/hosts
}
});
Loop::delay(5000, function () use (&$isRunning, $logger) {
$logger->info('Stopping producer.');
$isRunning = false;
});
$counter = 0;
while (true) {
if (!$isRunning) {
foreach ($producers as $producer) {
$producer->close();
}
break;
}
if ([] === $producers) {
yield delay(200);
continue;
}
$index = array_rand($producers);
$producer = $producers[$index];
if (!$producer->isConnected()) {
yield delay(100);
continue;
}
yield $producer->publish('local', 'This is message of count '.$counter++);
}
});

View File

@@ -9,12 +9,12 @@ use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Nsq\Config\ClientConfig;
use Nsq\Consumer;
use Nsq\Protocol\Message;
use Nsq\Message;
use function Amp\call;
Loop::run(static function () {
@@ -24,16 +24,6 @@ Loop::run(static function () {
$consumer = new Consumer(
'tcp://localhost:4150',
clientConfig: new ClientConfig(
deflate: false,
snappy: false,
),
logger: $logger,
);
yield $consumer->connect();
yield $consumer->listen(
topic: 'local',
channel: 'local',
onMessage: static function (Message $message) use ($logger): Promise {
@@ -41,9 +31,14 @@ Loop::run(static function () {
$logger->info('Received: {body}', ['body' => $message->body]);
yield $message->finish();
return new Success(false);
});
}
},
clientConfig: new ClientConfig(
deflate: false,
snappy: true,
),
logger: $logger,
);
yield $consumer->connect();
});

View File

@@ -22,12 +22,15 @@ Loop::run(static function () {
'tcp://localhost:4150',
clientConfig: new ClientConfig(
deflate: false,
snappy: false,
heartbeatInterval: 5000,
snappy: true,
),
logger: $logger,
);
yield $producer->connect();
yield $producer->pub(topic: 'topic', body: 'Message body!');
while (true) {
yield $producer->publish(topic: 'local', body: array_fill(0, 200, 'Message body!'));
}
});

View File

@@ -8,3 +8,9 @@ parameters:
paths:
- src
- tests
ignoreErrors:
-
message: '#no value type specified in iterable type array#'
paths:
- %currentWorkingDirectory%/src
- %currentWorkingDirectory%/tests

View File

@@ -1,6 +1,5 @@
<?xml version="1.0"?>
<psalm
allowPhpStormGenerics="true"
ignoreInternalFunctionFalseReturn="false"
ignoreInternalFunctionNullReturn="false"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

37
src/Buffer.php Normal file
View File

@@ -0,0 +1,37 @@
<?php
declare(strict_types=1);
namespace Nsq;
use PHPinnacle\Buffer\ByteBuffer;
/**
* @psalm-suppress
*/
final class Buffer extends ByteBuffer
{
public function readUInt32LE(): int
{
$unpacked = unpack('V', $this->consume(4));
\assert(\is_array($unpacked) && \array_key_exists(1, $unpacked));
return $unpacked[1];
}
public function consumeTimestamp(): int
{
return $this->consumeUint64();
}
public function consumeAttempts(): int
{
return $this->consumeUint16();
}
public function consumeMessageID(): string
{
return $this->consume(16);
}
}

111
src/Command.php Normal file
View File

@@ -0,0 +1,111 @@
<?php
declare(strict_types=1);
namespace Nsq;
use PHPinnacle\Buffer\ByteBuffer;
/**
* @internal
*/
final class Command
{
public static function magic(): string
{
return ' V2';
}
public static function identify(string $data): string
{
return self::pack('IDENTIFY', data: $data);
}
public static function auth(?string $authSecret): string
{
return self::pack('AUTH', data: $authSecret);
}
public static function nop(): string
{
return self::pack('NOP');
}
public static function cls(): string
{
return self::pack('CLS');
}
public static function rdy(int $count): string
{
return self::pack('RDY', (string) $count);
}
public static function fin(string $id): string
{
return self::pack('FIN', $id);
}
public static function req(string $id, int $timeout): string
{
return self::pack('REQ', [$id, $timeout]);
}
public static function touch(string $id): string
{
return self::pack('TOUCH', $id);
}
public static function pub(string $topic, string $body): string
{
return self::pack('PUB', $topic, $body);
}
/**
* @param array<int, string> $bodies
*/
public static function mpub(string $topic, array $bodies): string
{
static $buffer;
$buffer ??= new ByteBuffer();
$buffer->appendUint32(\count($bodies));
foreach ($bodies as $body) {
$buffer->appendUint32(\strlen($body));
$buffer->append($body);
}
return self::pack('MPUB', $topic, $buffer->flush());
}
public static function dpub(string $topic, string $body, int $delay): string
{
return self::pack('DPUB', [$topic, $delay], $body);
}
public static function sub(string $topic, string $channel): string
{
return self::pack('SUB', [$topic, $channel]);
}
/**
* @param array<int, scalar>|string $params
*/
private static function pack(string $command, array | string $params = [], string $data = null): string
{
static $buffer;
$buffer ??= new Buffer();
$command = implode(' ', [$command, ...((array) $params)]);
$buffer->append($command.PHP_EOL);
if (null !== $data) {
$buffer->appendUint32(\strlen($data));
$buffer->append($data);
}
return $buffer->flush();
}
}

View File

@@ -13,7 +13,7 @@ use Composer\InstalledVersions;
*
* @psalm-immutable
*/
final class ClientConfig implements \JsonSerializable
final class ClientConfig
{
/**
* @psalm-suppress ImpureFunctionCall
@@ -26,9 +26,26 @@ final class ClientConfig implements \JsonSerializable
public ?string $authSecret = null,
/**
* The timeout for establishing a connection in seconds.
* The timeout for establishing a connection in milliseconds.
*/
public int $connectTimeout = 10,
public int $connectTimeout = 10000,
/**
* The max attempts for establishing a connection.
*/
public int $maxAttempts = 0,
/**
* Use tcp_nodelay for establishing a connection.
*/
public bool $tcpNoDelay = false,
public int $rdyCount = 100,
/**
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
* it will send back a JSON payload of supported features and metadata.
*/
public bool $featureNegotiation = true,
/**
* An identifier used to disambiguate this client (i.e. something specific to the consumer).
@@ -73,12 +90,6 @@ final class ClientConfig implements \JsonSerializable
*/
public int $sampleRate = 0,
/**
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
* it will send back a JSON payload of supported features and metadata.
*/
public bool $featureNegotiation = true,
/**
* Enable TLS for this connection.
*/
@@ -89,11 +100,6 @@ final class ClientConfig implements \JsonSerializable
*/
public bool $snappy = false,
/**
* The read timeout for connection sockets and for awaiting responses from nsq.
*/
public int $readTimeout = 5,
/**
* A string identifying the agent for this client in the spirit of HTTP.
*/
@@ -114,12 +120,14 @@ final class ClientConfig implements \JsonSerializable
}
}
/**
* @phpstan-ignore-next-line
*/
public function jsonSerialize(): array
public static function fromArray(array $array): self
{
return [
return new self(...array_intersect_key($array, get_class_vars(self::class)));
}
public function asNegotiationPayload(): string
{
$data = [
'client_id' => $this->clientId,
'deflate' => $this->deflate,
'deflate_level' => $this->deflateLevel,
@@ -132,10 +140,7 @@ final class ClientConfig implements \JsonSerializable
'tls_v1' => $this->tls,
'user_agent' => $this->userAgent,
];
}
public function toString(): string
{
return json_encode($this, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
return json_encode($data, JSON_THROW_ON_ERROR);
}
}

View File

@@ -0,0 +1,13 @@
<?php
declare(strict_types=1);
namespace Nsq\Config;
final class LookupConfig
{
public function __construct(
public int $pollingInterval = 10000,
) {
}
}

View File

@@ -8,8 +8,10 @@ namespace Nsq\Config;
* The configuration object that holds the config status for a single Connection.
*
* @psalm-immutable
*
* @internal
*/
final class ConnectionConfig
final class ServerConfig
{
public function __construct(
/**
@@ -82,9 +84,6 @@ final class ConnectionConfig
) {
}
/**
* @phpstan-ignore-next-line
*/
public static function fromArray(array $array): self
{
return new self(

View File

@@ -4,254 +4,266 @@ declare(strict_types=1);
namespace Nsq;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\ZlibInputStream;
use Amp\ByteStream\ZlibOutputStream;
use Amp\ByteStream\ClosedException;
use Amp\Failure;
use Amp\Promise;
use Amp\Socket\Socket;
use Amp\Success;
use Nsq\Config\ClientConfig;
use Nsq\Config\ConnectionConfig;
use Nsq\Config\ServerConfig;
use Nsq\Exception\AuthenticationRequired;
use Nsq\Exception\BadResponse;
use Nsq\Exception\NotConnected;
use Nsq\Exception\NsqError;
use Nsq\Exception\NsqException;
use Nsq\Protocol\Error;
use Nsq\Protocol\Frame;
use Nsq\Protocol\Message;
use Nsq\Protocol\Response;
use Nsq\Stream\NsqInputStream;
use Nsq\Frame\Response;
use Nsq\Stream\GzipStream;
use Nsq\Stream\NullStream;
use Nsq\Stream\SnappyInputStream;
use Nsq\Stream\SnappyOutputStream;
use PHPinnacle\Buffer\ByteBuffer;
use Nsq\Stream\SnappyStream;
use Nsq\Stream\SocketStream;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use function Amp\asyncCall;
use function Amp\call;
use function Amp\Socket\connect;
/**
* @internal
*/
abstract class Connection
{
private ?Socket $socket = null;
private Stream $stream;
private InputStream $inputStream;
/**
* @var callable
*/
private $onConnectCallback;
private OutputStream $outputStream;
/**
* @var callable
*/
private $onCloseCallback;
private ByteBuffer $buffer;
protected ?ConnectionConfig $connectionConfig = null;
protected ClientConfig $clientConfig;
protected LoggerInterface $logger;
final public function __construct(
private string $address,
ClientConfig $clientConfig = null,
?LoggerInterface $logger = null,
public function __construct(
/**
* @readonly
*/
public string $address,
protected ClientConfig $clientConfig,
protected LoggerInterface $logger,
) {
$this->buffer = new ByteBuffer();
$this->inputStream = $this->outputStream = new NullStream();
$this->clientConfig = $clientConfig ?? new ClientConfig();
$this->logger = $logger ?? new NullLogger();
$this->stream = new NullStream();
$this->onConnectCallback = static function (): void {
};
$this->onCloseCallback = static function (): void {
};
}
public function __destruct()
{
$this->close();
$this->close(false);
}
public function isConnected(): bool
{
return !$this->stream instanceof NullStream;
}
/**
* @return Promise<void>
* @psalm-return Promise<void>
*/
public function connect(): Promise
{
return call(function (): \Generator {
$this->socket = $this->outputStream = yield connect($this->address);
$this->inputStream = new NsqInputStream($this->socket);
$buffer = new Buffer();
yield $this->outputStream->write(' V2');
/** @var SocketStream $stream */
$stream = yield SocketStream::connect(
$this->address,
$this->clientConfig->connectTimeout,
$this->clientConfig->maxAttempts,
$this->clientConfig->tcpNoDelay,
);
yield $stream->write(Command::magic());
yield $stream->write(Command::identify($this->clientConfig->asNegotiationPayload()));
yield $this->command('IDENTIFY', data: $this->clientConfig->toString());
/** @var Response $response */
$response = yield $this->readResponse();
$this->connectionConfig = ConnectionConfig::fromArray($response->toArray());
$response = yield $this->response($stream, $buffer);
$serverConfig = ServerConfig::fromArray($response->toArray());
if ($this->connectionConfig->snappy) {
$this->inputStream = new NsqInputStream(
new SnappyInputStream($this->inputStream, $this->logger),
);
$this->outputStream = new SnappyOutputStream($this->outputStream);
if ($serverConfig->tls) {
yield $stream->setupTls();
$this->checkIsOK();
/** @var Response $response */
$response = yield $this->response($stream, $buffer);
if (!$response->isOk()) {
throw new NsqException();
}
}
if ($this->connectionConfig->deflate) {
$this->inputStream = new NsqInputStream(
new ZlibInputStream($this->socket, ZLIB_ENCODING_DEFLATE, [
'level' => $this->connectionConfig->deflateLevel,
]),
);
$this->outputStream = new ZlibOutputStream($this->socket, ZLIB_ENCODING_DEFLATE, [
'level' => $this->connectionConfig->deflateLevel,
]);
if ($serverConfig->snappy) {
$stream = new SnappyStream($stream, $buffer->flush());
$this->checkIsOK();
/** @var Response $response */
$response = yield $this->response($stream, $buffer);
if (!$response->isOk()) {
throw new NsqException();
}
}
if ($this->connectionConfig->authRequired) {
if ($serverConfig->deflate) {
$stream = new GzipStream($stream, $serverConfig->deflateLevel, $buffer->flush());
/** @var Response $response */
$response = yield $this->response($stream, $buffer);
if (!$response->isOk()) {
throw new NsqException();
}
}
if ($serverConfig->authRequired) {
if (null === $this->clientConfig->authSecret) {
yield $this->close();
throw new AuthenticationRequired();
}
yield $this->command('AUTH', data: $this->clientConfig->authSecret);
$response = yield $this->readResponse();
yield $stream->write(Command::auth($this->clientConfig->authSecret));
/** @var Response $response */
$response = yield $this->response($stream, $buffer);
$this->logger->info('Authorization response: '.http_build_query($response->toArray()));
}
$this->stream = $stream;
($this->onConnectCallback)();
});
}
/**
* Cleanly close your connection (no more messages are sent).
*
* @return Promise<void>
*/
public function close(): Promise
public function close(bool $graceful = true): void
{
if (null === $this->socket) {
return new Success();
if (!$this->isConnected()) {
return;
}
return call(function (): \Generator {
yield $this->command('CLS');
$logger = $this->logger;
[$stream, $this->stream] = [$this->stream, new NullStream()];
if (null !== $this->socket) {
$this->socket->close();
if ($graceful) {
$this->logger->debug('Graceful disconnect.', [
'class' => static::class,
'address' => $this->address,
]);
$this->socket = null;
asyncCall(static function () use ($stream, $logger): \Generator {
try {
yield $stream->write(Command::cls());
} catch (\Throwable $e) {
$logger->warning($e->getMessage(), ['exception' => $e]);
}
$stream->close();
});
return;
}
public function isClosed(): bool
try {
$stream->close();
} catch (ClosedException) {
}
($this->onCloseCallback)();
}
public function onConnect(callable $callback): static
{
return null === $this->socket;
}
/**
* @param array<int, int|string>|string $params
*
* @return Promise<void>
*/
protected function command(string $command, array | string $params = [], string $data = null): Promise
{
if (null === $this->socket) {
return new Failure(new NotConnected());
}
$command = implode(' ', [$command, ...((array) $params)]);
$buffer = $this->buffer->append($command.PHP_EOL);
if (null !== $data) {
$buffer->appendUint32(\strlen($data));
$buffer->append($data);
}
$this->logger->debug('Sending: {bytes}', ['bytes' => $buffer->bytes()]);
return $this->outputStream->write($buffer->flush());
}
/**
* @return Promise<Frame>
*/
protected function readFrame(): Promise
{
return call(function (): \Generator {
$bytes = yield $this->inputStream->read();
$this->logger->debug('Receiving: {bytes}', ['bytes' => $bytes]);
if (null === $bytes) {
throw new NotConnected();
}
$buffer = $this->buffer->append($bytes);
$frame = match ($type = $buffer->consumeUint32()) {
0 => new Response($buffer->flush()),
1 => new Error($buffer->flush()),
2 => new Message(
timestamp: $buffer->consumeInt64(),
attempts: $buffer->consumeUint16(),
id: $buffer->consume(Bytes::BYTES_ID),
body: $buffer->flush(),
consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'),
),
default => throw new NsqException('Unexpected frame type: '.$type)
$previous = $this->onConnectCallback;
$this->onConnectCallback = static function () use ($previous, $callback): void {
$previous();
$callback();
};
if ($frame instanceof Response && $frame->isHeartBeat()) {
yield $this->command('NOP');
return $this->readFrame();
return $this;
}
return $frame;
public function onClose(callable $callback): static
{
$previous = $this->onCloseCallback;
$this->onCloseCallback = static function () use ($previous, $callback): void {
$previous();
$callback();
};
return $this;
}
/**
* @psalm-return Promise<null|string>
*/
protected function read(): Promise
{
return call(function (): \Generator {
try {
return yield $this->stream->read();
} catch (\Throwable $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
$this->close(false);
return new Failure($e);
}
});
}
/**
* @return Promise<void>
* @psalm-return Promise<void>
*/
protected function checkIsOK(): Promise
protected function write(string $data): Promise
{
return call(function (): \Generator {
/** @var Response $response */
$response = yield $this->readResponse();
return call(function () use ($data): \Generator {
try {
return yield $this->stream->write($data);
} catch (\Throwable $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
if (!$response->isOk()) {
throw new BadResponse($response);
$this->close(false);
return new Failure($e);
}
$this->logger->debug('Ok checked.');
return call(static function (): void {});
});
}
/**
* @return Promise<Response>
*/
private function readResponse(): Promise
protected function handleError(Frame\Error $error): void
{
return call(function (): \Generator {
$frame = yield $this->readFrame();
$this->logger->error($error->data);
if ($frame instanceof Error) {
if ($frame->type->terminateConnection) {
if (ErrorType::terminable($error)) {
$this->close();
throw $error->toException();
}
}
throw new NsqError($frame);
/**
* @psalm-return Promise<Frame\Response>
*/
private function response(Stream $stream, Buffer $buffer): Promise
{
return call(function () use ($stream, $buffer): \Generator {
while (true) {
$response = Parser::parse($buffer);
if (null === $response && null !== ($chunk = yield $stream->read())) {
$buffer->append($chunk);
continue;
}
if (!$frame instanceof Response) {
throw new NsqException('Unreachable statement.');
if (!$response instanceof Frame\Response) {
throw new NsqException();
}
return $frame;
return $response;
}
});
}
}

View File

@@ -7,105 +7,192 @@ namespace Nsq;
use Amp\Failure;
use Amp\Promise;
use Amp\Success;
use Nsq\Exception\NsqError;
use Nsq\Exception\NsqException;
use Nsq\Protocol\Error;
use Nsq\Protocol\Message;
use Nsq\Protocol\Response;
use Nsq\Config\ClientConfig;
use Nsq\Exception\ConsumerException;
use Nsq\Frame\Response;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use function Amp\asyncCall;
use function Amp\call;
/**
* @internal
*/
final class Consumer extends Connection
{
private int $rdy = 0;
/**
* @return Promise<void>
* @var callable
*/
public function listen(
private $onMessage;
public function __construct(
string $address,
/**
* @readonly
*/
public string $topic,
/**
* @readonly
*/
public string $channel,
callable $onMessage,
ClientConfig $clientConfig,
LoggerInterface $logger,
) {
parent::__construct(
$address,
$clientConfig,
$logger,
);
$this->onMessage = $onMessage;
$context = compact('address', 'topic', 'channel');
$this->onConnect(function () use ($context): void {
$this->logger->debug('Consumer connected.', $context);
});
$this->onClose(function () use ($context): void {
$this->logger->debug('Consumer disconnected.', $context);
});
$this->logger->debug('Consumer created.', $context);
}
public static function create(
string $address,
string $topic,
string $channel,
callable $onMessage,
): Promise {
return call(function () use ($topic, $channel, $onMessage): \Generator {
yield $this->command('SUB', [$topic, $channel]);
yield $this->checkIsOK();
?ClientConfig $clientConfig = null,
?LoggerInterface $logger = null,
): self {
return new self(
$address,
$topic,
$channel,
$onMessage,
$clientConfig ?? new ClientConfig(),
$logger ?? new NullLogger(),
);
}
asyncCall(function () use ($onMessage): \Generator {
yield $this->rdy(2500);
public function connect(): Promise
{
if ($this->isConnected()) {
return call(static function (): void {
});
}
while ($message = yield $this->readMessage()) {
$command = yield $onMessage($message);
return call(function (): \Generator {
yield parent::connect();
$buffer = new Buffer();
asyncCall(function () use ($buffer): \Generator {
yield $this->write(Command::sub($this->topic, $this->channel));
if (null !== ($chunk = yield $this->read())) {
$buffer->append($chunk);
}
/** @var Response $response */
$response = Parser::parse($buffer);
if (!$response->isOk()) {
return new Failure(new ConsumerException('Fail subscription.'));
}
yield $this->rdy(1);
/** @phpstan-ignore-next-line */
asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->read()) {
$buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->write(Command::nop());
if (true === $command) {
break;
}
if ($this->rdy < 1000) {
yield $this->rdy(2500);
throw ConsumerException::response($frame);
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
case $frame instanceof Frame\Message:
asyncCall($this->onMessage, Message::compose($frame, $this));
break;
}
if ($this->rdy !== $this->clientConfig->rdyCount) {
yield $this->rdy($this->clientConfig->rdyCount);
}
}
}
return new Success();
$this->close(false);
});
});
}
/**
* @return Promise<Message>
*/
public function readMessage(): Promise
{
return call(function (): \Generator {
$frame = yield $this->readFrame();
if ($frame instanceof Message) {
return $frame;
}
if ($frame instanceof Error) {
if ($frame->type->terminateConnection) {
yield $this->close();
}
throw new NsqError($frame);
}
throw new NsqException('Unreachable statement.');
});
}
/**
* Update RDY state (indicate you are ready to receive N messages).
*
* @return Promise<void>
* @psalm-return Promise<bool>
*/
public function rdy(int $count): Promise
{
if (!$this->isConnected()) {
return new Success(false);
}
if ($this->rdy === $count) {
return call(static function (): void {});
return new Success(true);
}
$this->rdy = $count;
return $this->command('RDY', (string) $count);
return call(function () use ($count): \Generator {
try {
yield $this->write(Command::rdy($count));
return true;
} catch (\Throwable) {
return false;
}
});
}
/**
* Finish a message (indicate successful processing).
*
* @return Promise<void>
* @psalm-return Promise<bool>
*
* @internal
*/
public function fin(string $id): Promise
{
$promise = $this->command('FIN', $id);
$promise->onResolve(function (): void {
--$this->rdy;
});
if (!$this->isConnected()) {
return new Success(false);
}
return $promise;
return call(function () use ($id): \Generator {
try {
yield $this->write(Command::fin($id));
return true;
} catch (\Throwable) {
return false;
}
});
}
/**
@@ -114,29 +201,48 @@ final class Consumer extends Connection
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
* behaves identically to an explicit REQ.
*
* @return Promise<void>
* @psalm-return Promise<bool>
*
* @internal
*/
public function req(string $id, int $timeout): Promise
{
$promise = $this->command('REQ', [$id, $timeout]);
$promise->onResolve(function (): void {
--$this->rdy;
});
if (!$this->isConnected()) {
return new Success(false);
}
return $promise;
return call(function () use ($id, $timeout): \Generator {
try {
yield $this->write(Command::req($id, $timeout));
return true;
} catch (\Throwable) {
return false;
}
});
}
/**
* Reset the timeout for an in-flight message.
*
* @return Promise<void>
* @psalm-return Promise<bool>
*
* @internal
*/
public function touch(string $id): Promise
{
return $this->command('TOUCH', $id);
if (!$this->isConnected()) {
return new Success(false);
}
return call(function () use ($id): \Generator {
try {
yield $this->write(Command::touch($id));
return true;
} catch (\Throwable) {
return false;
}
});
}
}

View File

@@ -2,7 +2,7 @@
declare(strict_types=1);
namespace Nsq\Protocol;
namespace Nsq;
/**
* @psalm-immutable
@@ -88,13 +88,12 @@ final class ErrorType
*/
public const E_UNAUTHORIZED = true;
/**
* A boolean indicating whether or not an [Error] with this type terminates the connection or not.
*/
public bool $terminateConnection;
public function __construct(public string $type)
public static function terminable(Frame\Error $error): bool
{
$this->terminateConnection = \constant('self::'.$this->type) ?? self::E_INVALID;
$type = explode(' ', $error->data)[0];
$constant = 'self::'.$type;
return \defined($constant) ? \constant($constant) : self::E_INVALID;
}
}

View File

@@ -1,15 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Nsq\Protocol\Response;
final class BadResponse extends NsqException
{
public function __construct(Response $response)
{
parent::__construct($response->msg);
}
}

View File

@@ -1,16 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
final class ConnectionFail extends NsqException
{
/**
* @codeCoverageIgnore
*/
public static function fromThrowable(\Throwable $throwable): self
{
return new self($throwable->getMessage(), (int) $throwable->getCode(), $throwable);
}
}

View File

@@ -0,0 +1,15 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Nsq\Frame\Response;
final class ConsumerException extends NsqException
{
public static function response(Response $response): self
{
return new self(sprintf('Consumer receive response "%s" from nsq, which not expected. ', $response->data));
}
}

View File

@@ -0,0 +1,18 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Psr\Log\LogLevel;
final class LookupException extends NsqException
{
public function level(): string
{
return match ($this->getMessage()) {
'TOPIC_NOT_FOUND' => LogLevel::DEBUG,
default => LogLevel::WARNING,
};
}
}

View File

@@ -1,25 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Nsq\Protocol\Message;
final class MessageAlreadyFinished extends NsqException
{
public static function finish(Message $message): self
{
return new self('Can\'t finish message as it already finished.');
}
public static function requeue(Message $message): self
{
return new self('Can\'t requeue message as it already finished.');
}
public static function touch(Message $message): self
{
return new self('Can\'t touch message as it already finished.');
}
}

View File

@@ -0,0 +1,15 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Nsq\Message;
final class MessageException extends NsqException
{
public static function processed(Message $message): self
{
return new self(sprintf('Message "%s" already processed.', $message->id));
}
}

View File

@@ -1,15 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
use Nsq\Protocol\Error;
final class NsqError extends NsqException
{
public function __construct(Error $error)
{
parent::__construct($error->rawData);
}
}

View File

@@ -4,6 +4,6 @@ declare(strict_types=1);
namespace Nsq\Exception;
final class NotConnected extends NsqException
final class ServerException extends NsqException
{
}

View File

@@ -0,0 +1,18 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
final class SnappyException extends NsqException
{
public static function notInstalled(): self
{
return new self('Snappy extension not installed.');
}
public static function invalidHeader(): self
{
return new self('Invalid snappy protocol header.');
}
}

View File

@@ -0,0 +1,9 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
final class StreamException extends NsqException
{
}

31
src/Frame.php Normal file
View File

@@ -0,0 +1,31 @@
<?php
declare(strict_types=1);
namespace Nsq;
abstract class Frame
{
public const TYPE_RESPONSE = 0;
public const TYPE_ERROR = 1;
public const TYPE_MESSAGE = 2;
public function __construct(public int $type)
{
}
public function response(): bool
{
return self::TYPE_RESPONSE === $this->type;
}
public function error(): bool
{
return self::TYPE_ERROR === $this->type;
}
public function message(): bool
{
return self::TYPE_MESSAGE === $this->type;
}
}

24
src/Frame/Error.php Normal file
View File

@@ -0,0 +1,24 @@
<?php
declare(strict_types=1);
namespace Nsq\Frame;
use Nsq\Exception\ServerException;
use Nsq\Frame;
/**
* @psalm-immutable
*/
final class Error extends Frame
{
public function __construct(public string $data)
{
parent::__construct(self::TYPE_ERROR);
}
public function toException(): ServerException
{
return new ServerException($this->data);
}
}

19
src/Frame/Message.php Normal file
View File

@@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
namespace Nsq\Frame;
use Nsq\Frame;
final class Message extends Frame
{
public function __construct(
public int $timestamp,
public int $attempts,
public string $id,
public string $body,
) {
parent::__construct(self::TYPE_MESSAGE);
}
}

39
src/Frame/Response.php Normal file
View File

@@ -0,0 +1,39 @@
<?php
declare(strict_types=1);
namespace Nsq\Frame;
use Nsq\Frame;
/**
* @psalm-immutable
*/
final class Response extends Frame
{
public const OK = 'OK';
public const HEARTBEAT = '_heartbeat_';
public function __construct(public string $data)
{
parent::__construct(self::TYPE_RESPONSE);
}
public function isOk(): bool
{
return self::OK === $this->data;
}
public function isHeartBeat(): bool
{
return self::HEARTBEAT === $this->data;
}
/**
* @psalm-return array<mixed, mixed>
*/
public function toArray(): array
{
return json_decode($this->data, true, flags: JSON_THROW_ON_ERROR);
}
}

274
src/Lookup.php Normal file
View File

@@ -0,0 +1,274 @@
<?php
declare(strict_types=1);
namespace Nsq;
use Amp\Deferred;
use Amp\Dns\DnsException;
use Amp\Http\Client\DelegateHttpClient;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\NullCancellationToken;
use Amp\Promise;
use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig;
use Nsq\Exception\LookupException;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use function Amp\asyncCall;
use function Amp\call;
use function Amp\delay;
final class Lookup
{
/**
* @psalm-var array<string, array<string, Lookup\Producer>>
*/
private array $producers = [];
private array $consumers = [];
private array $running = [];
private array $topicWatchers = [];
public function __construct(
private array $addresses,
private LookupConfig $config,
private LoggerInterface $logger,
private DelegateHttpClient $httpClient,
) {
}
public static function create(
string | array $address,
LookupConfig $config = null,
LoggerInterface $logger = null,
DelegateHttpClient $httpClient = null,
): self {
return new self(
(array) $address,
$config ?? new LookupConfig(),
$logger ?? new NullLogger(),
$httpClient ?? HttpClientBuilder::buildDefault(),
);
}
/**
* @psalm-return Promise<Lookup\Producer[]>
*/
public function nodes(): Promise
{
return call(function (): \Generator {
$requestHandler = function (string $uri): \Generator {
/** @var Response $response */
$response = yield $this->httpClient->request(new Request($uri.'/nodes'), new NullCancellationToken());
try {
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
} catch (LookupException $e) {
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
return null;
}
};
$promises = [];
foreach ($this->addresses as $address) {
$promises[$address] = call($requestHandler, $address);
}
$nodes = [];
/** @var Lookup\Response $response */
foreach (yield $promises as $response) {
foreach ($response->producers as $producer) {
$nodes[$producer->toTcpUri()] = $producer;
}
}
return array_values($nodes);
});
}
public function stop(): void
{
$this->producers = [];
$this->consumers = [];
$this->running = [];
$this->topicWatchers = [];
$this->logger->info('Lookup stopped.');
}
/**
* @psalm-suppress InvalidPropertyAssignmentValue
*/
public function subscribe(string $topic, string $channel, callable $onMessage, ClientConfig $config = null): void
{
if (null !== ($this->running[$topic][$channel] ?? null)) {
throw new \InvalidArgumentException('Subscription already exists.');
}
$this->running[$topic][$channel] = true;
asyncCall(function () use ($topic, $channel, $onMessage, $config): \Generator {
while (true) {
if (null === ($this->running[$topic][$channel] ?? null)) {
return;
}
/** @phpstan-ignore-next-line */
$producers = $this->producers[$topic] ??= new Deferred();
if ($producers instanceof Deferred) {
/** @var array<string, Lookup\Producer> $producers */
$producers = yield $producers->promise();
}
foreach (array_diff_key($this->consumers, $producers) as $address => $producer) {
unset($this->consumers[$address]);
}
foreach ($producers as $address => $producer) {
if (null !== ($this->consumers[$address][$topic][$channel] ?? null)) {
continue;
}
$this->keepConnection(
Consumer::create(
$address,
$topic,
$channel,
$onMessage,
$config,
$this->logger,
),
);
}
yield delay($this->config->pollingInterval);
}
});
$this->watch($topic);
$this->logger->info('Subscribed.', compact('topic', 'channel'));
}
public function unsubscribe(string $topic, string $channel): void
{
if (null === ($this->running[$topic][$channel] ?? null)) {
$this->logger->debug('Not subscribed.', compact('topic', 'channel'));
return;
}
unset($this->running[$topic][$channel]);
if ([] === $this->running[$topic]) {
unset($this->running[$topic]);
}
$this->logger->info('Unsubscribed.', compact('topic', 'channel'));
}
private function keepConnection(Consumer $consumer): void
{
$this->consumers[$consumer->address][$consumer->topic][$consumer->channel] = $consumer;
asyncCall(function () use ($consumer): \Generator {
while (null !== ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
try {
yield $consumer->connect();
} catch (DnsException $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
unset(
$this->consumers[$consumer->address],
$this->producers[$consumer->topic][$consumer->address],
);
return;
} catch (\Throwable $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
yield delay($this->config->pollingInterval);
continue;
}
while (true) {
/** @phpstan-ignore-next-line */
if (null === ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
$consumer->close();
return;
}
if (!$consumer->isConnected()) {
break;
}
yield delay(500);
}
}
});
}
private function watch(string $topic): void
{
if (\array_key_exists($topic, $this->topicWatchers)) {
return;
}
$this->topicWatchers[$topic] = true;
asyncCall(function () use ($topic): \Generator {
$cancellationToken = new NullCancellationToken();
$requestHandler = function (string $uri) use ($topic, $cancellationToken): \Generator {
$this->logger->debug('Lookup', compact('topic'));
/** @var Response $response */
$response = yield $this->httpClient->request(new Request($uri.'/lookup?topic='.$topic), $cancellationToken);
try {
return Lookup\Response::fromJson(yield $response->getBody()->buffer());
} catch (LookupException $e) {
$this->logger->log($e->level(), $uri.' '.$e->getMessage());
return null;
}
};
while (\array_key_exists($topic, $this->running)) {
$promises = [];
foreach ($this->addresses as $address) {
$promises[$address] = call($requestHandler, $address);
}
/** @var Lookup\Response[] $responses */
$responses = yield $promises;
$producers = [];
foreach ($responses as $response) {
foreach ($response->producers as $producer) {
$producers[$producer->toTcpUri()] = $producer;
}
}
/** @phpstan-ignore-next-line */
if (($deferred = ($this->producers[$topic] ?? null)) instanceof Deferred) {
$deferred->resolve($producers);
}
$this->producers[$topic] = $producers;
yield delay($this->config->pollingInterval);
}
unset($this->topicWatchers[$topic]);
});
}
}

39
src/Lookup/Producer.php Normal file
View File

@@ -0,0 +1,39 @@
<?php
declare(strict_types=1);
namespace Nsq\Lookup;
final class Producer
{
public function __construct(
public string $broadcastAddress,
public string $hostname,
public string $remoteAddress,
public int $tcpPort,
public int $httpPort,
public string $version,
public array $tombstones,
public array $topics,
) {
}
public static function fromArray(array $array): self
{
return new self(
$array['broadcast_address'],
$array['hostname'],
$array['remote_address'],
$array['tcp_port'],
$array['http_port'],
$array['version'],
$array['tombstones'] ?? [],
$array['topics'] ?? [],
);
}
public function toTcpUri(): string
{
return sprintf('%s:%s', $this->broadcastAddress, $this->tcpPort);
}
}

34
src/Lookup/Response.php Normal file
View File

@@ -0,0 +1,34 @@
<?php
declare(strict_types=1);
namespace Nsq\Lookup;
use Nsq\Exception\LookupException;
final class Response
{
/**
* @param string[] $channels
* @param Producer[] $producers
*/
public function __construct(
public array $channels,
public array $producers,
) {
}
public static function fromJson(string $json): self
{
$array = json_decode($json, true, flags: JSON_THROW_ON_ERROR);
if (\array_key_exists('message', $array)) {
throw new LookupException($array['message']);
}
return new self(
$array['channels'] ?? [],
array_map([Producer::class, 'fromArray'], $array['producers']),
);
}
}

81
src/Message.php Normal file
View File

@@ -0,0 +1,81 @@
<?php
declare(strict_types=1);
namespace Nsq;
use Amp\Promise;
use Nsq\Exception\MessageException;
final class Message
{
private bool $processed = false;
public function __construct(
public string $id,
public string $body,
public int $timestamp,
public int $attempts,
private Consumer $consumer,
) {
}
public static function compose(Frame\Message $message, Consumer $consumer): self
{
return new self(
$message->id,
$message->body,
$message->timestamp,
$message->attempts,
$consumer,
);
}
public function isProcessed(): bool
{
return $this->processed;
}
/**
* @psalm-return Promise<bool>
*/
public function finish(): Promise
{
$this->markAsProcessedOrFail();
return $this->consumer->fin($this->id);
}
/**
* @psalm-param positive-int|0 $timeout
*
* @psalm-return Promise<bool>
*/
public function requeue(int $timeout): Promise
{
$this->markAsProcessedOrFail();
return $this->consumer->req($this->id, $timeout);
}
/**
* @psalm-return Promise<bool>
*/
public function touch(): Promise
{
if ($this->processed) {
throw MessageException::processed($this);
}
return $this->consumer->touch($this->id);
}
private function markAsProcessedOrFail(): void
{
if ($this->processed) {
throw MessageException::processed($this);
}
$this->processed = true;
}
}

47
src/Parser.php Normal file
View File

@@ -0,0 +1,47 @@
<?php
declare(strict_types=1);
namespace Nsq;
use Nsq\Exception\NsqException;
class Parser
{
private const SIZE = 4;
private const TYPE = 4;
private const MESSAGE_HEADER_SIZE =
8 + // timestamp
2 + // attempts
16 + // ID
4; // Frame type
public static function parse(Buffer $buffer): ?Frame
{
if ($buffer->size() < self::SIZE) {
return null;
}
$size = $buffer->readInt32();
if ($buffer->size() < $size + self::SIZE) {
return null;
}
$buffer->discard(self::SIZE);
$type = $buffer->consumeInt32();
return match ($type) {
Frame::TYPE_RESPONSE => new Frame\Response($buffer->consume($size - self::TYPE)),
Frame::TYPE_ERROR => new Frame\Error($buffer->consume($size - self::TYPE)),
Frame::TYPE_MESSAGE => new Frame\Message(
timestamp: $buffer->consumeTimestamp(),
attempts: $buffer->consumeAttempts(),
id: $buffer->consumeMessageID(),
body: $buffer->consume($size - self::MESSAGE_HEADER_SIZE),
),
default => throw new NsqException(sprintf('Unexpected frame type: "%s"', $type)),
};
}
}

View File

@@ -5,55 +5,126 @@ declare(strict_types=1);
namespace Nsq;
use Amp\Promise;
use PHPinnacle\Buffer\ByteBuffer;
use Amp\Success;
use Nsq\Config\ClientConfig;
use Nsq\Exception\NsqException;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use function Amp\asyncCall;
use function Amp\call;
/**
* @psalm-suppress PropertyNotSetInConstructor
*/
final class Producer extends Connection
{
/**
* @return Promise<void>
*/
public function pub(string $topic, string $body): Promise
public function __construct(
string $address,
ClientConfig $clientConfig,
LoggerInterface $logger,
) {
parent::__construct(
$address,
$clientConfig,
$logger,
);
$context = compact('address');
$this->onConnect(function () use ($context): void {
$this->logger->debug('Producer connected.', $context);
});
$this->onClose(function () use ($context): void {
$this->logger->debug('Producer disconnected.', $context);
});
$this->logger->debug('Producer created.', $context);
}
public static function create(
string $address,
ClientConfig $clientConfig = null,
LoggerInterface $logger = null,
): self {
return new self(
$address,
$clientConfig ?? new ClientConfig(),
$logger ?? new NullLogger(),
);
}
public function connect(): Promise
{
return call(function () use ($topic, $body): \Generator {
yield $this->command('PUB', $topic, $body);
yield $this->checkIsOK();
if ($this->isConnected()) {
return call(static function (): void {
});
}
return call(function (): \Generator {
yield parent::connect();
$buffer = new Buffer();
asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->read()) {
$buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->write(Command::nop());
}
// Ok received
break;
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
default:
throw new NsqException('Unreachable statement.');
}
}
}
$this->close(false);
});
});
}
/**
* @psalm-param array<int, mixed> $bodies
* @param array<int, string>|string $body
*
* @return Promise<void>
* @psalm-param positive-int|0 $delay
*
* @psalm-return Promise<bool>
*/
public function mpub(string $topic, array $bodies): Promise
public function publish(string $topic, string | array $body, int $delay = null): Promise
{
return call(function () use ($topic, $bodies): \Generator {
$buffer = new ByteBuffer();
$buffer->appendUint32(\count($bodies));
foreach ($bodies as $body) {
$buffer->appendUint32(\strlen($body));
$buffer->append($body);
if (!$this->isConnected()) {
return new Success(false);
}
yield $this->command('MPUB', $topic, $buffer->flush());
yield $this->checkIsOK();
});
return call(
function (iterable $commands): \Generator {
try {
foreach ($commands as $command) {
yield $this->write($command);
}
/**
* @return Promise<void>
*/
public function dpub(string $topic, string $body, int $delay): Promise
{
return call(function () use ($topic, $body, $delay): \Generator {
yield $this->command('DPUB', [$topic, $delay], $body);
yield $this->checkIsOK();
});
return true;
} catch (\Throwable) {
return false;
}
},
(static function () use ($topic, $body, $delay): \Generator {
if (\is_array($body) && null === $delay) {
yield Command::mpub($topic, $body);
} elseif (null !== $delay) {
foreach ((array) $body as $content) {
yield Command::dpub($topic, $content, $delay);
}
} else {
yield Command::pub($topic, $body);
}
})(),
);
}
}

View File

@@ -1,22 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Protocol;
use Nsq\Bytes;
/**
* @psalm-immutable
*/
final class Error extends Frame
{
public ErrorType $type;
public function __construct(public string $rawData)
{
parent::__construct(\strlen($this->rawData) + Bytes::BYTES_TYPE);
$this->type = new ErrorType(explode(' ', $this->rawData)[0]);
}
}

View File

@@ -1,16 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Protocol;
abstract class Frame
{
public function __construct(
/**
* @psalm-readonly
*/
public int $length,
) {
}
}

View File

@@ -1,101 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Protocol;
use Amp\Failure;
use Amp\Promise;
use Nsq\Bytes;
use Nsq\Consumer;
use Nsq\Exception\MessageAlreadyFinished;
final class Message extends Frame
{
/**
* @psalm-readonly
*/
public int $timestamp;
/**
* @psalm-readonly
*/
public int $attempts;
/**
* @psalm-readonly
*/
public string $id;
/**
* @psalm-readonly
*/
public string $body;
private bool $finished = false;
private Consumer $consumer;
public function __construct(int $timestamp, int $attempts, string $id, string $body, Consumer $consumer)
{
parent::__construct(
Bytes::BYTES_TYPE
+ Bytes::BYTES_TIMESTAMP
+ Bytes::BYTES_ATTEMPTS
+ Bytes::BYTES_ID
+ \strlen($body)
);
$this->timestamp = $timestamp;
$this->attempts = $attempts;
$this->id = $id;
$this->body = $body;
$this->consumer = $consumer;
}
public function isFinished(): bool
{
return $this->finished;
}
/**
* @return Promise<void>
*/
public function finish(): Promise
{
if ($this->finished) {
return new Failure(MessageAlreadyFinished::finish($this));
}
$this->finished = true;
return $this->consumer->fin($this->id);
}
/**
* @return Promise<void>
*/
public function requeue(int $timeout): Promise
{
if ($this->finished) {
return new Failure(MessageAlreadyFinished::requeue($this));
}
$this->finished = true;
return $this->consumer->req($this->id, $timeout);
}
/**
* @return Promise<void>
*/
public function touch(): Promise
{
if ($this->finished) {
return new Failure(MessageAlreadyFinished::touch($this));
}
return $this->consumer->touch($this->id);
}
}

View File

@@ -1,39 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Protocol;
use Nsq\Bytes;
/**
* @psalm-immutable
*/
final class Response extends Frame
{
public const OK = 'OK';
public const HEARTBEAT = '_heartbeat_';
public function __construct(public string $msg)
{
parent::__construct(\strlen($this->msg) + Bytes::BYTES_TYPE);
}
public function isOk(): bool
{
return self::OK === $this->msg;
}
public function isHeartBeat(): bool
{
return self::HEARTBEAT === $this->msg;
}
/**
* @return array<mixed, mixed>
*/
public function toArray(): array
{
return json_decode($this->msg, true, flags: JSON_THROW_ON_ERROR);
}
}

26
src/Stream.php Normal file
View File

@@ -0,0 +1,26 @@
<?php
declare(strict_types=1);
namespace Nsq;
use Amp\ByteStream\ClosedException;
use Amp\Promise;
interface Stream
{
/**
* @psalm-return Promise<null|string>
*/
public function read(): Promise;
/**
* @psalm-return Promise<void>
*/
public function write(string $data): Promise;
/**
* @throws ClosedException
*/
public function close(): void;
}

104
src/Stream/GzipStream.php Normal file
View File

@@ -0,0 +1,104 @@
<?php
declare(strict_types=1);
namespace Nsq\Stream;
use Amp\Promise;
use Nsq\Buffer;
use Nsq\Exception\StreamException;
use Nsq\Stream;
use function Amp\call;
class GzipStream implements Stream
{
private ?\InflateContext $inflate = null;
private ?\DeflateContext $deflate = null;
private Buffer $buffer;
public function __construct(private Stream $stream, private int $level, string $bytes = '')
{
/** @var false|\InflateContext $inflate */
$inflate = @inflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
/** @var \DeflateContext|false $deflate */
$deflate = @deflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
if (false === $inflate) {
throw new StreamException('Failed initializing inflate context');
}
if (false === $deflate) {
throw new StreamException('Failed initializing deflate context');
}
$this->inflate = $inflate;
$this->deflate = $deflate;
$this->buffer = new Buffer($bytes);
}
/**
* {@inheritdoc}
*/
public function read(): Promise
{
return call(function (): \Generator {
if (null === $this->inflate) {
return null;
}
if ($this->buffer->empty()) {
$chunk = yield $this->stream->read();
if (null !== $chunk) {
$this->buffer->append($chunk);
}
}
$data = $this->buffer->flush();
if ('' === $data) {
return null;
}
/** @psalm-suppress UndefinedFunction,InvalidArgument */
$decompressed = inflate_add($this->inflate, $data, ZLIB_SYNC_FLUSH);
if (false === $decompressed) {
throw new StreamException('Failed adding data to deflate context');
}
return $decompressed;
});
}
/**
* {@inheritdoc}
*/
public function write(string $data): Promise
{
if (null === $this->deflate) {
throw new StreamException('The stream has already been closed');
}
/** @psalm-suppress UndefinedFunction,InvalidArgument */
$compressed = deflate_add($this->deflate, $data, ZLIB_SYNC_FLUSH);
if (false === $compressed) {
throw new StreamException('Failed adding data to deflate context');
}
return $this->stream->write($compressed);
}
/**
* {@inheritDoc}
*/
public function close(): void
{
$this->stream->close();
$this->inflate = null;
$this->deflate = null;
}
}

View File

@@ -1,57 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Stream;
use Amp\ByteStream\InputStream;
use Amp\Promise;
use Nsq\Bytes;
use Nsq\Exception\NotConnected;
use PHPinnacle\Buffer\ByteBuffer;
use function Amp\call;
final class NsqInputStream implements InputStream
{
private ByteBuffer $buffer;
public function __construct(
private InputStream $inputStream,
) {
$this->buffer = new ByteBuffer();
}
/**
* {@inheritDoc}
*/
public function read(): Promise
{
return call(function (): \Generator {
$buffer = $this->buffer;
while ($buffer->size() < Bytes::BYTES_SIZE) {
$bytes = yield $this->inputStream->read();
if (null === $bytes) {
throw new NotConnected();
}
$buffer->append($bytes);
}
$size = $buffer->consumeUint32();
while ($buffer->size() < $size) {
$bytes = yield $this->inputStream->read();
if (null === $bytes) {
throw new NotConnected();
}
$buffer->append($bytes);
}
return $buffer->consume($size);
});
}
}

View File

@@ -4,39 +4,34 @@ declare(strict_types=1);
namespace Nsq\Stream;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\Failure;
use Amp\Promise;
use Nsq\Exception\NotConnected;
use Amp\Success;
use Nsq\Exception\NsqException;
use Nsq\Stream;
final class NullStream implements InputStream, OutputStream
final class NullStream implements Stream
{
/**
* {@inheritDoc}
* {@inheritdoc}
*/
public function read(): Promise
{
return new Failure(new NotConnected());
return new Success(null);
}
/**
* {@inheritDoc}
*
* @return Promise<void>
* {@inheritdoc}
*/
public function write(string $data): Promise
{
return new Failure(new NotConnected());
return new Failure(new NsqException('Connection closed.'));
}
/**
* {@inheritDoc}
*
* @return Promise<void>
* {@inheritdoc}
*/
public function end(string $finalData = ''): Promise
public function close(): void
{
return new Failure(new NotConnected());
}
}

View File

@@ -1,106 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Stream;
use Amp\ByteStream\InputStream;
use Amp\Promise;
use Nsq\Exception\NotConnected;
use PHPinnacle\Buffer\ByteBuffer;
use Psr\Log\LoggerInterface;
use function Amp\call;
final class SnappyInputStream implements InputStream
{
private ByteBuffer $buffer;
public function __construct(
private InputStream $inputStream,
private LoggerInterface $logger,
) {
if (!\function_exists('snappy_uncompress')) {
throw new \LogicException('Snappy extension not installed.');
}
$this->buffer = new ByteBuffer();
}
/**
* {@inheritDoc}
*/
public function read(): Promise
{
return call(function (): \Generator {
$buffer = $this->buffer;
while ($buffer->size() < 4) {
$bytes = yield $this->inputStream->read();
if (null === $bytes) {
throw new NotConnected();
}
$buffer->append($bytes);
}
/** @phpstan-ignore-next-line */
$chunkType = unpack('V', $buffer->consume(4))[1];
$size = $chunkType >> 8;
$chunkType &= 0xff;
$this->logger->debug('Snappy receive chunk [{chunk}], size [{size}]', [
'chunk' => $chunkType,
'size' => $size,
]);
while ($buffer->size() < $size) {
$bytes = yield $this->inputStream->read();
if (null === $bytes) {
throw new NotConnected();
}
$buffer->append($bytes);
}
switch ($chunkType) {
case 0xff:
$this->logger->debug('Snappy identifier chunk');
$buffer->discard(6); // discard identifier body
break;
case 0x00: // 'compressed',
$this->logger->debug('Snappy compressed chunk');
$data = $buffer
->discard(4) // discard checksum
->consume($size)
;
$this->logger->debug('Snappy compressed data [{data}]', ['data' => $data]);
return snappy_uncompress($data);
case 0x01: // 'uncompressed',
$this->logger->debug('Snappy uncompressed chunk');
$data = $buffer
->discard(4) // discard checksum
->consume($size)
;
$this->logger->debug('Snappy uncompressed data [{data}]', ['data' => $data]);
return $data;
case 0xfe:// 'padding',
$this->logger->debug('Snappy padding chunk');
$buffer->discard($size); // TODO ?
}
return $this->read();
});
}
}

View File

@@ -1,74 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq\Stream;
use Amp\ByteStream\OutputStream;
use Amp\Promise;
use PHPinnacle\Buffer\ByteBuffer;
final class SnappyOutputStream implements OutputStream
{
private ByteBuffer $buffer;
public function __construct(
private OutputStream $outputStream,
) {
if (!\function_exists('snappy_compress')) {
throw new \LogicException('Snappy extension not installed.');
}
$this->buffer = new ByteBuffer();
}
/**
* {@inheritDoc}
*
* @return Promise<void>
*/
public function write(string $data): Promise
{
$identifierFrame = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
$compressedFrame = 0x00;
$uncompressedFrame = 0x01; // 11
$maxChunkLength = 65536;
$buffer = $this->buffer;
foreach ($identifierFrame as $bite) {
$buffer->appendUint8($bite);
}
foreach (str_split($data, $maxChunkLength) as $chunk) {
$compressedChunk = snappy_compress($chunk);
[$chunk, $chunkType] = \strlen($compressedChunk) <= 0.875 * \strlen($data)
? [$compressedChunk, $compressedFrame]
: [$data, $uncompressedFrame];
/** @var string $checksum */
$checksum = hash('crc32c', $data, true);
/** @phpstan-ignore-next-line */
$checksum = unpack('N', $checksum)[1];
$maskedChecksum = (($checksum >> 15) | ($checksum << 17)) + 0xa282ead8 & 0xffffffff;
$size = (\strlen($chunk) + 4) << 8;
$buffer->append(pack('V', $chunkType + $size));
$buffer->append(pack('V', $maskedChecksum));
$buffer->append($chunk);
}
return $this->outputStream->write($buffer->flush());
}
/**
* {@inheritDoc}
*
* @return Promise<void>
*/
public function end(string $finalData = ''): Promise
{
return $this->outputStream->end($finalData);
}
}

122
src/Stream/SnappyStream.php Normal file
View File

@@ -0,0 +1,122 @@
<?php
declare(strict_types=1);
namespace Nsq\Stream;
use Amp\Promise;
use Nsq\Buffer;
use Nsq\Exception\SnappyException;
use Nsq\Stream;
use function Amp\call;
class SnappyStream implements Stream
{
private const IDENTIFIER = [0xFF, 0x06, 0x00, 0x00, 0x73, 0x4E, 0x61, 0x50, 0x70, 0x59];
private const SIZE_HEADER = 4;
private const SIZE_CHECKSUM = 4;
private const SIZE_CHUNK = 65536;
private const TYPE_IDENTIFIER = 0xFF;
private const TYPE_COMPRESSED = 0x00;
private const TYPE_UNCOMPRESSED = 0x01;
private const TYPE_PADDING = 0xFE;
private Buffer $buffer;
public function __construct(private Stream $stream, string $bytes = '')
{
if (!\function_exists('snappy_uncompress') || !\function_exists('snappy_compress')) {
throw SnappyException::notInstalled();
}
$this->buffer = new Buffer($bytes);
}
/**
* {@inheritdoc}
*/
public function read(): Promise
{
return call(function (): \Generator {
if ($this->buffer->size() < self::SIZE_HEADER && null !== ($chunk = yield $this->stream->read())) {
$this->buffer->append($chunk);
}
$type = $this->buffer->readUInt32LE();
$size = $type >> 8;
$type &= 0xFF;
while ($this->buffer->size() < $size && null !== ($chunk = yield $this->stream->read())) {
$this->buffer->append($chunk);
}
switch ($type) {
case self::TYPE_IDENTIFIER:
$this->buffer->discard($size);
return $this->read();
case self::TYPE_COMPRESSED:
$this->buffer->discard(self::SIZE_CHECKSUM);
/** @psalm-suppress UndefinedFunction */
return snappy_uncompress($this->buffer->consume($size - self::SIZE_HEADER));
case self::TYPE_UNCOMPRESSED:
$this->buffer->discard(self::SIZE_CHECKSUM);
return $this->buffer->consume($size - self::SIZE_HEADER);
case self::TYPE_PADDING:
return $this->read();
default:
throw SnappyException::invalidHeader();
}
});
}
/**
* {@inheritdoc}
*/
public function write(string $data): Promise
{
return call(function () use ($data): Promise {
/** @var string $result */
$result = pack('CCCCCCCCCC', ...self::IDENTIFIER);
foreach (str_split($data, self::SIZE_CHUNK) as $chunk) {
$result .= $this->compress($chunk);
}
return $this->stream->write($result);
});
}
public function close(): void
{
$this->stream->close();
}
private function compress(string $uncompressed): string
{
/** @psalm-suppress UndefinedFunction */
$compressed = snappy_compress($uncompressed);
\assert(\is_string($compressed));
[$type, $data] = \strlen($compressed) <= 0.875 * \strlen($uncompressed)
? [self::TYPE_COMPRESSED, $compressed]
: [self::TYPE_UNCOMPRESSED, $uncompressed];
/** @psalm-suppress PossiblyFalseArgument */
$unpacked = unpack('N', hash('crc32c', $uncompressed, true));
\assert(\is_array($unpacked));
$checksum = $unpacked[1];
$checksum = (($checksum >> 15) | ($checksum << 17)) + 0xA282EAD8 & 0xFFFFFFFF;
$size = (\strlen($data) + 4) << 8;
/** @psalm-suppress PossiblyFalseOperand */
return pack('VV', $type + $size, $checksum).$data;
}
}

View File

@@ -0,0 +1,82 @@
<?php
declare(strict_types=1);
namespace Nsq\Stream;
use Amp\Promise;
use Amp\Socket\ClientTlsContext;
use Amp\Socket\ConnectContext;
use Amp\Socket\EncryptableSocket;
use Nsq\Stream;
use function Amp\call;
use function Amp\Socket\connect;
class SocketStream implements Stream
{
public function __construct(private EncryptableSocket $socket)
{
}
/**
* @psalm-return Promise<self>
*/
public static function connect(string $uri, int $timeout = 0, int $attempts = 0, bool $noDelay = false): Promise
{
return call(function () use ($uri, $timeout, $attempts, $noDelay): \Generator {
$context = new ConnectContext();
if ($timeout > 0) {
$context = $context->withConnectTimeout($timeout);
}
if ($attempts > 0) {
$context = $context->withMaxAttempts($attempts);
}
if ($noDelay) {
$context = $context->withTcpNoDelay();
}
$context = $context->withTlsContext(
(new ClientTlsContext(''))
->withoutPeerVerification(),
);
return new self(yield connect($uri, $context));
});
}
/**
* @psalm-return Promise<null|string>
*/
public function read(): Promise
{
return $this->socket->read();
}
/**
* @psalm-return Promise<void>
*/
public function write(string $data): Promise
{
return $this->socket->write($data);
}
/**
* {@inheritDoc}
*/
public function close(): void
{
$this->socket->close();
}
/**
* @psalm-return Promise<void>
*/
public function setupTls(): Promise
{
return $this->socket->setupTls();
}
}

View File

@@ -7,6 +7,11 @@ use PHPUnit\Framework\TestCase;
final class ClientConfigTest extends TestCase
{
public function testNegotiationPayload(): void
{
self::assertJson((new ClientConfig())->asNegotiationPayload());
}
public function testInvalidCompression(): void
{
$this->expectException(InvalidArgumentException::class);
@@ -14,4 +19,42 @@ final class ClientConfigTest extends TestCase
new ClientConfig(deflate: true, snappy: true);
}
/**
* @dataProvider array
*/
public function testFromArray(array $data, array $expected): void
{
self::assertSame($expected, get_object_vars(ClientConfig::fromArray($data)));
}
public function array(): Generator
{
$default = get_object_vars(new ClientConfig());
yield 'Empty array' => [[], $default];
yield 'With wrong keys' => [['bla' => 'bla'], $default];
$custom = [
'authSecret' => 'SomeSecret',
'connectTimeout' => 100,
'maxAttempts' => 10,
'tcpNoDelay' => true,
'rdyCount' => 1,
'featureNegotiation' => true,
'clientId' => 'SomeGorgeousClientId',
'deflate' => true,
'deflateLevel' => 1,
'heartbeatInterval' => 31111,
'hostname' => gethostname(),
'msgTimeout' => 59999,
'sampleRate' => 25,
'tls' => true,
'snappy' => false,
'userAgent' => 'nsqphp/test',
];
yield 'Full filled' => [$custom, $custom];
}
}

28
tests/ErrorTypeTest.php Normal file
View File

@@ -0,0 +1,28 @@
<?php
declare(strict_types=1);
use Nsq\ErrorType;
use Nsq\Frame\Error;
use PHPUnit\Framework\TestCase;
final class ErrorTypeTest extends TestCase
{
/**
* @dataProvider data
*/
public function testConstructor(Error $frame, bool $isConnectionTerminated): void
{
self::assertSame($isConnectionTerminated, ErrorType::terminable($frame));
}
/**
* @return \Generator<int, array{0: Error, 1: bool}>
*/
public function data(): Generator
{
yield [new Error('E_BAD_BODY'), true];
yield [new Error('bla_bla'), true];
yield [new Error('E_REQ_FAILED'), false];
}
}

View File

@@ -2,12 +2,12 @@
declare(strict_types=1);
use Amp\Loop;
use Amp\Success;
use Nsq\Consumer;
use Nsq\Exception\MessageAlreadyFinished;
use Nsq\Protocol\Message;
use Nsq\Exception\MessageException;
use Nsq\Message;
use PHPUnit\Framework\TestCase;
use function Amp\Promise\wait;
final class MessageTest extends TestCase
{
@@ -16,16 +16,12 @@ final class MessageTest extends TestCase
*/
public function testDoubleFinish(Message $message): void
{
self::assertFalse($message->isFinished());
$this->expectException(MessageException::class);
wait($message->finish());
self::assertTrue($message->isFinished());
$this->expectException(MessageAlreadyFinished::class);
$this->expectExceptionMessage('Can\'t finish message as it already finished.');
wait($message->finish());
Loop::run(function () use ($message): Generator {
yield $message->finish();
yield $message->finish();
});
}
/**
@@ -33,16 +29,12 @@ final class MessageTest extends TestCase
*/
public function testDoubleRequeue(Message $message): void
{
self::assertFalse($message->isFinished());
$this->expectException(MessageException::class);
wait($message->requeue(1));
self::assertTrue($message->isFinished());
$this->expectException(MessageAlreadyFinished::class);
$this->expectExceptionMessage('Can\'t requeue message as it already finished.');
wait($message->requeue(5));
Loop::run(function () use ($message): Generator {
yield $message->requeue(1);
yield $message->requeue(5);
});
}
/**
@@ -50,14 +42,12 @@ final class MessageTest extends TestCase
*/
public function testTouchAfterFinish(Message $message): void
{
self::assertFalse($message->isFinished());
$this->expectException(MessageException::class);
wait($message->finish());
$this->expectException(MessageAlreadyFinished::class);
$this->expectExceptionMessage('Can\'t touch message as it already finished.');
wait($message->touch());
Loop::run(function () use ($message): Generator {
yield $message->finish();
yield $message->touch();
});
}
/**
@@ -65,11 +55,12 @@ final class MessageTest extends TestCase
*/
public function messages(): Generator
{
/** @phpstan-ignore-next-line */
$consumer = $this->createMock(Consumer::class);
$consumer->method('fin')->willReturn(new Success());
$consumer->method('touch')->willReturn(new Success());
$consumer->method('req')->willReturn(new Success());
yield [new Message(0, 0, 'id', 'body', $consumer)];
yield [new Message('id', 'body', 0, 0, $consumer)];
}
}

View File

@@ -24,7 +24,6 @@ final class NsqTest extends TestCase
new ClientConfig(
heartbeatInterval: 3000,
snappy: false,
readTimeout: 1,
),
];
@@ -32,7 +31,6 @@ final class NsqTest extends TestCase
new ClientConfig(
heartbeatInterval: 3000,
snappy: true,
readTimeout: 1,
),
];
}

View File

@@ -2,10 +2,10 @@
declare(strict_types=1);
use Nsq\Exception\NsqError;
use Amp\Loop;
use Nsq\Exception\ServerException;
use Nsq\Producer;
use PHPUnit\Framework\TestCase;
use function Amp\Promise\wait;
final class ProducerTest extends TestCase
{
@@ -14,18 +14,18 @@ final class ProducerTest extends TestCase
*/
public function testPubFail(string $topic, string $body, string $exceptionMessage): void
{
$this->expectException(NsqError::class);
$this->expectException(ServerException::class);
$this->expectExceptionMessage($exceptionMessage);
$producer = new Producer('tcp://localhost:4150');
$producer = Producer::create('tcp://localhost:4150');
wait($producer->connect());
wait($producer->pub($topic, $body));
Loop::run(static function () use ($producer, $topic, $body): Generator {
yield $producer->connect();
yield $producer->publish($topic, $body);
});
}
/**
* @return Generator<string, array>
*/
public function pubFails(): Generator
{
yield 'Empty body' => ['test', '', 'E_BAD_MESSAGE PUB invalid message body size 0'];

View File

@@ -1,30 +0,0 @@
<?php
declare(strict_types=1);
namespace Protocol;
use Nsq\Protocol\ErrorType;
use PHPUnit\Framework\TestCase;
final class ErrorTypeTest extends TestCase
{
/**
* @dataProvider data
*/
public function testConstructor(string $type, bool $isConnectionTerminated): void
{
$errorType = new ErrorType($type);
self::assertSame($isConnectionTerminated, $errorType->terminateConnection);
}
/**
* @return \Generator<string, array<int, bool|string>>
*/
public function data(): \Generator
{
foreach ((new \ReflectionClass(ErrorType::class))->getConstants() as $constant => $isTerminated) {
yield $constant => [$constant, $isTerminated];
}
}
}