From 801fdfe547ac8f0fadd3fa0506c6a61a56a332a6 Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Mon, 1 Feb 2021 01:17:39 +0300 Subject: [PATCH] Snappy fix receiving partial message --- src/Socket/SnappySocket.php | 38 +++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/src/Socket/SnappySocket.php b/src/Socket/SnappySocket.php index da9e2ac..775ae8a 100644 --- a/src/Socket/SnappySocket.php +++ b/src/Socket/SnappySocket.php @@ -51,7 +51,7 @@ final class SnappySocket implements Socket /** @var string $checksum */ $checksum = hash('crc32c', $data, true); - /** @phpstan-ignore-next-line */ + /** @phpstan-ignore-next-line */ $checksum = unpack('N', $checksum)[1]; $maskedChecksum = (($checksum >> 15) | ($checksum << 17)) + 0xa282ead8 & 0xffffffff; @@ -78,50 +78,56 @@ final class SnappySocket implements Socket while ($output->size() < $length) { $this->logger->debug('Snappy enter loop'); - /** @phpstan-ignore-next-line */ + /** @phpstan-ignore-next-line */ $chunkType = unpack('V', $this->socket->read(4))[1]; $size = $chunkType >> 8; $chunkType &= 0xff; - $this->logger->debug('Snappy receive chunk [{chunk}] with size [{size}]', [ + $this->logger->debug('Snappy receive chunk [{chunk}], size [{size}]', [ 'chunk' => $chunkType, 'size' => $size, ]); + do { + $input->append( + $this->socket->read($size), + ); + + $size -= $input->size(); + } while ($input->size() < $size); + switch ($chunkType) { case 0xff: $this->logger->debug('Snappy identifier chunk'); - $this->socket->read(6); // discard identifier body + $input->discard(6); // discard identifier body break; case 0x00: // 'compressed', $this->logger->debug('Snappy compressed chunk'); - $input->append( - $this->socket->read($size), - ) + $data = $input ->discard(4) // discard checksum + ->flush() ; - $output->append( - snappy_uncompress( - $input->flush(), - ), - ); + $this->logger->debug('Snappy compressed data [{data}]', ['data' => $data]); + + $output->append(snappy_uncompress($data)); break; case 0x01: // 'uncompressed', $this->logger->debug('Snappy uncompressed chunk'); - $input->append( - $this->socket->read($size), - ) + $data = $input ->discard(4) // discard checksum + ->flush() ; - $output->append($input->flush()); + $this->logger->debug('Snappy uncompressed data [{data}]', ['data' => $data]); + + $output->append($data); break; case 0xfe:// 'padding',