Refactoring: Extend Reader and Writer from Connection

This commit is contained in:
2021-01-20 16:02:06 +03:00
parent fdd65cb547
commit 0a926ce607
6 changed files with 124 additions and 162 deletions

View File

@ -10,13 +10,17 @@ use Socket\Raw\Factory;
use Socket\Raw\Socket; use Socket\Raw\Socket;
use Throwable; use Throwable;
use function json_encode; use function json_encode;
use function microtime;
use function pack; use function pack;
use function sprintf; use function sprintf;
use const JSON_FORCE_OBJECT; use const JSON_FORCE_OBJECT;
use const JSON_THROW_ON_ERROR; use const JSON_THROW_ON_ERROR;
use const PHP_EOL; use const PHP_EOL;
final class Connection /**
* @internal
*/
abstract class Connection
{ {
private const OK = 'OK'; private const OK = 'OK';
private const HEARTBEAT = '_heartbeat_'; private const HEARTBEAT = '_heartbeat_';
@ -31,27 +35,47 @@ final class Connection
private const BYTES_ID = 16; private const BYTES_ID = 16;
private const MAGIC_V2 = ' V2'; private const MAGIC_V2 = ' V2';
public Socket $socket; public ?Socket $socket = null;
public bool $closed = false; public bool $closed = false;
private function __construct(Socket $socket) private Config $config;
public function __construct(string $address)
{ {
$this->socket = $socket; $this->config = new Config($address);
} }
/** /**
* @psalm-suppress UnsafeInstantiation * @psalm-suppress UnsafeInstantiation
*
* @return static
*/ */
public static function connect(Config $config): self public function connect(): void
{ {
$socket = (new Factory())->createClient($config->address); $this->socket = (new Factory())->createClient($this->config->address);
$socket->write(self::MAGIC_V2); $this->socket->write(self::MAGIC_V2);
}
// @phpstan-ignore-next-line /**
return new self($socket); * Cleanly close your connection (no more messages are sent).
*/
public function disconnect(): void
{
if ($this->closed) {
return;
}
try {
$this->write('CLS'.PHP_EOL);
$this->consume(); // receive CLOSE_WAIT
if (null !== $this->socket) {
$this->socket->close();
}
} catch (Throwable $e) {
// Not interested
}
$this->closed = true;
} }
/** /**
@ -59,7 +83,7 @@ final class Connection
* *
* @psalm-suppress PossiblyFalseOperand * @psalm-suppress PossiblyFalseOperand
*/ */
public function identify(array $arr): string protected function identify(array $arr): string
{ {
$body = json_encode($arr, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT); $body = json_encode($arr, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
$size = pack('N', \strlen($body)); $size = pack('N', \strlen($body));
@ -70,21 +94,22 @@ final class Connection
/** /**
* @psalm-suppress PossiblyFalseOperand * @psalm-suppress PossiblyFalseOperand
*/ */
public function auth(string $secret): string protected function auth(string $secret): string
{ {
$size = pack('N', \strlen($secret)); $size = pack('N', \strlen($secret));
return 'AUTH'.PHP_EOL.$size.$secret; return 'AUTH'.PHP_EOL.$size.$secret;
} }
/**
* @internal
*/
public function write(string $buffer): void public function write(string $buffer): void
{ {
if ($this->closed) { $socket = $this->socket();
throw new LogicException('This connection is closed, create new one.');
}
try { try {
$this->socket->write($buffer); $socket->write($buffer);
} catch (Throwable $e) { } catch (Throwable $e) {
$this->closed = true; $this->closed = true;
@ -92,9 +117,15 @@ final class Connection
} }
} }
public function read(): ?Message protected function consume(?float $timeout = 0): ?Message
{ {
$socket = $this->socket; $deadline = microtime(true) + ($timeout ?? 0);
$socket = $this->socket();
if (false === $socket->selectRead($timeout)) {
return null;
}
$buffer = new ByteBuffer($socket->read(self::BYTES_SIZE + self::BYTES_TYPE)); $buffer = new ByteBuffer($socket->read(self::BYTES_SIZE + self::BYTES_TYPE));
$size = $buffer->consumeUint32(); $size = $buffer->consumeUint32();
@ -105,14 +136,21 @@ final class Connection
if (self::TYPE_RESPONSE === $type) { if (self::TYPE_RESPONSE === $type) {
$response = $buffer->consume($size - self::BYTES_TYPE); $response = $buffer->consume($size - self::BYTES_TYPE);
$isInternalMessage = false;
if (self::OK === $response || self::CLOSE_WAIT === $response) { if (self::OK === $response || self::CLOSE_WAIT === $response) {
return null; $isInternalMessage = true;
} }
if (self::HEARTBEAT === $response) { if (self::HEARTBEAT === $response) {
$socket->write('NOP'.PHP_EOL); $socket->write('NOP'.PHP_EOL);
return null; $isInternalMessage = true;
}
if ($isInternalMessage) {
return $this->consume(
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
);
} }
throw new LogicException(sprintf('Unexpected response from nsq: "%s"', $response)); throw new LogicException(sprintf('Unexpected response from nsq: "%s"', $response));
@ -133,4 +171,17 @@ final class Connection
return new Message($timestamp, $attempts, $id, $body); return new Message($timestamp, $attempts, $id, $body);
} }
private function socket(): Socket
{
if ($this->closed) {
throw new LogicException('This connection is closed, create new one.');
}
if (null === $this->socket) {
$this->connect();
}
return $this->socket;
}
} }

View File

@ -4,48 +4,51 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
/** use LogicException;
* @psalm-immutable
*/
final class Envelope final class Envelope
{ {
/**
* @psalm-readonly
*/
public Message $message; public Message $message;
/** private bool $finished = false;
* @var callable
*/
private $acknowledge;
/** private Reader $connection;
* @var callable
*/
private $requeue;
/** public function __construct(Message $message, Reader $connection)
* @var callable
*/
private $touching;
public function __construct(Message $message, callable $ack, callable $req, callable $touch)
{ {
$this->message = $message; $this->message = $message;
$this->acknowledge = $ack; $this->connection = $connection;
$this->requeue = $req;
$this->touching = $touch;
} }
public function ack(): void public function finish(): void
{ {
\call_user_func($this->acknowledge); if ($this->finished) {
throw new LogicException('Can\'t finish message as it already finished.');
} }
public function retry(int $timeout): void $this->connection->fin($this->message->id);
$this->finished = true;
}
public function requeue(int $timeout): void
{ {
\call_user_func($this->requeue, $timeout); if ($this->finished) {
throw new LogicException('Can\'t requeue message as it already finished.');
}
$this->connection->req($this->message->id, $timeout);
$this->finished = true;
} }
public function touch(): void public function touch(): void
{ {
\call_user_func($this->touching); if ($this->finished) {
throw new LogicException('Can\'t touch message as it already finished.');
}
$this->connection->touch($this->message->id);
} }
} }

View File

@ -4,24 +4,8 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Throwable; class Reader extends Connection
use function sprintf;
use const PHP_EOL;
class Reader
{ {
private Connection $connection;
public function __construct(Connection $connection)
{
$this->connection = $connection;
}
public function __destruct()
{
$this->close();
}
/** /**
* Subscribe to a topic/channel. * Subscribe to a topic/channel.
*/ */
@ -29,8 +13,8 @@ class Reader
{ {
$buffer = sprintf('SUB %s %s', $topic, $channel).PHP_EOL; $buffer = sprintf('SUB %s %s', $topic, $channel).PHP_EOL;
$this->connection->write($buffer); $this->write($buffer);
$this->connection->read(); $this->consume();
} }
/** /**
@ -38,7 +22,7 @@ class Reader
*/ */
public function rdy(int $count): void public function rdy(int $count): void
{ {
$this->connection->write('RDY '.$count.PHP_EOL); $this->write('RDY '.$count.PHP_EOL);
} }
/** /**
@ -46,7 +30,7 @@ class Reader
*/ */
public function fin(string $id): void public function fin(string $id): void
{ {
$this->connection->write('FIN '.$id.PHP_EOL); $this->write('FIN '.$id.PHP_EOL);
} }
/** /**
@ -57,7 +41,7 @@ class Reader
*/ */
public function req(string $id, int $timeout): void public function req(string $id, int $timeout): void
{ {
$this->connection->write(sprintf('REQ %s %s', $id, $timeout).PHP_EOL); $this->write(sprintf('REQ %s %s', $id, $timeout).PHP_EOL);
} }
/** /**
@ -65,35 +49,6 @@ class Reader
*/ */
public function touch(string $id): void public function touch(string $id): void
{ {
$this->connection->write('TOUCH '.$id.PHP_EOL); $this->write('TOUCH '.$id.PHP_EOL);
}
public function consume(?float $timeout = null): ?Message
{
if (false === $this->connection->socket->selectRead($timeout)) {
return null;
}
return $this->connection->read() ?? $this->consume(0);
}
/**
* Cleanly close your connection (no more messages are sent).
*/
public function close(): void
{
if ($this->connection->closed) {
return;
}
$this->connection->closed = true;
$this->connection->socket->write('CLS'.PHP_EOL);
$this->connection->read();
try {
$this->connection->socket->close();
} catch (Throwable $e) {
}
} }
} }

View File

@ -5,28 +5,19 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Generator; use Generator;
use LogicException;
final class Subscriber final class Subscriber extends Reader
{ {
private Reader $reader;
public function __construct(Reader $reader)
{
$this->reader = $reader;
}
/** /**
* @psalm-return Generator<int, Envelope|null, true|null, void> * @psalm-return Generator<int, Envelope|null, true|null, void>
*/ */
public function subscribe(string $topic, string $channel, ?float $timeout = 0): Generator public function subscribe(string $topic, string $channel, ?float $timeout = 0): Generator
{ {
$reader = $this->reader; $this->sub($topic, $channel);
$reader->sub($topic, $channel); $this->rdy(1);
$reader->rdy(1);
while (true) { while (true) {
$message = $reader->consume($timeout); $message = $this->consume($timeout);
if (null === $message) { if (null === $message) {
if (true === yield null) { if (true === yield null) {
@ -36,39 +27,13 @@ final class Subscriber
continue; continue;
} }
$finished = false; if (true === yield new Envelope($message, $this)) {
$envelop = new Envelope(
$message,
static function () use ($reader, $message, &$finished): void {
if ($finished) {
throw new LogicException('Can\'t ack, message already finished.');
}
$finished = true;
$reader->fin($message->id);
},
static function (int $timeout) use ($reader, $message, &$finished): void {
if ($finished) {
throw new LogicException('Can\'t retry, message already finished.');
}
$finished = true;
$reader->req($message->id, $timeout);
},
static function () use ($reader, $message): void {
$reader->touch($message->id);
},
);
if (true === yield $envelop) {
break; break;
} }
$reader->rdy(1); $this->rdy(1);
} }
$reader->close(); $this->disconnect();
} }
} }

View File

@ -10,15 +10,8 @@ use function pack;
use function sprintf; use function sprintf;
use const PHP_EOL; use const PHP_EOL;
final class Writer final class Writer extends Connection
{ {
private Connection $connection;
public function __construct(Connection $connection)
{
$this->connection = $connection;
}
/** /**
* @psalm-suppress PossiblyFalseOperand * @psalm-suppress PossiblyFalseOperand
*/ */
@ -28,8 +21,8 @@ final class Writer
$buffer = 'PUB '.$topic.PHP_EOL.$size.$body; $buffer = 'PUB '.$topic.PHP_EOL.$size.$body;
$this->connection->write($buffer); $this->write($buffer);
$this->connection->read(); $this->consume();
} }
/** /**
@ -49,8 +42,8 @@ final class Writer
$buffer = 'MPUB '.$topic.PHP_EOL.$size.$num.$mb; $buffer = 'MPUB '.$topic.PHP_EOL.$size.$num.$mb;
$this->connection->write($buffer); $this->write($buffer);
$this->connection->read(); $this->consume();
} }
/** /**
@ -62,7 +55,7 @@ final class Writer
$buffer = sprintf('DPUB %s %s', $topic, $deferTime).PHP_EOL.$size.$body; $buffer = sprintf('DPUB %s %s', $topic, $deferTime).PHP_EOL.$size.$body;
$this->connection->write($buffer); $this->write($buffer);
$this->connection->read(); $this->consume();
} }
} }

View File

@ -2,10 +2,7 @@
declare(strict_types=1); declare(strict_types=1);
use Nsq\Config;
use Nsq\Connection;
use Nsq\Envelope; use Nsq\Envelope;
use Nsq\Reader;
use Nsq\Subscriber; use Nsq\Subscriber;
use Nsq\Writer; use Nsq\Writer;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
@ -14,12 +11,10 @@ final class NsqTest extends TestCase
{ {
public function test(): void public function test(): void
{ {
$config = new Config('tcp://localhost:4150'); $writer = new Writer('tcp://localhost:4150');
$writer = new Writer(Connection::connect($config));
$writer->pub(__FUNCTION__, __FUNCTION__); $writer->pub(__FUNCTION__, __FUNCTION__);
$subscriber = new Subscriber(new Reader(Connection::connect($config))); $subscriber = new Subscriber('tcp://localhost:4150');
$generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__, 1); $generator = $subscriber->subscribe(__FUNCTION__, __FUNCTION__, 1);
$envelope = $generator->current(); $envelope = $generator->current();
@ -27,7 +22,7 @@ final class NsqTest extends TestCase
static::assertInstanceOf(Envelope::class, $envelope); static::assertInstanceOf(Envelope::class, $envelope);
/** @var Envelope $envelope */ /** @var Envelope $envelope */
static::assertSame(__FUNCTION__, $envelope->message->body); static::assertSame(__FUNCTION__, $envelope->message->body);
$envelope->ack(); $envelope->finish();
$generator->next(); $generator->next();
static::assertNull($generator->current()); static::assertNull($generator->current());