Refactoring: Add typed Connection::command method if favor of send
This commit is contained in:
@ -17,6 +17,7 @@ use Socket\Raw\Exception;
|
|||||||
use Socket\Raw\Factory;
|
use Socket\Raw\Factory;
|
||||||
use Socket\Raw\Socket;
|
use Socket\Raw\Socket;
|
||||||
use function addcslashes;
|
use function addcslashes;
|
||||||
|
use function implode;
|
||||||
use function json_encode;
|
use function json_encode;
|
||||||
use function pack;
|
use function pack;
|
||||||
use const JSON_FORCE_OBJECT;
|
use const JSON_FORCE_OBJECT;
|
||||||
@ -84,12 +85,11 @@ abstract class Connection
|
|||||||
}
|
}
|
||||||
// @codeCoverageIgnoreEnd
|
// @codeCoverageIgnoreEnd
|
||||||
|
|
||||||
$this->send(' V2');
|
$this->socket->write(' V2');
|
||||||
|
|
||||||
$body = json_encode($this->features, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
|
$body = json_encode($this->features, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
|
||||||
$size = pack('N', \strlen($body));
|
|
||||||
|
|
||||||
$this->send('IDENTIFY '.PHP_EOL.$size.$body)->response()->okOrFail();
|
$this->command('IDENTIFY', data: $body)->response()->okOrFail();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -130,10 +130,21 @@ abstract class Connection
|
|||||||
return 'AUTH'.PHP_EOL.$size.$secret;
|
return 'AUTH'.PHP_EOL.$size.$secret;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function send(string $buffer): self
|
/**
|
||||||
|
* @param array<int, int|string>|string $params
|
||||||
|
*/
|
||||||
|
protected function command(string $command, array | string $params = [], string $data = null): self
|
||||||
{
|
{
|
||||||
$socket = $this->socket();
|
$socket = $this->socket();
|
||||||
|
|
||||||
|
$buffer = [] === $params ? $command : implode(' ', [$command, ...((array) $params)]);
|
||||||
|
$buffer .= PHP_EOL;
|
||||||
|
|
||||||
|
if (null !== $data) {
|
||||||
|
$buffer .= pack('N', \strlen($data));
|
||||||
|
$buffer .= $data;
|
||||||
|
}
|
||||||
|
|
||||||
$this->logger->debug('Send buffer: '.addcslashes($buffer, PHP_EOL));
|
$this->logger->debug('Send buffer: '.addcslashes($buffer, PHP_EOL));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -148,7 +159,6 @@ abstract class Connection
|
|||||||
throw ConnectionFail::fromThrowable($e);
|
throw ConnectionFail::fromThrowable($e);
|
||||||
}
|
}
|
||||||
// @codeCoverageIgnoreEnd
|
// @codeCoverageIgnoreEnd
|
||||||
|
|
||||||
return $this;
|
return $this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -196,7 +206,7 @@ abstract class Connection
|
|||||||
$response = new Response($buffer);
|
$response = new Response($buffer);
|
||||||
|
|
||||||
if ($response->isHeartBeat()) {
|
if ($response->isHeartBeat()) {
|
||||||
$this->send('NOP'.PHP_EOL);
|
$this->command('NOP');
|
||||||
|
|
||||||
return $this->receive(
|
return $this->receive(
|
||||||
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
|
($currentTime = microtime(true)) > $deadline ? 0 : $deadline - $currentTime
|
||||||
|
@ -13,9 +13,7 @@ final class Consumer extends Connection
|
|||||||
*/
|
*/
|
||||||
public function sub(string $topic, string $channel): void
|
public function sub(string $topic, string $channel): void
|
||||||
{
|
{
|
||||||
$buffer = sprintf('SUB %s %s', $topic, $channel).PHP_EOL;
|
$this->command('SUB', [$topic, $channel])->response()->okOrFail();
|
||||||
|
|
||||||
$this->send($buffer)->response()->okOrFail();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -27,7 +25,7 @@ final class Consumer extends Connection
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->send('RDY '.$count.PHP_EOL);
|
$this->command('RDY', (string) $count);
|
||||||
|
|
||||||
$this->rdy = $count;
|
$this->rdy = $count;
|
||||||
}
|
}
|
||||||
@ -42,7 +40,7 @@ final class Consumer extends Connection
|
|||||||
*/
|
*/
|
||||||
public function fin(string $id): void
|
public function fin(string $id): void
|
||||||
{
|
{
|
||||||
$this->send('FIN '.$id.PHP_EOL);
|
$this->command('FIN', $id);
|
||||||
|
|
||||||
--$this->rdy;
|
--$this->rdy;
|
||||||
}
|
}
|
||||||
@ -55,7 +53,7 @@ final class Consumer extends Connection
|
|||||||
*/
|
*/
|
||||||
public function req(string $id, int $timeout): void
|
public function req(string $id, int $timeout): void
|
||||||
{
|
{
|
||||||
$this->send(sprintf('REQ %s %s', $id, $timeout).PHP_EOL);
|
$this->command('REQ', [$id, $timeout]);
|
||||||
|
|
||||||
--$this->rdy;
|
--$this->rdy;
|
||||||
}
|
}
|
||||||
@ -65,6 +63,6 @@ final class Consumer extends Connection
|
|||||||
*/
|
*/
|
||||||
public function touch(string $id): void
|
public function touch(string $id): void
|
||||||
{
|
{
|
||||||
$this->send('TOUCH '.$id.PHP_EOL);
|
$this->command('TOUCH', $id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,8 +7,6 @@ namespace Nsq;
|
|||||||
use function array_map;
|
use function array_map;
|
||||||
use function implode;
|
use function implode;
|
||||||
use function pack;
|
use function pack;
|
||||||
use function sprintf;
|
|
||||||
use const PHP_EOL;
|
|
||||||
|
|
||||||
final class Producer extends Connection
|
final class Producer extends Connection
|
||||||
{
|
{
|
||||||
@ -17,11 +15,7 @@ final class Producer extends Connection
|
|||||||
*/
|
*/
|
||||||
public function pub(string $topic, string $body): void
|
public function pub(string $topic, string $body): void
|
||||||
{
|
{
|
||||||
$size = pack('N', \strlen($body));
|
$this->command('PUB', $topic, $body)->response()->okOrFail();
|
||||||
|
|
||||||
$buffer = 'PUB '.$topic.PHP_EOL.$size.$body;
|
|
||||||
|
|
||||||
$this->send($buffer)->response()->okOrFail();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -37,11 +31,7 @@ final class Producer extends Connection
|
|||||||
return pack('N', \strlen($body)).$body;
|
return pack('N', \strlen($body)).$body;
|
||||||
}, $bodies));
|
}, $bodies));
|
||||||
|
|
||||||
$size = pack('N', \strlen($num.$mb));
|
$this->command('MPUB', $topic, $num.$mb)->response()->okOrFail();
|
||||||
|
|
||||||
$buffer = 'MPUB '.$topic.PHP_EOL.$size.$num.$mb;
|
|
||||||
|
|
||||||
$this->send($buffer)->response()->okOrFail();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -49,10 +39,6 @@ final class Producer extends Connection
|
|||||||
*/
|
*/
|
||||||
public function dpub(string $topic, int $deferTime, string $body): void
|
public function dpub(string $topic, int $deferTime, string $body): void
|
||||||
{
|
{
|
||||||
$size = pack('N', \strlen($body));
|
$this->command('DPUB', [$topic, $deferTime], $body)->response()->okOrFail();
|
||||||
|
|
||||||
$buffer = sprintf('DPUB %s %s', $topic, $deferTime).PHP_EOL.$size.$body;
|
|
||||||
|
|
||||||
$this->send($buffer)->response()->okOrFail();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user