From 6eeb2939c8c1dc55af02ed8743e459acd9bb46c8 Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Sat, 23 Jan 2021 01:35:09 +0300 Subject: [PATCH] Rename Reader to Consumer and Writer to Producer --- README.md | 14 ++++++++------ src/{Reader.php => Consumer.php} | 2 +- src/Message.php | 18 +++++++++--------- src/{Writer.php => Producer.php} | 2 +- src/Response.php | 2 +- src/Subscriber.php | 4 ++-- tests/NsqTest.php | 10 +++++----- 7 files changed, 27 insertions(+), 25 deletions(-) rename src/{Reader.php => Consumer.php} (97%) rename src/{Writer.php => Producer.php} (97%) diff --git a/README.md b/README.md index 24fbc29..dfbf69c 100644 --- a/README.md +++ b/README.md @@ -44,30 +44,32 @@ Usage ### Publish ```php -use Nsq\Writer; +use Nsq\Producer; -$writer = new Writer(address: 'tcp://nsqd:4150'); +$producer = new Producer(address: 'tcp://nsqd:4150'); // Publish a message to a topic -$writer->pub('topic', 'Simple message'); +$producer->pub('topic', 'Simple message'); // Publish multiple messages to a topic (atomically) -$writer->mpub('topic', [ +$producer->mpub('topic', [ 'Message one', 'Message two', ]); // Publish a deferred message to a topic -$writer->dpub('topic', 5000, 'Deferred message'); +$producer->dpub('topic', 5000, 'Deferred message'); ``` ### Subscription ```php +use Nsq\Consumer; use Nsq\Message; use Nsq\Subscriber; -$subscriber = new Subscriber(address: 'tcp://nsqd:4150'); +$consumer = new Consumer('tcp://nsqd:4150'); +$subscriber = new Subscriber($consumer); $generator = $subscriber->subscribe('topic', 'channel', timeout: 5); foreach ($generator as $message) { diff --git a/src/Reader.php b/src/Consumer.php similarity index 97% rename from src/Reader.php rename to src/Consumer.php index d884e68..6c207e2 100644 --- a/src/Reader.php +++ b/src/Consumer.php @@ -4,7 +4,7 @@ declare(strict_types=1); namespace Nsq; -class Reader extends Connection +class Consumer extends Connection { /** * Subscribe to a topic/channel. diff --git a/src/Message.php b/src/Message.php index 2043da3..c7513c0 100644 --- a/src/Message.php +++ b/src/Message.php @@ -28,20 +28,20 @@ final class Message */ public string $body; - public function __construct(int $timestamp, int $attempts, string $id, string $body, Reader $reader) + private bool $finished = false; + + private Consumer $consumer; + + public function __construct(int $timestamp, int $attempts, string $id, string $body, Consumer $consumer) { $this->timestamp = $timestamp; $this->attempts = $attempts; $this->id = $id; $this->body = $body; - $this->connection = $reader; + $this->consumer = $consumer; } - private bool $finished = false; - - private Reader $connection; - public function isFinished(): bool { return $this->finished; @@ -53,7 +53,7 @@ final class Message throw new LogicException('Can\'t finish message as it already finished.'); } - $this->connection->fin($this->id); + $this->consumer->fin($this->id); $this->finished = true; } @@ -63,7 +63,7 @@ final class Message throw new LogicException('Can\'t requeue message as it already finished.'); } - $this->connection->req($this->id, $timeout); + $this->consumer->req($this->id, $timeout); $this->finished = true; } @@ -73,6 +73,6 @@ final class Message throw new LogicException('Can\'t touch message as it already finished.'); } - $this->connection->touch($this->id); + $this->consumer->touch($this->id); } } diff --git a/src/Writer.php b/src/Producer.php similarity index 97% rename from src/Writer.php rename to src/Producer.php index d0da99b..99ba84b 100644 --- a/src/Writer.php +++ b/src/Producer.php @@ -10,7 +10,7 @@ use function pack; use function sprintf; use const PHP_EOL; -final class Writer extends Connection +final class Producer extends Connection { /** * @psalm-suppress PossiblyFalseOperand diff --git a/src/Response.php b/src/Response.php index 650340d..2ca643c 100644 --- a/src/Response.php +++ b/src/Response.php @@ -44,7 +44,7 @@ final class Response return self::TYPE_RESPONSE === $this->type && self::HEARTBEAT === $this->buffer->bytes(); } - public function toMessage(Reader $reader): Message + public function toMessage(Consumer $reader): Message { if (self::TYPE_MESSAGE !== $this->type) { throw new Exception(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type)); diff --git a/src/Subscriber.php b/src/Subscriber.php index 67a5554..3e28196 100644 --- a/src/Subscriber.php +++ b/src/Subscriber.php @@ -14,9 +14,9 @@ final class Subscriber public const STOP = 0; public const CHANGE_TIMEOUT = 1; - private Reader $reader; + private Consumer $reader; - public function __construct(Reader $reader) + public function __construct(Consumer $reader) { $this->reader = $reader; } diff --git a/tests/NsqTest.php b/tests/NsqTest.php index 6eba15a..bfb112d 100644 --- a/tests/NsqTest.php +++ b/tests/NsqTest.php @@ -3,9 +3,9 @@ declare(strict_types=1); use Nsq\Message; -use Nsq\Reader; +use Nsq\Consumer; use Nsq\Subscriber; -use Nsq\Writer; +use Nsq\Producer; use Nsq\Exception; use PHPUnit\Framework\TestCase; @@ -13,10 +13,10 @@ final class NsqTest extends TestCase { public function test(): void { - $writer = new Writer('tcp://localhost:4150'); + $writer = new Producer('tcp://localhost:4150'); $writer->pub(__FUNCTION__, __FUNCTION__); - $reader = new Reader('tcp://localhost:4150'); + $reader = new Consumer('tcp://localhost:4150'); $subscriber = new Subscriber($reader); $generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__, 1); @@ -85,7 +85,7 @@ final class NsqTest extends TestCase $this->expectException(Exception::class); $this->expectExceptionMessage($exceptionMessage); - $writer = new Writer('tcp://localhost:4150'); + $writer = new Producer('tcp://localhost:4150'); $writer->pub($topic, $body); }