commit 7c6284efcbf4d2b0cf7e55b745205a5638228508 Author: Konstantin Grachev Date: Mon Jan 18 16:09:51 2021 +0300 Initial commit diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..6d1f7e8 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,122 @@ +name: CI + +on: + - pull_request + - push + +jobs: + tests: + name: Tests + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: + - ubuntu-latest + php: + - '7.4' + - '8.0' + dependencies: + - lowest + - highest + + services: + nsqd: + image: nsqio/nsq:v1.2.0 + options: --entrypoint /nsqd + ports: + - 4150:4150 + + steps: + - name: Checkout + uses: actions/checkout@v2 + + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php }} + coverage: pcov + + - name: Setup Problem Matchers for PHPUnit + run: echo "::add-matcher::${{ runner.tool_cache }}/phpunit.json" + + - name: Determine Composer cache directory + id: composer-cache + run: echo "::set-output name=directory::$(composer config cache-dir)" + + - name: Cache Composer dependencies + uses: actions/cache@v2 + with: + path: ${{ steps.composer-cache.outputs.directory }} + key: ${{ runner.os }}-${{ matrix.php }}-composer-${{ matrix.dependencies }}-${{ hashFiles('**/composer.lock') }} + restore-keys: ${{ runner.os }}-${{ matrix.php }}-${{ matrix.dependencies }}-composer- + + - name: Install highest dependencies + run: composer update --no-progress --no-interaction --prefer-dist + if: ${{ matrix.dependencies == 'highest' }} + + - name: Install lowest dependencies + run: composer update --no-progress --no-interaction --prefer-dist --prefer-lowest + if: ${{ matrix.dependencies == 'lowest' }} + + - name: Run tests + run: vendor/bin/phpunit --coverage-clover=build/coverage-report.xml + + - name: Upload code coverage + uses: codecov/codecov-action@v1 + with: + file: build/coverage-report.xml + + php-cs-fixer: + name: PHP-CS-Fixer + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v2 + + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: '7.4' + + - name: Install dependencies + run: composer update --no-progress --no-interaction --prefer-dist + + - name: Run script + run: composer phpcs + + phpstan: + name: PHPStan + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v2 + + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: '7.4' + + - name: Install dependencies + run: composer update --no-progress --no-interaction --prefer-dist + + - name: Run script + run: composer phpstan + + psalm: + name: Psalm + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v2 + + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: '7.4' + + - name: Install dependencies + run: composer update --no-progress --no-interaction --prefer-dist + + - name: Run script + run: composer psalm diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4fbb073 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/vendor/ +/composer.lock diff --git a/.php_cs.dist b/.php_cs.dist new file mode 100644 index 0000000..53d9a56 --- /dev/null +++ b/.php_cs.dist @@ -0,0 +1,19 @@ +#!/usr/bin/env php +setRiskyAllowed(true) + ->setRules([ + '@PhpCsFixer' => true, + '@PhpCsFixer:risky' => true, + '@PSR12' => true, + '@PSR12:risky' => true, + 'declare_strict_types' => true, + 'php_unit_internal_class' => false, + 'php_unit_test_class_requires_covers' => false, + 'yoda_style' => true, + ]) + ->setFinder( + PhpCsFixer\Finder::create() + ->in(__DIR__) + ); diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..0a55468 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 nsqphp + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..3d9efcb --- /dev/null +++ b/README.md @@ -0,0 +1,42 @@ +# Nsq PHP + + + +A NSQ Client library for PHP. + +[![Latest Stable Version](https://poser.pugx.org/nsq/nsq/v)](//packagist.org/packages/nsq/nsq) [![Total Downloads](https://poser.pugx.org/nsq/nsq/downloads)](//packagist.org/packages/nsq/nsq) [![Latest Unstable Version](https://poser.pugx.org/nsq/nsq/v/unstable)](//packagist.org/packages/nsq/nsq) [![License](https://poser.pugx.org/nsq/nsq/license)](//packagist.org/packages/nsq/nsq) +[![codecov](https://codecov.io/gh/nsqphp/nsqphp/branch/main/graph/badge.svg?token=AYUMC3OO2B)](https://codecov.io/gh/nsqphp/nsqphp) + +Installation +------------ + +This library is installable via [Composer](https://getcomposer.org/): + +```bash +composer require nsq/nsq +``` + +Requirements +------------ + +This library requires PHP 7.4 or later. + +Although not required, it is recommended that you install the [phpinnacle/ext-buffer](https://github.com/phpinnacle/ext-buffer) to speed up [phpinnacle/buffer](https://github.com/phpinnacle/buffer) . + +Features +-------- + +- [x] SUB +- [x] PUB +- [ ] Feature Negotiation +- [ ] Discovery +- [ ] Backoff +- [ ] TLS +- [ ] Snappy +- [ ] Sampling +- [ ] AUTH + +License: +-------- + +The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information. Maintained by [Spiral Scout](https://spiralscout.com). diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..e84810f --- /dev/null +++ b/composer.json @@ -0,0 +1,52 @@ +{ + "name": "nsq/nsq", + "type": "library", + "description": "NSQ Client for PHP", + "homepage": "https://github.com/nsqphp/nsqphp", + "license": "MIT", + "authors": [ + { + "name": "Konstantin Grachev", + "email": "me@grachevko.ru" + } + ], + "require": { + "php": ">=7.4", + "ext-json": "*", + "clue/socket-raw": "^1.5", + "phpinnacle/buffer": "^1.2" + }, + "require-dev": { + "ergebnis/composer-normalize": "9999999-dev", + "friendsofphp/php-cs-fixer": "^2.18", + "phpstan/phpstan": "^0.12.68", + "phpstan/phpstan-phpunit": "^0.12.17", + "phpstan/phpstan-strict-rules": "^0.12.9", + "phpunit/phpunit": "^9.5", + "vimeo/psalm": "^4.4" + }, + "config": { + "sort-packages": true + }, + "autoload": { + "psr-4": { + "Nsq\\": "src/" + } + }, + "minimum-stability": "dev", + "prefer-stable": true, + "scripts": { + "phpcs": [ + "vendor/bin/php-cs-fixer fix --verbose --diff --dry-run" + ], + "phpstan": [ + "vendor/bin/phpstan analyse" + ], + "psalm": [ + "vendor/bin/psalm" + ], + "tests": [ + "vendor/bin/phpunit --verbose" + ] + } +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..8bacd1c --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,14 @@ +version: '3.7' + +services: + nsqd: + image: nsqio/nsq:v1.2.0 + command: /nsqd + ports: + - 4150:4150 + + nsqadmin: + image: nsqio/nsq:v1.2.0 + command: /nsqadmin --nsqd-http-address=nsqd:4151 --http-address=0.0.0.0:4171 + ports: + - 4171:4171 diff --git a/logo.png b/logo.png new file mode 100644 index 0000000..59e0020 Binary files /dev/null and b/logo.png differ diff --git a/phpstan.neon b/phpstan.neon new file mode 100644 index 0000000..43e22d0 --- /dev/null +++ b/phpstan.neon @@ -0,0 +1,10 @@ +includes: + - vendor/phpstan/phpstan-phpunit/extension.neon + - vendor/phpstan/phpstan-strict-rules/rules.neon + - vendor/phpstan/phpstan/conf/bleedingEdge.neon + +parameters: + level: 7 + paths: + - src + - tests diff --git a/phpunit.xml.dist b/phpunit.xml.dist new file mode 100644 index 0000000..4492973 --- /dev/null +++ b/phpunit.xml.dist @@ -0,0 +1,20 @@ + + + + + ./src/ + + + + + + + + tests + + + diff --git a/psalm.xml b/psalm.xml new file mode 100644 index 0000000..54abdfa --- /dev/null +++ b/psalm.xml @@ -0,0 +1,16 @@ + + + + + + + + + diff --git a/src/Config.php b/src/Config.php new file mode 100644 index 0000000..d15130c --- /dev/null +++ b/src/Config.php @@ -0,0 +1,19 @@ +address = $address; + } +} diff --git a/src/Connection.php b/src/Connection.php new file mode 100644 index 0000000..a2f48e8 --- /dev/null +++ b/src/Connection.php @@ -0,0 +1,136 @@ +socket = $socket; + } + + /** + * @psalm-suppress UnsafeInstantiation + * + * @return static + */ + public static function connect(Config $config): self + { + $socket = (new Factory())->createClient($config->address); + $socket->write(self::MAGIC_V2); + + // @phpstan-ignore-next-line + return new self($socket); + } + + /** + * @psalm-param array $arr + * + * @psalm-suppress PossiblyFalseOperand + */ + public function identify(array $arr): string + { + $body = json_encode($arr, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT); + $size = pack('N', \strlen($body)); + + return 'IDENTIFY '.PHP_EOL.$size.$body; + } + + /** + * @psalm-suppress PossiblyFalseOperand + */ + public function auth(string $secret): string + { + $size = pack('N', \strlen($secret)); + + return 'AUTH'.PHP_EOL.$size.$secret; + } + + public function write(string $buffer): void + { + if ($this->closed) { + throw new LogicException('This connection is closed, create new one.'); + } + + try { + $this->socket->write($buffer); + } catch (Throwable $e) { + $this->closed = true; + + throw $e; + } + } + + public function read(): ?Message + { + $socket = $this->socket; + + $buffer = new ByteBuffer($socket->read(self::BYTES_SIZE + self::BYTES_TYPE)); + $size = $buffer->consumeUint32(); + $type = $buffer->consumeUint32(); + + $buffer->append($socket->read($size - self::BYTES_TYPE)); + + if (self::TYPE_RESPONSE === $type) { + $response = $buffer->consume($size - self::BYTES_TYPE); + + if (self::OK === $response || self::CLOSE_WAIT === $response) { + return null; + } + + if (self::HEARTBEAT === $response) { + $socket->write('NOP'.PHP_EOL); + + return null; + } + + throw new LogicException(sprintf('Unexpected response from nsq: "%s"', $response)); + } + + if (self::TYPE_ERROR === $type) { + throw new LogicException(sprintf('NSQ return error: "%s"', $socket->read($size))); + } + + if (self::TYPE_MESSAGE !== $type) { + throw new LogicException(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $type)); + } + + $timestamp = $buffer->consumeInt64(); + $attempts = $buffer->consumeUint16(); + $id = $buffer->consume(self::BYTES_ID); + $body = $buffer->consume($size - self::BYTES_TYPE - self::BYTES_TIMESTAMP - self::BYTES_ATTEMPTS - self::BYTES_ID); + + return new Message($timestamp, $attempts, $id, $body); + } +} diff --git a/src/Envelope.php b/src/Envelope.php new file mode 100644 index 0000000..85cbc90 --- /dev/null +++ b/src/Envelope.php @@ -0,0 +1,51 @@ +message = $message; + $this->acknowledge = $ack; + $this->requeue = $req; + $this->touching = $touch; + } + + public function ack(): void + { + \call_user_func($this->acknowledge); + } + + public function retry(int $timeout): void + { + \call_user_func($this->requeue, $timeout); + } + + public function touch(): void + { + \call_user_func($this->touching); + } +} diff --git a/src/Message.php b/src/Message.php new file mode 100644 index 0000000..7b6fe1a --- /dev/null +++ b/src/Message.php @@ -0,0 +1,27 @@ +timestamp = $timestamp; + $this->attempts = $attempts; + $this->id = $id; + $this->body = $body; + } +} diff --git a/src/Reader.php b/src/Reader.php new file mode 100644 index 0000000..fcbe16f --- /dev/null +++ b/src/Reader.php @@ -0,0 +1,99 @@ +connection = $connection; + } + + public function __destruct() + { + $this->close(); + } + + /** + * Subscribe to a topic/channel. + */ + public function sub(string $topic, string $channel): void + { + $buffer = sprintf('SUB %s %s', $topic, $channel).PHP_EOL; + + $this->connection->write($buffer); + $this->connection->read(); + } + + /** + * Update RDY state (indicate you are ready to receive N messages). + */ + public function rdy(int $count): void + { + $this->connection->write('RDY '.$count.PHP_EOL); + } + + /** + * Finish a message (indicate successful processing). + */ + public function fin(string $id): void + { + $this->connection->write('FIN '.$id.PHP_EOL); + } + + /** + * Re-queue a message (indicate failure to process) + * The re-queued message is placed at the tail of the queue, equivalent to having just published it, + * but for various implementation specific reasons that behavior should not be explicitly relied upon and may change in the future. + * Similarly, a message that is in-flight and times out behaves identically to an explicit REQ. + */ + public function req(string $id, int $timeout): void + { + $this->connection->write(sprintf('REQ %s %s', $id, $timeout).PHP_EOL); + } + + /** + * Reset the timeout for an in-flight message. + */ + public function touch(string $id): void + { + $this->connection->write('TOUCH '.$id.PHP_EOL); + } + + public function consume(?float $timeout = null): ?Message + { + if (false === $this->connection->socket->selectRead($timeout)) { + return null; + } + + return $this->connection->read() ?? $this->consume(0); + } + + /** + * Cleanly close your connection (no more messages are sent). + */ + public function close(): void + { + if ($this->connection->closed) { + return; + } + + $this->connection->closed = true; + + $this->connection->socket->write('CLS'.PHP_EOL); + $this->connection->read(); + + try { + $this->connection->socket->close(); + } catch (Throwable $e) { + } + } +} diff --git a/src/Subscriber.php b/src/Subscriber.php new file mode 100644 index 0000000..1aeef57 --- /dev/null +++ b/src/Subscriber.php @@ -0,0 +1,74 @@ +reader = $reader; + } + + /** + * @psalm-return Generator + */ + public function subscribe(string $topic, string $channel, ?float $timeout = 0): Generator + { + $reader = $this->reader; + $reader->sub($topic, $channel); + $reader->rdy(1); + + while (true) { + $message = $reader->consume($timeout); + + if (null === $message) { + if (true === yield null) { + break; + } + + continue; + } + + $finished = false; + $envelop = new Envelope( + $message, + static function () use ($reader, $message, &$finished): void { + if ($finished) { + throw new LogicException('Can\'t ack, message already finished.'); + } + + $finished = true; + + $reader->fin($message->id); + }, + static function (int $timeout) use ($reader, $message, &$finished): void { + if ($finished) { + throw new LogicException('Can\'t retry, message already finished.'); + } + + $finished = true; + + $reader->req($message->id, $timeout); + }, + static function () use ($reader, $message): void { + $reader->touch($message->id); + }, + ); + + if (true === yield $envelop) { + break; + } + + $reader->rdy(1); + } + + $reader->close(); + } +} diff --git a/src/Writer.php b/src/Writer.php new file mode 100644 index 0000000..2c9a182 --- /dev/null +++ b/src/Writer.php @@ -0,0 +1,68 @@ +connection = $connection; + } + + /** + * @psalm-suppress PossiblyFalseOperand + */ + public function pub(string $topic, string $body): void + { + $size = pack('N', \strlen($body)); + + $buffer = 'PUB '.$topic.PHP_EOL.$size.$body; + + $this->connection->write($buffer); + $this->connection->read(); + } + + /** + * @psalm-param array $bodies + * + * @psalm-suppress PossiblyFalseOperand + */ + public function mpub(string $topic, array $bodies): void + { + $num = pack('N', \count($bodies)); + + $mb = implode('', array_map(static function ($body): string { + return pack('N', \strlen($body)).$body; + }, $bodies)); + + $size = pack('N', \strlen($num.$mb)); + + $buffer = 'MPUB '.$topic.PHP_EOL.$size.$num.$mb; + + $this->connection->write($buffer); + $this->connection->read(); + } + + /** + * @psalm-suppress PossiblyFalseOperand + */ + public function dpub(string $topic, int $deferTime, string $body): void + { + $size = pack('N', \strlen($body)); + + $buffer = sprintf('DPUB %s %s', $topic, $deferTime).PHP_EOL.$size.$body; + + $this->connection->write($buffer); + $this->connection->read(); + } +} diff --git a/tests/NsqTest.php b/tests/NsqTest.php new file mode 100644 index 0000000..7bde73b --- /dev/null +++ b/tests/NsqTest.php @@ -0,0 +1,35 @@ +pub(__FUNCTION__, __FUNCTION__); + + $subscriber = new Subscriber(new Reader(Connection::connect($config))); + $generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__, 1); + + $envelope = $generator->current(); + + static::assertInstanceOf(Envelope::class, $envelope); + /** @var Envelope $envelope */ + static::assertSame(__FUNCTION__, $envelope->message->body); + $envelope->ack(); + + $generator->next(); + static::assertNull($generator->current()); + } +}