This commit is contained in:
2021-02-01 00:39:07 +03:00
parent 070b980003
commit 3d8f5be2d0
6 changed files with 261 additions and 14 deletions

View File

@ -34,7 +34,8 @@ Features
- [ ] Discovery - [ ] Discovery
- [ ] Backoff - [ ] Backoff
- [ ] TLS - [ ] TLS
- [ ] Snappy - [ ] Deflate
- [X] Snappy
- [X] Sampling - [X] Sampling
- [X] AUTH - [X] AUTH

View File

@ -16,7 +16,9 @@ use Nsq\Protocol\Error;
use Nsq\Protocol\Frame; use Nsq\Protocol\Frame;
use Nsq\Protocol\Message; use Nsq\Protocol\Message;
use Nsq\Protocol\Response; use Nsq\Protocol\Response;
use Nsq\Socket\DeflateSocket;
use Nsq\Socket\RawSocket; use Nsq\Socket\RawSocket;
use Nsq\Socket\SnappySocket;
use Psr\Log\LoggerAwareTrait; use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger; use Psr\Log\NullLogger;
@ -61,7 +63,24 @@ abstract class Connection
->toArray() ->toArray()
); );
if ($this->connectionConfig->snappy || $this->connectionConfig->deflate) { if ($this->connectionConfig->snappy) {
$this->socket = new NsqSocket(
new SnappySocket(
$socket,
$this->logger,
),
);
$this->checkIsOK();
}
if ($this->connectionConfig->deflate) {
$this->socket = new NsqSocket(
new DeflateSocket(
$socket,
),
);
$this->checkIsOK(); $this->checkIsOK();
} }
@ -116,6 +135,8 @@ abstract class Connection
? $command ? $command
: implode(' ', [$command, ...((array) $params)]); : implode(' ', [$command, ...((array) $params)]);
$this->logger->info('Command [{command}] with data [{data}]', ['command' => $command, 'data' => $data ?? 'null']);
$this->socket->write($command, $data); $this->socket->write($command, $data);
return $this; return $this;
@ -173,6 +194,8 @@ abstract class Connection
if (!$response->isOk()) { if (!$response->isOk()) {
throw new BadResponse($response); throw new BadResponse($response);
} }
$this->logger->info('Ok checked.');
} }
private function readResponse(): Response private function readResponse(): Response

View File

@ -0,0 +1,47 @@
<?php
declare(strict_types=1);
namespace Nsq\Socket;
use LogicException;
final class DeflateSocket implements Socket
{
public function __construct(
private Socket $socket,
) {
}
/**
* {@inheritDoc}
*/
public function write(string $data): void
{
throw new LogicException('not implemented.');
}
/**
* {@inheritDoc}
*/
public function read(int $length): string
{
throw new LogicException('not implemented.');
}
/**
* {@inheritDoc}
*/
public function close(): void
{
throw new LogicException('not implemented.');
}
/**
* {@inheritDoc}
*/
public function selectRead(float $timeout): bool
{
return $this->socket->selectRead($timeout);
}
}

154
src/Socket/SnappySocket.php Normal file
View File

