diff --git a/src/Connection.php b/src/Connection.php index 28a5f52..69edcb6 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -17,6 +17,7 @@ use Socket\Raw\Exception; use Socket\Raw\Factory; use Socket\Raw\Socket; use function addcslashes; +use function implode; use function json_encode; use function pack; use const JSON_FORCE_OBJECT; @@ -84,12 +85,11 @@ abstract class Connection } // @codeCoverageIgnoreEnd - $this->send(' V2'); + $this->socket->write(' V2'); $body = json_encode($this->features, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT); - $size = pack('N', \strlen($body)); - $this->send('IDENTIFY '.PHP_EOL.$size.$body)->response()->okOrFail(); + $this->command('IDENTIFY', data: $body)->response()->okOrFail(); }); } @@ -130,10 +130,21 @@ abstract class Connection return 'AUTH'.PHP_EOL.$size.$secret; } - protected function send(string $buffer): self + /** + * @param array|string $params + */ + protected function command(string $command, array | string $params = [], string $data = null): self { $socket = $this->socket(); + $buffer = [] === $params ? $command : implode(' ', [$command, ...((array) $params)]); + $buffer .= PHP_EOL; + + if (null !== $data) { + $buffer .= pack('N', \strlen($data)); + $buffer .= $data; + } + $this->logger->debug('Send buffer: '.addcslashes($buffer, PHP_EOL)); try { @@ -148,7 +159,6 @@ abstract class Connection throw ConnectionFail::fromThrowable($e); } // @codeCoverageIgnoreEnd - return $this; } @@ -196,7 +206,7 @@ abstract class Connection $response = new Response($buffer); if ($response->isHeartBeat()) { - $this->send('NOP'.PHP_EOL); + $this->command('NOP'); return $this->receive( ($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime diff --git a/src/Consumer.php b/src/Consumer.php index 8488045..6859ede 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -13,9 +13,7 @@ final class Consumer extends Connection */ public function sub(string $topic, string $channel): void { - $buffer = sprintf('SUB %s %s', $topic, $channel).PHP_EOL; - - $this->send($buffer)->response()->okOrFail(); + $this->command('SUB', [$topic, $channel])->response()->okOrFail(); } /** @@ -27,7 +25,7 @@ final class Consumer extends Connection return; } - $this->send('RDY '.$count.PHP_EOL); + $this->command('RDY', (string) $count); $this->rdy = $count; } @@ -42,7 +40,7 @@ final class Consumer extends Connection */ public function fin(string $id): void { - $this->send('FIN '.$id.PHP_EOL); + $this->command('FIN', $id); --$this->rdy; } @@ -55,7 +53,7 @@ final class Consumer extends Connection */ public function req(string $id, int $timeout): void { - $this->send(sprintf('REQ %s %s', $id, $timeout).PHP_EOL); + $this->command('REQ', [$id, $timeout]); --$this->rdy; } @@ -65,6 +63,6 @@ final class Consumer extends Connection */ public function touch(string $id): void { - $this->send('TOUCH '.$id.PHP_EOL); + $this->command('TOUCH', $id); } } diff --git a/src/Producer.php b/src/Producer.php index fd74bb5..f20bcbd 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -7,8 +7,6 @@ namespace Nsq; use function array_map; use function implode; use function pack; -use function sprintf; -use const PHP_EOL; final class Producer extends Connection { @@ -17,11 +15,7 @@ final class Producer extends Connection */ public function pub(string $topic, string $body): void { - $size = pack('N', \strlen($body)); - - $buffer = 'PUB '.$topic.PHP_EOL.$size.$body; - - $this->send($buffer)->response()->okOrFail(); + $this->command('PUB', $topic, $body)->response()->okOrFail(); } /** @@ -37,11 +31,7 @@ final class Producer extends Connection return pack('N', \strlen($body)).$body; }, $bodies)); - $size = pack('N', \strlen($num.$mb)); - - $buffer = 'MPUB '.$topic.PHP_EOL.$size.$num.$mb; - - $this->send($buffer)->response()->okOrFail(); + $this->command('MPUB', $topic, $num.$mb)->response()->okOrFail(); } /** @@ -49,10 +39,6 @@ final class Producer extends Connection */ public function dpub(string $topic, int $deferTime, string $body): void { - $size = pack('N', \strlen($body)); - - $buffer = sprintf('DPUB %s %s', $topic, $deferTime).PHP_EOL.$size.$body; - - $this->send($buffer)->response()->okOrFail(); + $this->command('DPUB', [$topic, $deferTime], $body)->response()->okOrFail(); } }