This commit is contained in:
2021-01-27 00:30:03 +03:00
parent fc6b67cc92
commit 87fe939018

View File

@ -19,10 +19,13 @@ use Socket\Raw\Exception;
use Socket\Raw\Factory;
use Socket\Raw\Socket;
use function addcslashes;
use function hash;
use function http_build_query;
use function implode;
use function json_encode;
use function pack;
use function snappy_compress;
use function unpack;
use const JSON_FORCE_OBJECT;
use const JSON_THROW_ON_ERROR;
use const PHP_EOL;
@ -140,7 +143,63 @@ abstract class Connection
$buffer .= $data;
}
$this->logger->debug('Send buffer: '.addcslashes($buffer, PHP_EOL));
$this->logger->debug('Prepare send uncompressed buffer: {bytes}', ['bytes' => addcslashes($buffer, PHP_EOL)]);
if ($this->connectionConfig?->snappy) {
$identifierFrame = [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
$compressedFrame = 0x00;
$uncompressedFrame = 0x01;
$chunk = snappy_compress($buffer);
[$chunk, $compressFrame] = match (\strlen($chunk) < \strlen($buffer)) {
true => [$chunk, $compressedFrame],
false => [$buffer, $uncompressedFrame],
};
$size = \strlen($chunk) + 4;
$buffer = new ByteBuffer();
foreach ([...$identifierFrame, $compressFrame, $size, $size >> 8, $size >> 16] as $byte) {
$buffer->appendUint8($byte);
}
$crc32c = hash('crc32c', $data, true);
$crc32c = unpack('V', $crc32c)[1];
$unsignedRightShift = static function ($a, $b) {
if ($b >= 32 || $b < -32) {
$m = (int) ($b / 32);
$b -= ($m * 32);
}
if ($b < 0) {
$b = 32 + $b;
}
if (0 === $b) {
return (($a >> 1) & 0x7fffffff) * 2 + (($a >> $b) & 1);
}
if ($a < 0) {
$a >>= 1;
$a &= 2147483647;
$a |= 0x40000000;
$a >>= ($b - 1);
} else {
$a >>= $b;
}
return $a;
};
$checksum = $unsignedRightShift((($crc32c >> 15) | ($crc32c << 17)) + 0xa282ead8, 0);
$buffer->appendUint32($checksum);
$buffer->append($chunk);
$buffer = $buffer->bytes();
}
$this->logger->debug('Prepare send compressed buffer: {bytes}', ['bytes' => addcslashes($buffer, PHP_EOL)]);
try {
$socket->write($buffer);
@ -191,18 +250,59 @@ abstract class Connection
throw new ConnectionFail('Probably connection lost');
}
$buffer = new ByteBuffer();
if ($this->connectionConfig?->snappy) {
$buffer = new ByteBuffer();
$snappyBuffer = new ByteBuffer($size);
while (true) {
$typeByte = \ord($snappyBuffer->consume(1));
/** @phpstan-ignore-next-line */
$size = unpack('N', $size)[1];
$size = \ord($snappyBuffer->consume(1)) + (\ord($snappyBuffer->consume(1)) << 8) + (\ord($snappyBuffer->consume(1)) << 16);
$type = match ($typeByte) {
0xff => 'identifier',
0x00 => 'compressed',
0x01 => 'uncompressed',
0xfe => 'padding',
};
do {
$chunk = $socket->read($size);
$this->logger->debug('Received snappy chunk: {type}, size: {size}', [
'type' => $type,
'size' => $size,
]);
$buffer->append($chunk);
switch ($typeByte) {
case 0xff: // 'identifier',
$socket->read($size);
$snappyBuffer->append($socket->read(4));
$size -= \strlen($chunk);
} while (0 < $size);
continue 2;
case 0x00: // 'compressed',
case 0x01: // 'uncompressed',
$uncompressed = $socket->read($size);
$this->logger->debug('Received uncompressed bytes: {bytes}', ['bytes' => $uncompressed]);
$buffer->append($uncompressed);
$buffer->consume(4); // slice snappy prefix
$buffer->consumeUint32(); // slice size
break 2;
case 0xfe:// 'padding',
}
}
} else {
$this->logger->debug('Size bytes received: "{bytes}"', ['bytes' => $size]);
$buffer = new ByteBuffer($size);
$size = $buffer->consumeUint32();
do {
$chunk = $socket->read($size);
$buffer->append($chunk);
$size -= \strlen($chunk);
} while (0 < $size);
}
$this->logger->debug('Received buffer: '.addcslashes($buffer->bytes(), PHP_EOL));