Decouple Consumer and Producer from direct call Streams

This commit is contained in:
2021-09-18 22:08:41 +03:00
parent aa3333bfba
commit 083bc44c9c
3 changed files with 41 additions and 20 deletions

View File

@ -5,6 +5,7 @@ declare(strict_types=1);
namespace Nsq; namespace Nsq;
use Amp\ByteStream\ClosedException; use Amp\ByteStream\ClosedException;
use Amp\Failure;
use Amp\Promise; use Amp\Promise;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Config\ServerConfig; use Nsq\Config\ServerConfig;
@ -23,7 +24,7 @@ use function Amp\call;
*/ */
abstract class Connection abstract class Connection
{ {
protected Stream $stream; private Stream $stream;
/** /**
* @var callable * @var callable
@ -42,7 +43,7 @@ abstract class Connection
public function __destruct() public function __destruct()
{ {
$this->close(); $this->close(false);
} }
public function isConnected(): bool public function isConnected(): bool
@ -145,6 +146,28 @@ abstract class Connection
return $this; return $this;
} }
protected function read(): Promise
{
try {
return $this->stream->read();
} catch (\Throwable $e) {
$this->close(false);
return new Failure($e);
}
}
protected function write(string $data): Promise
{
try {
return $this->stream->write($data);
} catch (\Throwable $e) {
$this->close(false);
return new Failure($e);
}
}
protected function handleError(Frame\Error $error): void protected function handleError(Frame\Error $error): void
{ {
$this->logger->error($error->data); $this->logger->error($error->data);

View File

@ -9,7 +9,6 @@ use Amp\Promise;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Exception\ConsumerException; use Nsq\Exception\ConsumerException;
use Nsq\Frame\Response; use Nsq\Frame\Response;
use Nsq\Stream\NullStream;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger; use Psr\Log\NullLogger;
use function Amp\asyncCall; use function Amp\asyncCall;
@ -61,7 +60,7 @@ final class Consumer extends Connection
public function connect(): Promise public function connect(): Promise
{ {
if (!$this->stream instanceof NullStream) { if ($this->isConnected()) {
return call(static function (): void { return call(static function (): void {
}); });
} }
@ -84,9 +83,9 @@ final class Consumer extends Connection
$buffer = new Buffer(); $buffer = new Buffer();
asyncCall(function () use ($buffer): \Generator { asyncCall(function () use ($buffer): \Generator {
yield $this->stream->write(Command::sub($this->topic, $this->channel)); yield $this->write(Command::sub($this->topic, $this->channel));
if (null !== ($chunk = yield $this->stream->read())) { if (null !== ($chunk = yield $this->read())) {
$buffer->append($chunk); $buffer->append($chunk);
} }
@ -101,14 +100,14 @@ final class Consumer extends Connection
/** @phpstan-ignore-next-line */ /** @phpstan-ignore-next-line */
asyncCall(function () use ($buffer): \Generator { asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->stream->read()) { while (null !== $chunk = yield $this->read()) {
$buffer->append($chunk); $buffer->append($chunk);
while ($frame = Parser::parse($buffer)) { while ($frame = Parser::parse($buffer)) {
switch (true) { switch (true) {
case $frame instanceof Frame\Response: case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) { if ($frame->isHeartBeat()) {
yield $this->stream->write(Command::nop()); yield $this->write(Command::nop());
break; break;
} }
@ -136,7 +135,7 @@ final class Consumer extends Connection
'channel' => $this->channel, 'channel' => $this->channel,
]); ]);
$this->stream = new NullStream(); $this->close(false);
}); });
}); });
} }
@ -155,7 +154,7 @@ final class Consumer extends Connection
$this->rdy = $count; $this->rdy = $count;
return $this->stream->write(Command::rdy($count)); return $this->write(Command::rdy($count));
} }
/** /**
@ -167,7 +166,7 @@ final class Consumer extends Connection
*/ */
public function fin(string $id): Promise public function fin(string $id): Promise
{ {
return $this->stream->write(Command::fin($id)); return $this->write(Command::fin($id));
} }
/** /**
@ -182,7 +181,7 @@ final class Consumer extends Connection
*/ */
public function req(string $id, int $timeout): Promise public function req(string $id, int $timeout): Promise
{ {
return $this->stream->write(Command::req($id, $timeout)); return $this->write(Command::req($id, $timeout));
} }
/** /**
@ -194,6 +193,6 @@ final class Consumer extends Connection
*/ */
public function touch(string $id): Promise public function touch(string $id): Promise
{ {
return $this->stream->write(Command::touch($id)); return $this->write(Command::touch($id));
} }
} }

View File

@ -7,7 +7,6 @@ namespace Nsq;
use Amp\Promise; use Amp\Promise;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Exception\NsqException; use Nsq\Exception\NsqException;
use Nsq\Stream\NullStream;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger; use Psr\Log\NullLogger;
use function Amp\asyncCall; use function Amp\asyncCall;
@ -41,7 +40,7 @@ final class Producer extends Connection
public function connect(): Promise public function connect(): Promise
{ {
if (!$this->stream instanceof NullStream) { if ($this->isConnected()) {
return call(static function (): void { return call(static function (): void {
}); });
} }
@ -70,7 +69,7 @@ final class Producer extends Connection
return call( return call(
function (array $bodies) use ($topic, $delay): \Generator { function (array $bodies) use ($topic, $delay): \Generator {
foreach ($bodies as $body) { foreach ($bodies as $body) {
yield $this->stream->write(Command::dpub($topic, $body, $delay)); yield $this->write(Command::dpub($topic, $body, $delay));
} }
}, },
(array) $body, (array) $body,
@ -81,7 +80,7 @@ final class Producer extends Connection
? Command::mpub($topic, $body) ? Command::mpub($topic, $body)
: Command::pub($topic, $body); : Command::pub($topic, $body);
return $this->stream->write($command); return $this->write($command);
} }
private function run(): void private function run(): void
@ -89,14 +88,14 @@ final class Producer extends Connection
$buffer = new Buffer(); $buffer = new Buffer();
asyncCall(function () use ($buffer): \Generator { asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->stream->read()) { while (null !== $chunk = yield $this->read()) {
$buffer->append($chunk); $buffer->append($chunk);
while ($frame = Parser::parse($buffer)) { while ($frame = Parser::parse($buffer)) {
switch (true) { switch (true) {
case $frame instanceof Frame\Response: case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) { if ($frame->isHeartBeat()) {
yield $this->stream->write(Command::nop()); yield $this->write(Command::nop());
} }
// Ok received // Ok received
@ -111,7 +110,7 @@ final class Producer extends Connection
} }
} }
$this->stream = new NullStream(); $this->close(false);
}); });
} }
} }