@ -0,0 +1,154 @@
<?php
declare(strict_types=1);
namespace Nsq\Socket;
use PHPinnacle\Buffer\ByteBuffer;
use Psr\Log\LoggerInterface;
use function hash;
use function pack;
use function snappy_compress;
use function snappy_uncompress;
use function str_split;
use function unpack;
final class SnappySocket implements Socket
{
private ByteBuffer $output;
private ByteBuffer $input;
public function __construct(
private Socket $socket,
private LoggerInterface $logger,
) {
$this->output = new ByteBuffer();
$this->input = new ByteBuffer();
}
/**
* {@inheritDoc}
*/
public function write(string $data): void
{
$identifierFrame = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
$compressedFrame = 0x00;
$uncompressedFrame = 0x01; // 11
$maxChunkLength = 65536;
$byteBuffer = new ByteBuffer();
foreach ($identifierFrame as $bite) {
$byteBuffer->appendUint8($bite);
}
foreach (str_split($data, $maxChunkLength) as $chunk) {
$compressedChunk = snappy_compress($chunk);
[$chunk, $chunkType] = \strlen($compressedChunk) <= 0.875 * \strlen($data)
? [$compressedChunk, $compressedFrame]
: [$data, $uncompressedFrame];
/** @var string $checksum */
$checksum = hash('crc32c', $data, true);
/** @phpstan-ignore-next-line */
$checksum = unpack('N', $checksum)[1];
$maskedChecksum = (($checksum >> 15) | ($checksum << 17)) + 0xa282ead8 & 0xffffffff;
$size = (\strlen($chunk) + 4) << 8;
$byteBuffer->append(pack('V', $chunkType + $size));
$byteBuffer->append(pack('V', $maskedChecksum));
$byteBuffer->append($chunk);
}
$this->socket->write($byteBuffer->flush());
}
/**
* {@inheritDoc}
*/
public function read(int $length): string
{
$output = $this->output;
$input = $this->input;
$this->logger->debug('Snappy requested {length} bytes.', ['length' => $length]);
while ($output->size() < $length) {
$this->logger->debug('Snappy enter loop');
/** @phpstan-ignore-next-line */
$chunkType = unpack('V', $this->socket->read(4))[1];
$size = $chunkType >> 8;
$chunkType &= 0xff;
$this->logger->debug('Snappy receive chunk [{chunk}] with size [{size}]', [
'chunk' => $chunkType,
'size' => $size,
]);
switch ($chunkType) {
case 0xff:
$this->logger->debug('Snappy identifier chunk');
$this->socket->read(6); // discard identifier body
break;
case 0x00: // 'compressed',
$this->logger->debug('Snappy compressed chunk');
$input->append(
$this->socket->read($size),
)
->discard(4) // discard checksum
;
$output->append(
snappy_uncompress(
$input->flush(),
),
);
break;
case 0x01: // 'uncompressed',
$this->logger->debug('Snappy uncompressed chunk');
$input->append(
$this->socket->read($size),
)
->discard(4) // discard checksum
;
$output->append($input->flush());
break;
case 0xfe:// 'padding',
$this->logger->debug('Snappy padding chunk');
break;
}
}
$this->logger->debug('Snappy return message [{message}]', ['message' => $output->read($length)]);
return $output->consume($length);
}
/**
* {@inheritDoc}
*/
public function close(): void
{
$this->socket->close();
}
/**
* {@inheritDoc}
*/
public function selectRead(float $timeout): bool
{
return !$this->input->empty() || $this->socket->selectRead($timeout);
}
}

View File

@ -8,13 +8,6 @@ use Nsq\Exception\ConnectionFail;
interface Socket interface Socket
{ {
public function close(): void;
/**
* @throws ConnectionFail
*/
public function selectRead(float $timeout): bool;
/** /**
* @throws ConnectionFail * @throws ConnectionFail
*/ */
@ -24,4 +17,11 @@ interface Socket
* @throws ConnectionFail * @throws ConnectionFail
*/ */
public function read(int $length): string; public function read(int $length): string;
/**
* @throws ConnectionFail
*/
public function selectRead(float $timeout): bool;
public function close(): void;
} }

View File

@ -11,7 +11,10 @@ use PHPUnit\Framework\TestCase;
final class NsqTest extends TestCase final class NsqTest extends TestCase
{ {
public function test(): void /**
* @dataProvider configs
*/
public function test(ClientConfig $clientConfig): void
{ {
$producer = new Producer('tcp://localhost:4150'); $producer = new Producer('tcp://localhost:4150');
$producer->pub(__FUNCTION__, __FUNCTION__); $producer->pub(__FUNCTION__, __FUNCTION__);
@ -20,10 +23,7 @@ final class NsqTest extends TestCase
topic: 'test', topic: 'test',
channel: 'test', channel: 'test',
address: 'tcp://localhost:4150', address: 'tcp://localhost:4150',
clientConfig: new ClientConfig( clientConfig: $clientConfig,
heartbeatInterval: 3000,
readTimeout: 1,
),
); );
$generator = $consumer->generator(); $generator = $consumer->generator();
@ -89,4 +89,26 @@ final class NsqTest extends TestCase
$generator->send(0); $generator->send(0);
self::assertTrue($consumer->isClosed()); self::assertTrue($consumer->isClosed());
} }
/**
* @return Generator<string, array<int, ClientConfig>>
*/
public function configs(): Generator
{
yield 'default' => [
new ClientConfig(
heartbeatInterval: 3000,
snappy: false,
readTimeout: 1,
),
];
yield 'snappy' => [
new ClientConfig(
heartbeatInterval: 3000,
snappy: true,
readTimeout: 1,
),
];
}
} }