From 2f638b9c75754436c21a4777b6cfadcfd7c28434 Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Wed, 10 Mar 2021 21:06:55 +0300 Subject: [PATCH] Remove Reader and ConsumerInterface --- examples/reader.php | 39 ------- src/Consumer.php | 2 +- src/ConsumerInterface.php | 47 -------- src/Message.php | 4 +- src/Reader.php | 222 -------------------------------------- tests/MessageTest.php | 4 +- 6 files changed, 5 insertions(+), 313 deletions(-) delete mode 100644 examples/reader.php delete mode 100644 src/ConsumerInterface.php delete mode 100644 src/Reader.php diff --git a/examples/reader.php b/examples/reader.php deleted file mode 100644 index 68dddee..0000000 --- a/examples/reader.php +++ /dev/null @@ -1,39 +0,0 @@ -setFormatter(new ConsoleFormatter()); -$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]); - -$reader = new Reader( - 'tcp://localhost:4150', - topic: 'local', - channel: 'local', - clientConfig: new ClientConfig( - deflate: false, - snappy: false, - ), - logger: $logger, -); - -wait($reader->connect()); - -while (true) { - $message = wait($reader->consume()); - - $logger->info('Received: {body}', ['body' => $message->body]); - - wait($message->finish()); -} diff --git a/src/Consumer.php b/src/Consumer.php index 00c40cd..fa680d6 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -15,7 +15,7 @@ use Psr\Log\NullLogger; use function Amp\asyncCall; use function Amp\call; -final class Consumer extends Connection implements ConsumerInterface +final class Consumer extends Connection { private int $rdy = 0; diff --git a/src/ConsumerInterface.php b/src/ConsumerInterface.php deleted file mode 100644 index 672adcf..0000000 --- a/src/ConsumerInterface.php +++ /dev/null @@ -1,47 +0,0 @@ - - */ - public function rdy(int $count): Promise; - - /** - * Finish a message (indicate successful processing). - * - * @return Promise - * - * @internal - */ - public function fin(string $id): Promise; - - /** - * Re-queue a message (indicate failure to process) The re-queued message is placed at the tail of the queue, - * equivalent to having just published it, but for various implementation specific reasons that behavior should not - * be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out - * behaves identically to an explicit REQ. - * - * @return Promise - * - * @internal - */ - public function req(string $id, int $timeout): Promise; - - /** - * Reset the timeout for an in-flight message. - * - * @return Promise - * - * @internal - */ - public function touch(string $id): Promise; -} diff --git a/src/Message.php b/src/Message.php index 8ec1104..3141f1b 100644 --- a/src/Message.php +++ b/src/Message.php @@ -17,11 +17,11 @@ final class Message public string $body, public int $timestamp, public int $attempts, - private ConsumerInterface $consumer, + private Consumer $consumer, ) { } - public static function compose(Frame\Message $message, ConsumerInterface $consumer): self + public static function compose(Frame\Message $message, Consumer $consumer): self { return new self( $message->id, diff --git a/src/Reader.php b/src/Reader.php deleted file mode 100644 index a823682..0000000 --- a/src/Reader.php +++ /dev/null @@ -1,222 +0,0 @@ -> - */ - private array $deferreds = []; - - /** - * @var array - */ - private array $messages = []; - - public function __construct( - private string $address, - private string $topic, - private string $channel, - ClientConfig $clientConfig, - private LoggerInterface $logger, - ) { - parent::__construct( - $this->address, - $clientConfig, - $this->logger, - ); - } - - public static function create( - string $address, - string $topic, - string $channel, - ?ClientConfig $clientConfig = null, - ?LoggerInterface $logger = null, - ): self { - return new self( - $address, - $topic, - $channel, - $clientConfig ?? new ClientConfig(), - $logger ?? new NullLogger(), - ); - } - - /** - * {@inheritdoc} - */ - public function connect(): Promise - { - if (!$this->stream instanceof NullStream) { - return call(static function (): void { - }); - } - - return call(function (): \Generator { - yield parent::connect(); - - $this->run(); - }); - } - - private function run(): void - { - $buffer = new Buffer(); - - asyncCall(function () use ($buffer): \Generator { - yield $this->stream->write(Command::sub($this->topic, $this->channel)); - - if (null !== ($chunk = yield $this->stream->read())) { - $buffer->append($chunk); - } - - /** @var Response $response */ - $response = Parser::parse($buffer); - - if (!$response->isOk()) { - throw new ConsumerException('Fail subscription.'); - } - - yield $this->rdy(1); - - asyncCall( - function () use ($buffer): \Generator { - while (null !== $chunk = yield $this->stream->read()) { - $buffer->append($chunk); - - while ($frame = Parser::parse($buffer)) { - switch (true) { - case $frame instanceof Frame\Response: - if ($frame->isHeartBeat()) { - yield $this->stream->write(Command::nop()); - - break; - } - - throw ConsumerException::response($frame); - case $frame instanceof Frame\Error: - $this->handleError($frame); - - $deferred = array_pop($this->deferreds); - - if (null !== $deferred) { - $deferred->fail($frame->toException()); - } - - break; - case $frame instanceof Frame\Message: - $message = Message::compose($frame, $this); - - $deferred = array_pop($this->deferreds); - - if (null === $deferred) { - $this->messages[] = $message; - } else { - $deferred->resolve($message); - } - - break; - } - } - } - - $this->stream = new NullStream(); - } - ); - }); - } - - /** - * @return Promise - */ - public function consume(): Promise - { - $message = array_pop($this->messages); - - if (null !== $message) { - return new Success($message); - } - - $this->deferreds[] = $deferred = new Deferred(); - - return $deferred->promise(); - } - - /** - * Update RDY state (indicate you are ready to receive N messages). - * - * @return Promise - */ - public function rdy(int $count): Promise - { - if ($this->rdy === $count) { - return call(static function (): void { - }); - } - - $this->rdy = $count; - - return $this->stream->write(Command::rdy($count)); - } - - /** - * Finish a message (indicate successful processing). - * - * @return Promise - * - * @internal - */ - public function fin(string $id): Promise - { - --$this->rdy; - - return $this->stream->write(Command::fin($id)); - } - - /** - * Re-queue a message (indicate failure to process) The re-queued message is placed at the tail of the queue, - * equivalent to having just published it, but for various implementation specific reasons that behavior should not - * be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out - * behaves identically to an explicit REQ. - * - * @return Promise - * - * @internal - */ - public function req(string $id, int $timeout): Promise - { - --$this->rdy; - - return $this->stream->write(Command::req($id, $timeout)); - } - - /** - * Reset the timeout for an in-flight message. - * - * @return Promise - * - * @internal - */ - public function touch(string $id): Promise - { - return $this->stream->write(Command::touch($id)); - } -} diff --git a/tests/MessageTest.php b/tests/MessageTest.php index 8f58097..17efef7 100644 --- a/tests/MessageTest.php +++ b/tests/MessageTest.php @@ -4,7 +4,7 @@ declare(strict_types=1); use Amp\Loop; use Amp\Success; -use Nsq\ConsumerInterface; +use Nsq\Consumer; use Nsq\Exception\MessageException; use Nsq\Message; use PHPUnit\Framework\TestCase; @@ -55,7 +55,7 @@ final class MessageTest extends TestCase */ public function messages(): Generator { - $consumer = $this->createMock(ConsumerInterface::class); + $consumer = $this->createMock(Consumer::class); $consumer->method('fin')->willReturn(new Success()); $consumer->method('touch')->willReturn(new Success()); $consumer->method('req')->willReturn(new Success());