Rename Reader to Consumer and Writer to Producer

This commit is contained in:
2021-01-23 01:35:09 +03:00
parent bf738254b1
commit 6eeb2939c8
7 changed files with 27 additions and 25 deletions

View File

@ -44,30 +44,32 @@ Usage
### Publish ### Publish
```php ```php
use Nsq\Writer; use Nsq\Producer;
$writer = new Writer(address: 'tcp://nsqd:4150'); $producer = new Producer(address: 'tcp://nsqd:4150');
// Publish a message to a topic // Publish a message to a topic
$writer->pub('topic', 'Simple message'); $producer->pub('topic', 'Simple message');
// Publish multiple messages to a topic (atomically) // Publish multiple messages to a topic (atomically)
$writer->mpub('topic', [ $producer->mpub('topic', [
'Message one', 'Message one',
'Message two', 'Message two',
]); ]);
// Publish a deferred message to a topic // Publish a deferred message to a topic
$writer->dpub('topic', 5000, 'Deferred message'); $producer->dpub('topic', 5000, 'Deferred message');
``` ```
### Subscription ### Subscription
```php ```php
use Nsq\Consumer;
use Nsq\Message; use Nsq\Message;
use Nsq\Subscriber; use Nsq\Subscriber;
$subscriber = new Subscriber(address: 'tcp://nsqd:4150'); $consumer = new Consumer('tcp://nsqd:4150');
$subscriber = new Subscriber($consumer);
$generator = $subscriber->subscribe('topic', 'channel', timeout: 5); $generator = $subscriber->subscribe('topic', 'channel', timeout: 5);
foreach ($generator as $message) { foreach ($generator as $message) {

View File

@ -4,7 +4,7 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
class Reader extends Connection class Consumer extends Connection
{ {
/** /**
* Subscribe to a topic/channel. * Subscribe to a topic/channel.

View File

@ -28,20 +28,20 @@ final class Message
*/ */
public string $body; public string $body;
public function __construct(int $timestamp, int $attempts, string $id, string $body, Reader $reader) private bool $finished = false;
private Consumer $consumer;
public function __construct(int $timestamp, int $attempts, string $id, string $body, Consumer $consumer)
{ {
$this->timestamp = $timestamp; $this->timestamp = $timestamp;
$this->attempts = $attempts; $this->attempts = $attempts;
$this->id = $id; $this->id = $id;
$this->body = $body; $this->body = $body;
$this->connection = $reader; $this->consumer = $consumer;
} }
private bool $finished = false;
private Reader $connection;
public function isFinished(): bool public function isFinished(): bool
{ {
return $this->finished; return $this->finished;
@ -53,7 +53,7 @@ final class Message
throw new LogicException('Can\'t finish message as it already finished.'); throw new LogicException('Can\'t finish message as it already finished.');
} }
$this->connection->fin($this->id); $this->consumer->fin($this->id);
$this->finished = true; $this->finished = true;
} }
@ -63,7 +63,7 @@ final class Message
throw new LogicException('Can\'t requeue message as it already finished.'); throw new LogicException('Can\'t requeue message as it already finished.');
} }
$this->connection->req($this->id, $timeout); $this->consumer->req($this->id, $timeout);
$this->finished = true; $this->finished = true;
} }
@ -73,6 +73,6 @@ final class Message
throw new LogicException('Can\'t touch message as it already finished.'); throw new LogicException('Can\'t touch message as it already finished.');
} }
$this->connection->touch($this->id); $this->consumer->touch($this->id);
} }
} }

View File

@ -10,7 +10,7 @@ use function pack;
use function sprintf; use function sprintf;
use const PHP_EOL; use const PHP_EOL;
final class Writer extends Connection final class Producer extends Connection
{ {
/** /**
* @psalm-suppress PossiblyFalseOperand * @psalm-suppress PossiblyFalseOperand

View File

@ -44,7 +44,7 @@ final class Response
return self::TYPE_RESPONSE === $this->type && self::HEARTBEAT === $this->buffer->bytes(); return self::TYPE_RESPONSE === $this->type && self::HEARTBEAT === $this->buffer->bytes();
} }
public function toMessage(Reader $reader): Message public function toMessage(Consumer $reader): Message
{ {
if (self::TYPE_MESSAGE !== $this->type) { if (self::TYPE_MESSAGE !== $this->type) {
throw new Exception(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type)); throw new Exception(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type));

View File

@ -14,9 +14,9 @@ final class Subscriber
public const STOP = 0; public const STOP = 0;
public const CHANGE_TIMEOUT = 1; public const CHANGE_TIMEOUT = 1;
private Reader $reader; private Consumer $reader;
public function __construct(Reader $reader) public function __construct(Consumer $reader)
{ {
$this->reader = $reader; $this->reader = $reader;
} }

View File

@ -3,9 +3,9 @@
declare(strict_types=1); declare(strict_types=1);
use Nsq\Message; use Nsq\Message;
use Nsq\Reader; use Nsq\Consumer;
use Nsq\Subscriber; use Nsq\Subscriber;
use Nsq\Writer; use Nsq\Producer;
use Nsq\Exception; use Nsq\Exception;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
@ -13,10 +13,10 @@ final class NsqTest extends TestCase
{ {
public function test(): void public function test(): void
{ {
$writer = new Writer('tcp://localhost:4150'); $writer = new Producer('tcp://localhost:4150');
$writer->pub(__FUNCTION__, __FUNCTION__); $writer->pub(__FUNCTION__, __FUNCTION__);
$reader = new Reader('tcp://localhost:4150'); $reader = new Consumer('tcp://localhost:4150');
$subscriber = new Subscriber($reader); $subscriber = new Subscriber($reader);
$generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__, 1); $generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__, 1);
@ -85,7 +85,7 @@ final class NsqTest extends TestCase
$this->expectException(Exception::class); $this->expectException(Exception::class);
$this->expectExceptionMessage($exceptionMessage); $this->expectExceptionMessage($exceptionMessage);
$writer = new Writer('tcp://localhost:4150'); $writer = new Producer('tcp://localhost:4150');
$writer->pub($topic, $body); $writer->pub($topic, $body);
} }