This commit is contained in:
2021-09-04 01:56:49 +03:00
parent ca2c2ee633
commit e3e83212c4
4 changed files with 78 additions and 7 deletions

View File

@ -34,7 +34,7 @@ Features
- [X] Discovery
- [ ] Backoff
- [X] TLS
- [ ] Deflate
- [X] Deflate
- [X] Snappy
- [X] Sampling
- [X] AUTH

View File

@ -83,7 +83,7 @@ abstract class Connection
}
if ($serverConfig->deflate) {
$stream = new GzipStream($stream);
$stream = new GzipStream($stream, $serverConfig->deflateLevel, $buffer->flush());
/** @var Response $response */
$response = yield $this->response($stream, $buffer);

View File

@ -0,0 +1,9 @@
<?php
declare(strict_types=1);
namespace Nsq\Exception;
final class StreamException extends NsqException
{
}

View File

@ -5,14 +5,39 @@ declare(strict_types=1);
namespace Nsq\Stream;
use Amp\Promise;
use Nsq\Exception\NsqException;
use Nsq\Buffer;
use Nsq\Exception\StreamException;
use Nsq\Stream;
use function Amp\call;
class GzipStream implements Stream
{
public function __construct(private Stream $stream)
/**
* @var resource
*/
private $inflate;
/**
* @var resource
*/
private $deflate;
private Buffer $buffer;
public function __construct(private Stream $stream, private int $level, string $bytes = '')
{
throw new NsqException('GzipStream not implemented yet.');
$this->inflate = @inflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
$this->deflate = @deflate_init(ZLIB_ENCODING_RAW, ['level' => $this->level]);
if (false === $this->inflate) {
throw new StreamException('Failed initializing inflate context');
}
if (false === $this->deflate) {
throw new StreamException('Failed initializing deflate context');
}
$this->buffer = new Buffer($bytes);
}
/**
@ -20,7 +45,32 @@ class GzipStream implements Stream
*/
public function read(): Promise
{
return $this->stream->read();
return call(function () {
if (null === $this->inflate) {
return null;
}
if ($this->buffer->empty()) {
$chunk = yield $this->stream->read();
if (null !== $chunk) {
$this->buffer->append($chunk);
}
}
$data = $this->buffer->flush();
if ('' === $data) {
return null;
}
$decompressed = inflate_add($this->inflate, $data, ZLIB_SYNC_FLUSH);
if (false === $decompressed) {
throw new StreamException('Failed adding data to deflate context');
}
return $decompressed;
});
}
/**
@ -28,11 +78,23 @@ class GzipStream implements Stream
*/
public function write(string $data): Promise
{
return $this->stream->write($data);
if (null === $this->deflate) {
throw new StreamException('The stream has already been closed');
}
$compressed = deflate_add($this->deflate, $data, ZLIB_SYNC_FLUSH);
if (false === $compressed) {
throw new StreamException('Failed adding data to deflate context');
}
return $this->stream->write($compressed);
}
public function close(): void
{
$this->stream->close();
$this->inflate = null;
$this->deflate = null;
}
}