Add rdyCount to ClientConfig
This commit is contained in:
@ -39,6 +39,7 @@ final class ClientConfig
|
|||||||
* Use tcp_nodelay for establishing a connection.
|
* Use tcp_nodelay for establishing a connection.
|
||||||
*/
|
*/
|
||||||
public bool $tcpNoDelay = false,
|
public bool $tcpNoDelay = false,
|
||||||
|
public int $rdyCount = 2500,
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
|
* Boolean used to indicate that the client supports feature negotiation. If the server is capable,
|
||||||
|
@ -29,7 +29,7 @@ final class Consumer extends Connection
|
|||||||
private string $topic,
|
private string $topic,
|
||||||
private string $channel,
|
private string $channel,
|
||||||
callable $onMessage,
|
callable $onMessage,
|
||||||
ClientConfig $clientConfig,
|
private ClientConfig $clientConfig,
|
||||||
private LoggerInterface $logger,
|
private LoggerInterface $logger,
|
||||||
) {
|
) {
|
||||||
parent::__construct(
|
parent::__construct(
|
||||||
@ -91,7 +91,7 @@ final class Consumer extends Connection
|
|||||||
return new Failure(new ConsumerException('Fail subscription.'));
|
return new Failure(new ConsumerException('Fail subscription.'));
|
||||||
}
|
}
|
||||||
|
|
||||||
yield $this->rdy(2500);
|
yield $this->rdy(1);
|
||||||
|
|
||||||
/** @phpstan-ignore-next-line */
|
/** @phpstan-ignore-next-line */
|
||||||
asyncCall(function () use ($buffer): \Generator {
|
asyncCall(function () use ($buffer): \Generator {
|
||||||
@ -117,6 +117,10 @@ final class Consumer extends Connection
|
|||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ($this->rdy !== $this->clientConfig->rdyCount) {
|
||||||
|
yield $this->rdy($this->clientConfig->rdyCount);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -151,8 +155,6 @@ final class Consumer extends Connection
|
|||||||
*/
|
*/
|
||||||
public function fin(string $id): Promise
|
public function fin(string $id): Promise
|
||||||
{
|
{
|
||||||
--$this->rdy;
|
|
||||||
|
|
||||||
return $this->stream->write(Command::fin($id));
|
return $this->stream->write(Command::fin($id));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -168,8 +170,6 @@ final class Consumer extends Connection
|
|||||||
*/
|
*/
|
||||||
public function req(string $id, int $timeout): Promise
|
public function req(string $id, int $timeout): Promise
|
||||||
{
|
{
|
||||||
--$this->rdy;
|
|
||||||
|
|
||||||
return $this->stream->write(Command::req($id, $timeout));
|
return $this->stream->write(Command::req($id, $timeout));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,6 +36,7 @@ final class ClientConfigTest extends TestCase
|
|||||||
'connectTimeout' => 100,
|
'connectTimeout' => 100,
|
||||||
'maxAttempts' => 10,
|
'maxAttempts' => 10,
|
||||||
'tcpNoDelay' => true,
|
'tcpNoDelay' => true,
|
||||||
|
'rdyCount' => 1,
|
||||||
'featureNegotiation' => true,
|
'featureNegotiation' => true,
|
||||||
'clientId' => 'SomeGorgeousClientId',
|
'clientId' => 'SomeGorgeousClientId',
|
||||||
'deflate' => true,
|
'deflate' => true,
|
||||||
|
Reference in New Issue
Block a user