diff --git a/README.md b/README.md index 4e1154a..a5e900c 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,8 @@ Features - [ ] Discovery - [ ] Backoff - [ ] TLS -- [ ] Snappy +- [ ] Deflate +- [X] Snappy - [X] Sampling - [X] AUTH diff --git a/src/Connection.php b/src/Connection.php index ada2107..efdabc5 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -16,7 +16,9 @@ use Nsq\Protocol\Error; use Nsq\Protocol\Frame; use Nsq\Protocol\Message; use Nsq\Protocol\Response; +use Nsq\Socket\DeflateSocket; use Nsq\Socket\RawSocket; +use Nsq\Socket\SnappySocket; use Psr\Log\LoggerAwareTrait; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -61,7 +63,24 @@ abstract class Connection ->toArray() ); - if ($this->connectionConfig->snappy || $this->connectionConfig->deflate) { + if ($this->connectionConfig->snappy) { + $this->socket = new NsqSocket( + new SnappySocket( + $socket, + $this->logger, + ), + ); + + $this->checkIsOK(); + } + + if ($this->connectionConfig->deflate) { + $this->socket = new NsqSocket( + new DeflateSocket( + $socket, + ), + ); + $this->checkIsOK(); } @@ -116,6 +135,8 @@ abstract class Connection ? $command : implode(' ', [$command, ...((array) $params)]); + $this->logger->info('Command [{command}] with data [{data}]', ['command' => $command, 'data' => $data ?? 'null']); + $this->socket->write($command, $data); return $this; @@ -173,6 +194,8 @@ abstract class Connection if (!$response->isOk()) { throw new BadResponse($response); } + + $this->logger->info('Ok checked.'); } private function readResponse(): Response diff --git a/src/Socket/DeflateSocket.php b/src/Socket/DeflateSocket.php new file mode 100644 index 0000000..d077e74 --- /dev/null +++ b/src/Socket/DeflateSocket.php @@ -0,0 +1,47 @@ +socket->selectRead($timeout); + } +} diff --git a/src/Socket/SnappySocket.php b/src/Socket/SnappySocket.php new file mode 100644 index 0000000..da9e2ac --- /dev/null +++ b/src/Socket/SnappySocket.php @@ -0,0 +1,154 @@ +output = new ByteBuffer(); + $this->input = new ByteBuffer(); + } + + /** + * {@inheritDoc} + */ + public function write(string $data): void + { + $identifierFrame = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59]; + $compressedFrame = 0x00; + $uncompressedFrame = 0x01; // 11 + $maxChunkLength = 65536; + + $byteBuffer = new ByteBuffer(); + foreach ($identifierFrame as $bite) { + $byteBuffer->appendUint8($bite); + } + + foreach (str_split($data, $maxChunkLength) as $chunk) { + $compressedChunk = snappy_compress($chunk); + + [$chunk, $chunkType] = \strlen($compressedChunk) <= 0.875 * \strlen($data) + ? [$compressedChunk, $compressedFrame] + : [$data, $uncompressedFrame]; + + /** @var string $checksum */ + $checksum = hash('crc32c', $data, true); + /** @phpstan-ignore-next-line */ + $checksum = unpack('N', $checksum)[1]; + $maskedChecksum = (($checksum >> 15) | ($checksum << 17)) + 0xa282ead8 & 0xffffffff; + + $size = (\strlen($chunk) + 4) << 8; + + $byteBuffer->append(pack('V', $chunkType + $size)); + $byteBuffer->append(pack('V', $maskedChecksum)); + $byteBuffer->append($chunk); + } + + $this->socket->write($byteBuffer->flush()); + } + + /** + * {@inheritDoc} + */ + public function read(int $length): string + { + $output = $this->output; + $input = $this->input; + + $this->logger->debug('Snappy requested {length} bytes.', ['length' => $length]); + + while ($output->size() < $length) { + $this->logger->debug('Snappy enter loop'); + + /** @phpstan-ignore-next-line */ + $chunkType = unpack('V', $this->socket->read(4))[1]; + + $size = $chunkType >> 8; + $chunkType &= 0xff; + + $this->logger->debug('Snappy receive chunk [{chunk}] with size [{size}]', [ + 'chunk' => $chunkType, + 'size' => $size, + ]); + + switch ($chunkType) { + case 0xff: + $this->logger->debug('Snappy identifier chunk'); + + $this->socket->read(6); // discard identifier body + + break; + case 0x00: // 'compressed', + $this->logger->debug('Snappy compressed chunk'); + + $input->append( + $this->socket->read($size), + ) + ->discard(4) // discard checksum + ; + + $output->append( + snappy_uncompress( + $input->flush(), + ), + ); + + break; + case 0x01: // 'uncompressed', + $this->logger->debug('Snappy uncompressed chunk'); + + $input->append( + $this->socket->read($size), + ) + ->discard(4) // discard checksum + ; + + $output->append($input->flush()); + + break; + case 0xfe:// 'padding', + $this->logger->debug('Snappy padding chunk'); + + break; + } + } + + $this->logger->debug('Snappy return message [{message}]', ['message' => $output->read($length)]); + + return $output->consume($length); + } + + /** + * {@inheritDoc} + */ + public function close(): void + { + $this->socket->close(); + } + + /** + * {@inheritDoc} + */ + public function selectRead(float $timeout): bool + { + return !$this->input->empty() || $this->socket->selectRead($timeout); + } +} diff --git a/src/Socket/Socket.php b/src/Socket/Socket.php index 20b0e49..6ea29b3 100644 --- a/src/Socket/Socket.php +++ b/src/Socket/Socket.php @@ -8,13 +8,6 @@ use Nsq\Exception\ConnectionFail; interface Socket { - public function close(): void; - - /** - * @throws ConnectionFail - */ - public function selectRead(float $timeout): bool; - /** * @throws ConnectionFail */ @@ -24,4 +17,11 @@ interface Socket * @throws ConnectionFail */ public function read(int $length): string; + + /** + * @throws ConnectionFail + */ + public function selectRead(float $timeout): bool; + + public function close(): void; } diff --git a/tests/NsqTest.php b/tests/NsqTest.php index 94d2342..0738a4b 100644 --- a/tests/NsqTest.php +++ b/tests/NsqTest.php @@ -11,7 +11,10 @@ use PHPUnit\Framework\TestCase; final class NsqTest extends TestCase { - public function test(): void + /** + * @dataProvider configs + */ + public function test(ClientConfig $clientConfig): void { $producer = new Producer('tcp://localhost:4150'); $producer->pub(__FUNCTION__, __FUNCTION__); @@ -20,10 +23,7 @@ final class NsqTest extends TestCase topic: 'test', channel: 'test', address: 'tcp://localhost:4150', - clientConfig: new ClientConfig( - heartbeatInterval: 3000, - readTimeout: 1, - ), + clientConfig: $clientConfig, ); $generator = $consumer->generator(); @@ -89,4 +89,26 @@ final class NsqTest extends TestCase $generator->send(0); self::assertTrue($consumer->isClosed()); } + + /** + * @return Generator> + */ + public function configs(): Generator + { + yield 'default' => [ + new ClientConfig( + heartbeatInterval: 3000, + snappy: false, + readTimeout: 1, + ), + ]; + + yield 'snappy' => [ + new ClientConfig( + heartbeatInterval: 3000, + snappy: true, + readTimeout: 1, + ), + ]; + } }