Remove Subscriber

This commit is contained in:
2021-01-30 18:22:34 +03:00
parent 1a24efacfb
commit e1cca2d3eb
4 changed files with 26 additions and 50 deletions

View File

@ -41,7 +41,7 @@ Features
Usage Usage
----- -----
### Publish ### Producer
```php ```php
use Nsq\Producer; use Nsq\Producer;
@ -61,12 +61,11 @@ $producer->mpub('topic', [
$producer->dpub('topic', 'Deferred message', delay: 5000); $producer->dpub('topic', 'Deferred message', delay: 5000);
``` ```
### Subscription ### Consumer
```php ```php
use Nsq\Consumer; use Nsq\Consumer;
use Nsq\Protocol\Message; use Nsq\Protocol\Message;
use Nsq\Subscriber;
$consumer = new Consumer( $consumer = new Consumer(
topic: 'topic', topic: 'topic',
@ -74,7 +73,8 @@ $consumer = new Consumer(
address: 'tcp://nsqd:4150', address: 'tcp://nsqd:4150',
); );
$generator = (new Subscriber($consumer))->run(); // Simple blocking loop based on generator
$generator = $consumer->generator();
foreach ($generator as $message) { foreach ($generator as $message) {
if ($message instanceof Message) { if ($message instanceof Message) {
@ -90,11 +90,8 @@ foreach ($generator as $message) {
// In case of nothing received during timeout generator will return NULL // In case of nothing received during timeout generator will return NULL
// Here we can do something between messages, like pcntl_signal_dispatch() // Here we can do something between messages, like pcntl_signal_dispatch()
// We can also communicate with generator through send
// for example:
// Gracefully close connection (loop will be ended) // Gracefully close connection (loop will be ended)
$generator->send(Subscriber::STOP); $generator->send(0);
} }
``` ```

View File

@ -4,6 +4,7 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Generator;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Exception\NsqError; use Nsq\Exception\NsqError;
use Nsq\Exception\NsqException; use Nsq\Exception\NsqException;
@ -34,6 +35,24 @@ final class Consumer extends Connection
$this->command('SUB', [$this->topic, $this->channel])->checkIsOK(); $this->command('SUB', [$this->topic, $this->channel])->checkIsOK();
} }
/**
* @psalm-return Generator<int, Message|float|null, int|null, void>
*/
public function generator(): Generator
{
while (true) {
$this->rdy(1);
$command = yield $this->readMessage();
if (0 === $command) {
break;
}
}
$this->disconnect();
}
public function readMessage(): ?Message public function readMessage(): ?Message
{ {
$frame = $this->readFrame(); $frame = $this->readFrame();

View File

@ -1,38 +0,0 @@
<?php
declare(strict_types=1);
namespace Nsq;
use Generator;
use Nsq\Protocol\Message;
final class Subscriber
{
public const STOP = 0;
private Consumer $reader;
public function __construct(Consumer $reader)
{
$this->reader = $reader;
}
/**
* @psalm-return Generator<int, Message|float|null, int|null, void>
*/
public function run(): Generator
{
while (true) {
$this->reader->rdy(1);
$command = yield $this->reader->readMessage();
if (self::STOP === $command) {
break;
}
}
$this->reader->disconnect();
}
}

View File

@ -6,7 +6,6 @@ use Nsq\Config\ClientConfig;
use Nsq\Consumer; use Nsq\Consumer;
use Nsq\Producer; use Nsq\Producer;
use Nsq\Protocol\Message; use Nsq\Protocol\Message;
use Nsq\Subscriber;
use Nyholm\NSA; use Nyholm\NSA;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
@ -26,8 +25,7 @@ final class NsqTest extends TestCase
readTimeout: 1, readTimeout: 1,
), ),
); );
$subscriber = new Subscriber($consumer); $generator = $consumer->generator();
$generator = $subscriber->run();
/** @var null|Message $message */ /** @var null|Message $message */
$message = $generator->current(); $message = $generator->current();
@ -88,7 +86,7 @@ final class NsqTest extends TestCase
$message->finish(); $message->finish();
self::assertTrue($consumer->isReady()); self::assertTrue($consumer->isReady());
$generator->send(Subscriber::STOP); $generator->send(0);
self::assertFalse($consumer->isReady()); self::assertFalse($consumer->isReady());
} }
} }