Snappy fix receiving partial message
This commit is contained in:
@@ -51,7 +51,7 @@ final class SnappySocket implements Socket
|
||||
|
||||
/** @var string $checksum */
|
||||
$checksum = hash('crc32c', $data, true);
|
||||
/** @phpstan-ignore-next-line */
|
||||
/** @phpstan-ignore-next-line */
|
||||
$checksum = unpack('N', $checksum)[1];
|
||||
$maskedChecksum = (($checksum >> 15) | ($checksum << 17)) + 0xa282ead8 & 0xffffffff;
|
||||
|
||||
@@ -78,50 +78,56 @@ final class SnappySocket implements Socket
|
||||
while ($output->size() < $length) {
|
||||
$this->logger->debug('Snappy enter loop');
|
||||
|
||||
/** @phpstan-ignore-next-line */
|
||||
/** @phpstan-ignore-next-line */
|
||||
$chunkType = unpack('V', $this->socket->read(4))[1];
|
||||
|
||||
$size = $chunkType >> 8;
|
||||
$chunkType &= 0xff;
|
||||
|
||||
$this->logger->debug('Snappy receive chunk [{chunk}] with size [{size}]', [
|
||||
$this->logger->debug('Snappy receive chunk [{chunk}], size [{size}]', [
|
||||
'chunk' => $chunkType,
|
||||
'size' => $size,
|
||||
]);
|
||||
|
||||
do {
|
||||
$input->append(
|
||||
$this->socket->read($size),
|
||||
);
|
||||
|
||||
$size -= $input->size();
|
||||
} while ($input->size() < $size);
|
||||
|
||||
switch ($chunkType) {
|
||||
case 0xff:
|
||||
$this->logger->debug('Snappy identifier chunk');
|
||||
|
||||
$this->socket->read(6); // discard identifier body
|
||||
$input->discard(6); // discard identifier body
|
||||
|
||||
break;
|
||||
case 0x00: // 'compressed',
|
||||
$this->logger->debug('Snappy compressed chunk');
|
||||
|
||||
$input->append(
|
||||
$this->socket->read($size),
|
||||
)
|
||||
$data = $input
|
||||
->discard(4) // discard checksum
|
||||
->flush()
|
||||
;
|
||||
|
||||
$output->append(
|
||||
snappy_uncompress(
|
||||
$input->flush(),
|
||||
),
|
||||
);
|
||||
$this->logger->debug('Snappy compressed data [{data}]', ['data' => $data]);
|
||||
|
||||
$output->append(snappy_uncompress($data));
|
||||
|
||||
break;
|
||||
case 0x01: // 'uncompressed',
|
||||
$this->logger->debug('Snappy uncompressed chunk');
|
||||
|
||||
$input->append(
|
||||
$this->socket->read($size),
|
||||
)
|
||||
$data = $input
|
||||
->discard(4) // discard checksum
|
||||
->flush()
|
||||
;
|
||||
|
||||
$output->append($input->flush());
|
||||
$this->logger->debug('Snappy uncompressed data [{data}]', ['data' => $data]);
|
||||
|
||||
$output->append($data);
|
||||
|
||||
break;
|
||||
case 0xfe:// 'padding',
|
||||
|
Reference in New Issue
Block a user