diff --git a/src/Consumer.php b/src/Consumer.php index bf93f69..00c40cd 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -9,6 +9,7 @@ use Amp\Promise; use Nsq\Config\ClientConfig; use Nsq\Exception\ConsumerException; use Nsq\Frame\Response; +use Nsq\Stream\NullStream; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use function Amp\asyncCall; @@ -60,6 +61,11 @@ final class Consumer extends Connection implements ConsumerInterface public function connect(): Promise { + if (!$this->stream instanceof NullStream) { + return call(static function (): void { + }); + } + return call(function (): \Generator { yield parent::connect(); @@ -113,6 +119,8 @@ final class Consumer extends Connection implements ConsumerInterface } } } + + $this->stream = new NullStream(); }); }); } diff --git a/src/Producer.php b/src/Producer.php index 48f62f1..d503cd4 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -7,6 +7,7 @@ namespace Nsq; use Amp\Promise; use Nsq\Config\ClientConfig; use Nsq\Exception\NsqException; +use Nsq\Stream\NullStream; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use function Amp\asyncCall; @@ -28,6 +29,11 @@ final class Producer extends Connection public function connect(): Promise { + if (!$this->stream instanceof NullStream) { + return call(static function (): void { + }); + } + return call(function (): \Generator { yield parent::connect(); @@ -83,6 +89,8 @@ final class Producer extends Connection } } } + + $this->stream = new NullStream(); }); } } diff --git a/src/Reader.php b/src/Reader.php index 5b4e0e7..a823682 100644 --- a/src/Reader.php +++ b/src/Reader.php @@ -10,6 +10,7 @@ use Amp\Success; use Nsq\Config\ClientConfig; use Nsq\Exception\ConsumerException; use Nsq\Frame\Response; +use Nsq\Stream\NullStream; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use function Amp\asyncCall; @@ -64,6 +65,11 @@ final class Reader extends Connection implements ConsumerInterface */ public function connect(): Promise { + if (!$this->stream instanceof NullStream) { + return call(static function (): void { + }); + } + return call(function (): \Generator { yield parent::connect(); @@ -131,6 +137,8 @@ final class Reader extends Connection implements ConsumerInterface } } } + + $this->stream = new NullStream(); } ); });