This commit is contained in:
2021-02-26 00:59:52 +03:00
committed by GitHub
parent 9cefa847a9
commit e670cb161c
54 changed files with 1410 additions and 1280 deletions

View File

@ -4,108 +4,148 @@ declare(strict_types=1);
namespace Nsq;
use Generator;
use Amp\Failure;
use Amp\Promise;
use Nsq\Config\ClientConfig;
use Nsq\Exception\NsqError;
use Nsq\Exception\NsqException;
use Nsq\Protocol\Error;
use Nsq\Protocol\Message;
use Nsq\Protocol\Response;
use Nsq\Exception\ConsumerException;
use Nsq\Frame\Response;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use function Amp\asyncCall;
use function Amp\call;
final class Consumer extends Connection
final class Consumer extends Connection implements ConsumerInterface
{
private int $rdy = 0;
/**
* @var callable
*/
private $onMessage;
public function __construct(
private string $address,
private string $topic,
private string $channel,
string $address,
ClientConfig $clientConfig = null,
LoggerInterface $logger = null
callable $onMessage,
ClientConfig $clientConfig,
private LoggerInterface $logger,
) {
parent::__construct($address, $clientConfig, $logger);
parent::__construct(
$this->address,
$clientConfig,
$this->logger,
);
$this->onMessage = $onMessage;
}
/**
* @psalm-return Generator<int, Message|float|null, int|null, void>
*/
public function generator(): \Generator
{
$this->command('SUB', [$this->topic, $this->channel])->checkIsOK();
while (true) {
$this->rdy(1);
$timeout = $this->clientConfig->readTimeout;
do {
$deadline = microtime(true) + $timeout;
$message = $this->hasMessage($timeout) ? $this->readMessage() : null;
$timeout = ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime;
} while (0 < $timeout && null === $message);
$command = yield $message;
if (0 === $command) {
break;
}
}
$this->close();
public static function create(
string $address,
string $topic,
string $channel,
callable $onMessage,
?ClientConfig $clientConfig = null,
?LoggerInterface $logger = null,
): self {
return new self(
$address,
$topic,
$channel,
$onMessage,
$clientConfig ?? new ClientConfig(),
$logger ?? new NullLogger(),
);
}
public function readMessage(): ?Message
public function connect(): Promise
{
$frame = $this->readFrame();
return call(function (): \Generator {
yield parent::connect();
if ($frame instanceof Message) {
return $frame;
}
$this->run();
});
}
if ($frame instanceof Response && $frame->isHeartBeat()) {
$this->command('NOP');
private function run(): void
{
$buffer = new Buffer();
return null;
}
asyncCall(function () use ($buffer): \Generator {
yield $this->stream->write(Command::sub($this->topic, $this->channel));
if ($frame instanceof Error) {
if ($frame->type->terminateConnection) {
$this->close();
if (null !== ($chunk = yield $this->stream->read())) {
$buffer->append($chunk);
}
throw new NsqError($frame);
}
/** @var Response $response */
$response = Parser::parse($buffer);
throw new NsqException('Unreachable statement.');
if (!$response->isOk()) {
return new Failure(new ConsumerException('Fail subscription.'));
}
yield $this->rdy(2500);
/** @phpstan-ignore-next-line */
asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->stream->read()) {
$buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->stream->write(Command::nop());
break;
}
throw ConsumerException::response($frame);
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
case $frame instanceof Frame\Message:
asyncCall($this->onMessage, Message::compose($frame, $this));
break;
}
}
}
});
});
}
/**
* Update RDY state (indicate you are ready to receive N messages).
*
* @return Promise<void>
*/
public function rdy(int $count): void
public function rdy(int $count): Promise
{
if ($this->rdy === $count) {
return;
return call(static function (): void {
});
}
$this->command('RDY', (string) $count);
$this->rdy = $count;
return $this->stream->write(Command::rdy($count));
}
/**
* Finish a message (indicate successful processing).
*
* @return Promise<void>
*
* @internal
*/
public function fin(string $id): void
public function fin(string $id): Promise
{
$this->command('FIN', $id);
--$this->rdy;
return $this->stream->write(Command::fin($id));
}
/**
@ -114,22 +154,26 @@ final class Consumer extends Connection
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
* behaves identically to an explicit REQ.
*
* @return Promise<void>
*
* @internal
*/
public function req(string $id, int $timeout): void
public function req(string $id, int $timeout): Promise
{
$this->command('REQ', [$id, $timeout]);
--$this->rdy;
return $this->stream->write(Command::req($id, $timeout));
}
/**
* Reset the timeout for an in-flight message.
*
* @return Promise<void>
*
* @internal
*/
public function touch(string $id): void
public function touch(string $id): Promise
{
$this->command('TOUCH', $id);
return $this->stream->write(Command::touch($id));
}
}