Remove Reader and ConsumerInterface
This commit is contained in:
@ -1,39 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
require __DIR__.'/../vendor/autoload.php';
|
||||
|
||||
use Amp\ByteStream;
|
||||
use Amp\Log\ConsoleFormatter;
|
||||
use Amp\Log\StreamHandler;
|
||||
use Monolog\Logger;
|
||||
use Monolog\Processor\PsrLogMessageProcessor;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Reader;
|
||||
use function Amp\Promise\wait;
|
||||
|
||||
$handler = new StreamHandler(ByteStream\getStdout());
|
||||
$handler->setFormatter(new ConsoleFormatter());
|
||||
$logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]);
|
||||
|
||||
$reader = new Reader(
|
||||
'tcp://localhost:4150',
|
||||
topic: 'local',
|
||||
channel: 'local',
|
||||
clientConfig: new ClientConfig(
|
||||
deflate: false,
|
||||
snappy: false,
|
||||
),
|
||||
logger: $logger,
|
||||
);
|
||||
|
||||
wait($reader->connect());
|
||||
|
||||
while (true) {
|
||||
$message = wait($reader->consume());
|
||||
|
||||
$logger->info('Received: {body}', ['body' => $message->body]);
|
||||
|
||||
wait($message->finish());
|
||||
}
|
@ -15,7 +15,7 @@ use Psr\Log\NullLogger;
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\call;
|
||||
|
||||
final class Consumer extends Connection implements ConsumerInterface
|
||||
final class Consumer extends Connection
|
||||
{
|
||||
private int $rdy = 0;
|
||||
|
||||
|
@ -1,47 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\Promise;
|
||||
|
||||
interface ConsumerInterface
|
||||
{
|
||||
/**
|
||||
* Update RDY state (indicate you are ready to receive N messages).
|
||||
*
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public function rdy(int $count): Promise;
|
||||
|
||||
/**
|
||||
* Finish a message (indicate successful processing).
|
||||
*
|
||||
* @return Promise<void>
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
public function fin(string $id): Promise;
|
||||
|
||||
/**
|
||||
* Re-queue a message (indicate failure to process) The re-queued message is placed at the tail of the queue,
|
||||
* equivalent to having just published it, but for various implementation specific reasons that behavior should not
|
||||
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
|
||||
* behaves identically to an explicit REQ.
|
||||
*
|
||||
* @return Promise<void>
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
public function req(string $id, int $timeout): Promise;
|
||||
|
||||
/**
|
||||
* Reset the timeout for an in-flight message.
|
||||
*
|
||||
* @return Promise<void>
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
public function touch(string $id): Promise;
|
||||
}
|
@ -17,11 +17,11 @@ final class Message
|
||||
public string $body,
|
||||
public int $timestamp,
|
||||
public int $attempts,
|
||||
private ConsumerInterface $consumer,
|
||||
private Consumer $consumer,
|
||||
) {
|
||||
}
|
||||
|
||||
public static function compose(Frame\Message $message, ConsumerInterface $consumer): self
|
||||
public static function compose(Frame\Message $message, Consumer $consumer): self
|
||||
{
|
||||
return new self(
|
||||
$message->id,
|
||||
|
222
src/Reader.php
222
src/Reader.php
@ -1,222 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq;
|
||||
|
||||
use Amp\Deferred;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
use Nsq\Config\ClientConfig;
|
||||
use Nsq\Exception\ConsumerException;
|
||||
use Nsq\Frame\Response;
|
||||
use Nsq\Stream\NullStream;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\NullLogger;
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\call;
|
||||
|
||||
final class Reader extends Connection implements ConsumerInterface
|
||||
{
|
||||
private int $rdy = 0;
|
||||
|
||||
/**
|
||||
* @var array<int, Deferred<Message>>
|
||||
*/
|
||||
private array $deferreds = [];
|
||||
|
||||
/**
|
||||
* @var array<int, Message>
|
||||
*/
|
||||
private array $messages = [];
|
||||
|
||||
public function __construct(
|
||||
private string $address,
|
||||
private string $topic,
|
||||
private string $channel,
|
||||
ClientConfig $clientConfig,
|
||||
private LoggerInterface $logger,
|
||||
) {
|
||||
parent::__construct(
|
||||
$this->address,
|
||||
$clientConfig,
|
||||
$this->logger,
|
||||
);
|
||||
}
|
||||
|
||||
public static function create(
|
||||
string $address,
|
||||
string $topic,
|
||||
string $channel,
|
||||
?ClientConfig $clientConfig = null,
|
||||
?LoggerInterface $logger = null,
|
||||
): self {
|
||||
return new self(
|
||||
$address,
|
||||
$topic,
|
||||
$channel,
|
||||
$clientConfig ?? new ClientConfig(),
|
||||
$logger ?? new NullLogger(),
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function connect(): Promise
|
||||
{
|
||||
if (!$this->stream instanceof NullStream) {
|
||||
return call(static function (): void {
|
||||
});
|
||||
}
|
||||
|
||||
return call(function (): \Generator {
|
||||
yield parent::connect();
|
||||
|
||||
$this->run();
|
||||
});
|
||||
}
|
||||
|
||||
private function run(): void
|
||||
{
|
||||
$buffer = new Buffer();
|
||||
|
||||
asyncCall(function () use ($buffer): \Generator {
|
||||
yield $this->stream->write(Command::sub($this->topic, $this->channel));
|
||||
|
||||
if (null !== ($chunk = yield $this->stream->read())) {
|
||||
$buffer->append($chunk);
|
||||
}
|
||||
|
||||
/** @var Response $response */
|
||||
$response = Parser::parse($buffer);
|
||||
|
||||
if (!$response->isOk()) {
|
||||
throw new ConsumerException('Fail subscription.');
|
||||
}
|
||||
|
||||
yield $this->rdy(1);
|
||||
|
||||
asyncCall(
|
||||
function () use ($buffer): \Generator {
|
||||
while (null !== $chunk = yield $this->stream->read()) {
|
||||
$buffer->append($chunk);
|
||||
|
||||
while ($frame = Parser::parse($buffer)) {
|
||||
switch (true) {
|
||||
case $frame instanceof Frame\Response:
|
||||
if ($frame->isHeartBeat()) {
|
||||
yield $this->stream->write(Command::nop());
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
throw ConsumerException::response($frame);
|
||||
case $frame instanceof Frame\Error:
|
||||
$this->handleError($frame);
|
||||
|
||||
$deferred = array_pop($this->deferreds);
|
||||
|
||||
if (null !== $deferred) {
|
||||
$deferred->fail($frame->toException());
|
||||
}
|
||||
|
||||
break;
|
||||
case $frame instanceof Frame\Message:
|
||||
$message = Message::compose($frame, $this);
|
||||
|
||||
$deferred = array_pop($this->deferreds);
|
||||
|
||||
if (null === $deferred) {
|
||||
$this->messages[] = $message;
|
||||
} else {
|
||||
$deferred->resolve($message);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$this->stream = new NullStream();
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Promise<Message>
|
||||
*/
|
||||
public function consume(): Promise
|
||||
{
|
||||
$message = array_pop($this->messages);
|
||||
|
||||
if (null !== $message) {
|
||||
return new Success($message);
|
||||
}
|
||||
|
||||
$this->deferreds[] = $deferred = new Deferred();
|
||||
|
||||
return $deferred->promise();
|
||||
}
|
||||
|
||||
/**
|
||||
* Update RDY state (indicate you are ready to receive N messages).
|
||||
*
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public function rdy(int $count): Promise
|
||||
{
|
||||
if ($this->rdy === $count) {
|
||||
return call(static function (): void {
|
||||
});
|
||||
}
|
||||
|
||||
$this->rdy = $count;
|
||||
|
||||
return $this->stream->write(Command::rdy($count));
|
||||
}
|
||||
|
||||
/**
|
||||
* Finish a message (indicate successful processing).
|
||||
*
|
||||
* @return Promise<void>
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
public function fin(string $id): Promise
|
||||
{
|
||||
--$this->rdy;
|
||||
|
||||
return $this->stream->write(Command::fin($id));
|
||||
}
|
||||
|
||||
/**
|
||||
* Re-queue a message (indicate failure to process) The re-queued message is placed at the tail of the queue,
|
||||
* equivalent to having just published it, but for various implementation specific reasons that behavior should not
|
||||
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
|
||||
* behaves identically to an explicit REQ.
|
||||
*
|
||||
* @return Promise<void>
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
public function req(string $id, int $timeout): Promise
|
||||
{
|
||||
--$this->rdy;
|
||||
|
||||
return $this->stream->write(Command::req($id, $timeout));
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the timeout for an in-flight message.
|
||||
*
|
||||
* @return Promise<void>
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
public function touch(string $id): Promise
|
||||
{
|
||||
return $this->stream->write(Command::touch($id));
|
||||
}
|
||||
}
|
@ -4,7 +4,7 @@ declare(strict_types=1);
|
||||
|
||||
use Amp\Loop;
|
||||
use Amp\Success;
|
||||
use Nsq\ConsumerInterface;
|
||||
use Nsq\Consumer;
|
||||
use Nsq\Exception\MessageException;
|
||||
use Nsq\Message;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
@ -55,7 +55,7 @@ final class MessageTest extends TestCase
|
||||
*/
|
||||
public function messages(): Generator
|
||||
{
|
||||
$consumer = $this->createMock(ConsumerInterface::class);
|
||||
$consumer = $this->createMock(Consumer::class);
|
||||
$consumer->method('fin')->willReturn(new Success());
|
||||
$consumer->method('touch')->willReturn(new Success());
|
||||
$consumer->method('req')->willReturn(new Success());
|
||||
|
Reference in New Issue
Block a user