diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 6d77160..1cd3829 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -15,15 +15,12 @@ jobs: - ubuntu-latest php: - '8.1' + nsq: + - nsq-1.2.0.linux-amd64.go1.12.9 + - nsq-1.2.1.linux-amd64.go1.16.6 dependencies: - lowest - highest - services: - nsqd: - image: nsqio/nsq:v1.2.0 - options: --entrypoint /nsqd - ports: - - 4150:4150 steps: - name: Checkout uses: actions/checkout@v2 @@ -37,6 +34,12 @@ jobs: env: update: true + - name: Download NSQ + run: | + curl -sSL "http://bitly-downloads.s3.amazonaws.com/nsq/${{ matrix.nsq }}.tar.gz" \ + | tar -xzv --strip-components=1 + ./bin/nsqd --version + - name: Setup Problem Matchers for PHPUnit run: echo "::add-matcher::${{ runner.tool_cache }}/phpunit.json" diff --git a/.gitignore b/.gitignore index 378fe3f..32311c5 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,14 @@ /.php-cs-fixer.cache /.phpunit.result.cache /infection.log + +# Nsq +bin/nsq_stat +bin/nsq_tail +bin/nsq_to_file +bin/nsq_to_http +bin/nsq_to_nsq +bin/nsqadmin +bin/nsqd +bin/nsqlookupd +bin/to_nsq diff --git a/composer.json b/composer.json index d66a049..0af8557 100644 --- a/composer.json +++ b/composer.json @@ -29,6 +29,8 @@ "phpstan/phpstan-phpunit": "^1.1", "phpstan/phpstan-strict-rules": "^1.3", "phpunit/phpunit": "^9.5", + "symfony/filesystem": "^6.1", + "symfony/process": "^6.1", "symfony/var-dumper": "^6.1", "vimeo/psalm": "^4.4" }, @@ -45,6 +47,9 @@ } }, "autoload-dev": { + "classmap": [ + "tests/" + ], "files": [ "vendor/symfony/var-dumper/Resources/functions/dump.php" ] diff --git a/tests/Nsqd.php b/tests/Nsqd.php new file mode 100644 index 0000000..c116ad5 --- /dev/null +++ b/tests/Nsqd.php @@ -0,0 +1,95 @@ +mkdir($this->dataPath); + + $nsqd = new Process([ + './nsqd', + sprintf('-data-path=%s', $this->dataPath), + sprintf('-http-address=0.0.0.0:%s', $this->httpPort), + sprintf('-tcp-address=0.0.0.0:%s', $this->tcpPort), + '-log-level=debug', + ], dirname(__DIR__).'/bin'); + + $nsqd->start(); + + while (false === @fsockopen('localhost', $this->tcpPort)) { + if (!$nsqd->isRunning()) { + throw new RuntimeException($nsqd->getErrorOutput()); + } + + usleep(10000); + } + + $this->process = $nsqd; + $this->address = sprintf('tcp://localhost:%s', $this->tcpPort); + } + + public static function create(): self + { + do { + $dir = sprintf('/tmp/%s', bin2hex(random_bytes(5))); + } while (is_dir($dir)); + + return new self( + $dir, + findFreePort(), + findFreePort(), + ); + } + + public function tail(string $topic, string $channel, int $messages): Process + { + $tail = new Process( + [ + './nsq_tail', + sprintf('-nsqd-tcp-address=localhost:%s', $this->tcpPort), + sprintf('-topic=%s', $topic), + sprintf('-channel=%s', $channel), + sprintf('-n=%s', $messages), + '-print-topic', + ], + dirname(__DIR__).'/bin', + timeout: 10, + ); + + $tail->start(); + + return $tail; + } + + public function __destruct() + { + $this->process->stop(); + self::$fs->remove($this->dataPath); + } +} + +function findFreePort(): int +{ + $sock = socket_create_listen(0); + assert($sock instanceof \Socket); + + socket_getsockname($sock, $addr, $port); + socket_close($sock); + + return $port; +} diff --git a/tests/ProducerTest.php b/tests/ProducerTest.php index 0c1fc3f..20dc7bf 100644 --- a/tests/ProducerTest.php +++ b/tests/ProducerTest.php @@ -7,17 +7,75 @@ use Nsq\Exception\ServerException; use Nsq\Producer; use PHPUnit\Framework\TestCase; +use function Amp\Promise\wait; + final class ProducerTest extends TestCase { + /** + * @dataProvider bodies + */ + public function testPublish(string $body): void + { + $nsqd = Nsqd::create(); + $tail = $nsqd->tail('test', 'test', 1); + + $producer = Producer::create($nsqd->address); + + wait($producer->connect()); + self::assertTrue(wait($producer->publish('test', $body))); + + self::assertSame(0, $tail->wait()); + + self::assertSame("test | {$body}", trim($tail->getOutput())); + } + + /** + * @dataProvider bodies + */ + public function testPublishMultiple(string $body): void + { + $nsqd = Nsqd::create(); + $tail = $nsqd->tail('test', 'test', 2); + + $producer = Producer::create($nsqd->address); + + wait($producer->connect()); + self::assertTrue(wait($producer->publish('test', [$body, $body]))); + + self::assertSame(0, $tail->wait()); + + self::assertSame("test | {$body}\ntest | {$body}", trim($tail->getOutput())); + } + + /** + * @dataProvider bodies + */ + public function testPublishDeferred(string $body): void + { + $nsqd = Nsqd::create(); + $tail = $nsqd->tail('test', 'test', 1); + + $producer = Producer::create($nsqd->address); + + wait($producer->connect()); + self::assertTrue(wait($producer->publish('test', $body, 1))); + + self::assertSame(0, $tail->wait()); + + self::assertSame("test | {$body}", trim($tail->getOutput())); + } + /** * @dataProvider pubFails */ public function testPubFail(string $topic, string $body, string $exceptionMessage): void { + $nsqd = Nsqd::create(); + $this->expectException(ServerException::class); $this->expectExceptionMessage($exceptionMessage); - $producer = Producer::create('tcp://localhost:4150'); + $producer = Producer::create($nsqd->address); Loop::run(static function () use ($producer, $topic, $body): Generator { yield $producer->connect(); @@ -31,4 +89,10 @@ final class ProducerTest extends TestCase yield 'Empty body' => ['test', '', 'E_BAD_MESSAGE PUB invalid message body size 0']; yield 'Invalid topic' => ['test$%^&', '', 'E_BAD_TOPIC PUB topic name "test$%^&" is not valid']; } + + public function bodies(): Generator + { + yield 'Simple Body' => ['Simple Body']; + yield 'Body with special chars' => ['test$%^&']; + } }