Refactoring configs. Use connections settings on establishing connection. Create ClientConfig from array.
This commit is contained in:
@ -8,3 +8,4 @@ parameters:
|
|||||||
paths:
|
paths:
|
||||||
- src
|
- src
|
||||||
- tests
|
- tests
|
||||||
|
checkMissingIterableValueType: false
|
||||||
|
@ -13,7 +13,7 @@ use Composer\InstalledVersions;
|
|||||||
*
|
*
|
||||||
* @psalm-immutable
|
* @psalm-immutable
|
||||||
*/
|
*/
|
||||||
final class ClientConfig implements \JsonSerializable
|
final class ClientConfig
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* @psalm-suppress ImpureFunctionCall
|
* @psalm-suppress ImpureFunctionCall
|
||||||
@ -30,6 +30,22 @@ final class ClientConfig implements \JsonSerializable
|
|||||||
*/
|
*/
|
||||||
public int $connectTimeout = 10,
|
public int $connectTimeout = 10,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The max attempts for establishing a connection.
|
||||||
|
*/
|
||||||
|
public int $maxAttempts = 0,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Use tcp_nodelay for establishing a connection.
|
||||||
|
*/
|
||||||
|
public bool $tcpNoDelay = false,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
|
||||||
|
* it will send back a JSON payload of supported features and metadata.
|
||||||
|
*/
|
||||||
|
public bool $featureNegotiation = true,
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An identifier used to disambiguate this client (i.e. something specific to the consumer).
|
* An identifier used to disambiguate this client (i.e. something specific to the consumer).
|
||||||
*/
|
*/
|
||||||
@ -73,12 +89,6 @@ final class ClientConfig implements \JsonSerializable
|
|||||||
*/
|
*/
|
||||||
public int $sampleRate = 0,
|
public int $sampleRate = 0,
|
||||||
|
|
||||||
/**
|
|
||||||
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
|
|
||||||
* it will send back a JSON payload of supported features and metadata.
|
|
||||||
*/
|
|
||||||
public bool $featureNegotiation = true,
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Enable TLS for this connection.
|
* Enable TLS for this connection.
|
||||||
*/
|
*/
|
||||||
@ -89,11 +99,6 @@ final class ClientConfig implements \JsonSerializable
|
|||||||
*/
|
*/
|
||||||
public bool $snappy = false,
|
public bool $snappy = false,
|
||||||
|
|
||||||
/**
|
|
||||||
* The read timeout for connection sockets and for awaiting responses from nsq.
|
|
||||||
*/
|
|
||||||
public int $readTimeout = 5,
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A string identifying the agent for this client in the spirit of HTTP.
|
* A string identifying the agent for this client in the spirit of HTTP.
|
||||||
*/
|
*/
|
||||||
@ -114,12 +119,14 @@ final class ClientConfig implements \JsonSerializable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public static function fromArray(array $array): self
|
||||||
* @phpstan-ignore-next-line
|
|
||||||
*/
|
|
||||||
public function jsonSerialize(): array
|
|
||||||
{
|
{
|
||||||
return [
|
return new self(...array_intersect_key($array, get_class_vars(self::class)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function asNegotiationPayload(): string
|
||||||
|
{
|
||||||
|
$data = [
|
||||||
'client_id' => $this->clientId,
|
'client_id' => $this->clientId,
|
||||||
'deflate' => $this->deflate,
|
'deflate' => $this->deflate,
|
||||||
'deflate_level' => $this->deflateLevel,
|
'deflate_level' => $this->deflateLevel,
|
||||||
@ -132,10 +139,7 @@ final class ClientConfig implements \JsonSerializable
|
|||||||
'tls_v1' => $this->tls,
|
'tls_v1' => $this->tls,
|
||||||
'user_agent' => $this->userAgent,
|
'user_agent' => $this->userAgent,
|
||||||
];
|
];
|
||||||
}
|
|
||||||
|
|
||||||
public function toString(): string
|
return json_encode($data, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
|
||||||
{
|
|
||||||
return json_encode($this, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -9,7 +9,7 @@ namespace Nsq\Config;
|
|||||||
*
|
*
|
||||||
* @psalm-immutable
|
* @psalm-immutable
|
||||||
*/
|
*/
|
||||||
final class ConnectionConfig
|
final class ServerConfig
|
||||||
{
|
{
|
||||||
public function __construct(
|
public function __construct(
|
||||||
/**
|
/**
|
||||||
@ -82,9 +82,6 @@ final class ConnectionConfig
|
|||||||
) {
|
) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @phpstan-ignore-next-line
|
|
||||||
*/
|
|
||||||
public static function fromArray(array $array): self
|
public static function fromArray(array $array): self
|
||||||
{
|
{
|
||||||
return new self(
|
return new self(
|
@ -6,7 +6,7 @@ namespace Nsq;
|
|||||||
|
|
||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use Nsq\Config\ClientConfig;
|
use Nsq\Config\ClientConfig;
|
||||||
use Nsq\Config\ConnectionConfig;
|
use Nsq\Config\ServerConfig;
|
||||||
use Nsq\Exception\AuthenticationRequired;
|
use Nsq\Exception\AuthenticationRequired;
|
||||||
use Nsq\Exception\NsqException;
|
use Nsq\Exception\NsqException;
|
||||||
use Nsq\Frame\Response;
|
use Nsq\Frame\Response;
|
||||||
@ -46,16 +46,21 @@ abstract class Connection
|
|||||||
$buffer = new Buffer();
|
$buffer = new Buffer();
|
||||||
|
|
||||||
/** @var SocketStream $stream */
|
/** @var SocketStream $stream */
|
||||||
$stream = yield SocketStream::connect($this->address);
|
$stream = yield SocketStream::connect(
|
||||||
|
$this->address,
|
||||||
|
$this->clientConfig->connectTimeout,
|
||||||
|
$this->clientConfig->maxAttempts,
|
||||||
|
$this->clientConfig->tcpNoDelay,
|
||||||
|
);
|
||||||
|
|
||||||
yield $stream->write(Command::magic());
|
yield $stream->write(Command::magic());
|
||||||
yield $stream->write(Command::identify($this->clientConfig->toString()));
|
yield $stream->write(Command::identify($this->clientConfig->asNegotiationPayload()));
|
||||||
|
|
||||||
/** @var Response $response */
|
/** @var Response $response */
|
||||||
$response = yield $this->response($stream, $buffer);
|
$response = yield $this->response($stream, $buffer);
|
||||||
$connectionConfig = ConnectionConfig::fromArray($response->toArray());
|
$serverConfig = ServerConfig::fromArray($response->toArray());
|
||||||
|
|
||||||
if ($connectionConfig->snappy) {
|
if ($serverConfig->snappy) {
|
||||||
$stream = new SnappyStream($stream, $buffer->flush());
|
$stream = new SnappyStream($stream, $buffer->flush());
|
||||||
|
|
||||||
/** @var Response $response */
|
/** @var Response $response */
|
||||||
@ -66,7 +71,7 @@ abstract class Connection
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($connectionConfig->deflate) {
|
if ($serverConfig->deflate) {
|
||||||
$stream = new GzipStream($stream);
|
$stream = new GzipStream($stream);
|
||||||
|
|
||||||
/** @var Response $response */
|
/** @var Response $response */
|
||||||
@ -77,7 +82,7 @@ abstract class Connection
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($connectionConfig->authRequired) {
|
if ($serverConfig->authRequired) {
|
||||||
if (null === $this->clientConfig->authSecret) {
|
if (null === $this->clientConfig->authSecret) {
|
||||||
throw new AuthenticationRequired();
|
throw new AuthenticationRequired();
|
||||||
}
|
}
|
||||||
|
@ -14,4 +14,57 @@ final class ClientConfigTest extends TestCase
|
|||||||
|
|
||||||
new ClientConfig(deflate: true, snappy: true);
|
new ClientConfig(deflate: true, snappy: true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @dataProvider array
|
||||||
|
*/
|
||||||
|
public function testFromArray(array $data, array $expected): void
|
||||||
|
{
|
||||||
|
self::assertSame($expected, get_object_vars(ClientConfig::fromArray($data)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function array(): Generator
|
||||||
|
{
|
||||||
|
$default = [
|
||||||
|
'authSecret' => null,
|
||||||
|
'connectTimeout' => 10,
|
||||||
|
'maxAttempts' => 0,
|
||||||
|
'tcpNoDelay' => false,
|
||||||
|
'featureNegotiation' => true,
|
||||||
|
'clientId' => '',
|
||||||
|
'deflate' => false,
|
||||||
|
'deflateLevel' => 6,
|
||||||
|
'heartbeatInterval' => 30000,
|
||||||
|
'hostname' => gethostname(),
|
||||||
|
'msgTimeout' => 60000,
|
||||||
|
'sampleRate' => 0,
|
||||||
|
'tls' => false,
|
||||||
|
'snappy' => false,
|
||||||
|
'userAgent' => 'nsqphp/dev-main',
|
||||||
|
];
|
||||||
|
|
||||||
|
yield 'Empty array' => [[], $default];
|
||||||
|
|
||||||
|
yield 'With wrong keys' => [['bla' => 'bla'], $default];
|
||||||
|
|
||||||
|
$custom = [
|
||||||
|
'authSecret' => 'SomeSecret',
|
||||||
|
'connectTimeout' => 100,
|
||||||
|
'maxAttempts' => 10,
|
||||||
|
'tcpNoDelay' => true,
|
||||||
|
'featureNegotiation' => true,
|
||||||
|
'clientId' => 'SomeGorgeousClientId',
|
||||||
|
'deflate' => true,
|
||||||
|
'deflateLevel' => 1,
|
||||||
|
'heartbeatInterval' => 31111,
|
||||||
|
'hostname' => gethostname(),
|
||||||
|
'msgTimeout' => 59999,
|
||||||
|
'sampleRate' => 25,
|
||||||
|
'tls' => true,
|
||||||
|
'snappy' => false,
|
||||||
|
'userAgent' => 'nsqphp/test',
|
||||||
|
];
|
||||||
|
|
||||||
|
yield 'Full filled' => [$custom, $custom];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,6 @@ final class NsqTest extends TestCase
|
|||||||
new ClientConfig(
|
new ClientConfig(
|
||||||
heartbeatInterval: 3000,
|
heartbeatInterval: 3000,
|
||||||
snappy: false,
|
snappy: false,
|
||||||
readTimeout: 1,
|
|
||||||
),
|
),
|
||||||
];
|
];
|
||||||
|
|
||||||
@ -32,7 +31,6 @@ final class NsqTest extends TestCase
|
|||||||
new ClientConfig(
|
new ClientConfig(
|
||||||
heartbeatInterval: 3000,
|
heartbeatInterval: 3000,
|
||||||
snappy: true,
|
snappy: true,
|
||||||
readTimeout: 1,
|
|
||||||
),
|
),
|
||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user