Fix some psalm/phpstan errors

This commit is contained in:
2021-09-19 02:34:53 +03:00
parent 381ba88f8d
commit d4b29c69db
5 changed files with 47 additions and 37 deletions

View File

@ -46,9 +46,9 @@ abstract class Connection
protected LoggerInterface $logger, protected LoggerInterface $logger,
) { ) {
$this->stream = new NullStream(); $this->stream = new NullStream();
$this->onConnectCallback = static function () { $this->onConnectCallback = static function (): void {
}; };
$this->onCloseCallback = static function () { $this->onCloseCallback = static function (): void {
}; };
} }
@ -153,16 +153,13 @@ abstract class Connection
]); ]);
asyncCall(static function () use ($stream, $logger): \Generator { asyncCall(static function () use ($stream, $logger): \Generator {
$promise = $stream->write(Command::cls()); try {
$promise->onResolve(static function (?\Throwable $e) use ($stream, $logger) { yield $stream->write(Command::cls());
$stream->close(); } catch (\Throwable $e) {
$logger->warning($e->getMessage(), ['exception' => $e]);
}
if (null !== $e) { $stream->close();
$logger->warning($e->getMessage(), ['exception' => $e]);
}
});
yield $promise;
}); });
return; return;
@ -179,7 +176,7 @@ abstract class Connection
public function onConnect(callable $callback): static public function onConnect(callable $callback): static
{ {
$previous = $this->onConnectCallback; $previous = $this->onConnectCallback;
$this->onConnectCallback = static function () use ($previous, $callback) { $this->onConnectCallback = static function () use ($previous, $callback): void {
$previous(); $previous();
$callback(); $callback();
}; };
@ -190,7 +187,7 @@ abstract class Connection
public function onClose(callable $callback): static public function onClose(callable $callback): static
{ {
$previous = $this->onCloseCallback; $previous = $this->onCloseCallback;
$this->onCloseCallback = static function () use ($previous, $callback) { $this->onCloseCallback = static function () use ($previous, $callback): void {
$previous(); $previous();
$callback(); $callback();
}; };
@ -198,11 +195,14 @@ abstract class Connection
return $this; return $this;
} }
/**
* @psalm-return Promise<null|string>
*/
protected function read(): Promise protected function read(): Promise
{ {
return call(function () { return call(function (): \Generator {
try { try {
return $this->stream->read(); return yield $this->stream->read();
} catch (\Throwable $e) { } catch (\Throwable $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]); $this->logger->error($e->getMessage(), ['exception' => $e]);
@ -213,9 +213,12 @@ abstract class Connection
}); });
} }
/**
* @psalm-return Promise<void>
*/
protected function write(string $data): Promise protected function write(string $data): Promise
{ {
return call(function () use ($data) { return call(function () use ($data): \Generator {
try { try {
return yield $this->stream->write($data); return yield $this->stream->write($data);
} catch (\Throwable $e) { } catch (\Throwable $e) {

View File

@ -47,10 +47,10 @@ final class Consumer extends Connection
$this->onMessage = $onMessage; $this->onMessage = $onMessage;
$context = compact('address', 'topic', 'channel'); $context = compact('address', 'topic', 'channel');
$this->onConnect(function () use ($context) { $this->onConnect(function () use ($context): void {
$this->logger->debug('Consumer connected.', $context); $this->logger->debug('Consumer connected.', $context);
}); });
$this->onClose(function () use ($context) { $this->onClose(function () use ($context): void {
$this->logger->debug('Consumer disconnected.', $context); $this->logger->debug('Consumer disconnected.', $context);
}); });
$this->logger->debug('Consumer created.', $context); $this->logger->debug('Consumer created.', $context);
@ -151,13 +151,12 @@ final class Consumer extends Connection
} }
if ($this->rdy === $count) { if ($this->rdy === $count) {
return call(static function (): void { return new Success(true);
});
} }
$this->rdy = $count; $this->rdy = $count;
return call(function () use ($count) { return call(function () use ($count): \Generator {
try { try {
yield $this->write(Command::rdy($count)); yield $this->write(Command::rdy($count));
@ -181,7 +180,7 @@ final class Consumer extends Connection
return new Success(false); return new Success(false);
} }
return call(function () use ($id) { return call(function () use ($id): \Generator {
try { try {
yield $this->write(Command::fin($id)); yield $this->write(Command::fin($id));
@ -208,7 +207,7 @@ final class Consumer extends Connection
return new Success(false); return new Success(false);
} }
return call(function () use ($id, $timeout) { return call(function () use ($id, $timeout): \Generator {
try { try {
yield $this->write(Command::req($id, $timeout)); yield $this->write(Command::req($id, $timeout));
@ -232,7 +231,7 @@ final class Consumer extends Connection
return new Success(false); return new Success(false);
} }
return call(function () use ($id) { return call(function () use ($id): \Generator {
try { try {
yield $this->write(Command::touch($id)); yield $this->write(Command::touch($id));

View File

@ -10,6 +10,7 @@ use Amp\Http\Client\DelegateHttpClient;
use Amp\Http\Client\HttpClientBuilder; use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request; use Amp\Http\Client\Request;
use Amp\Http\Client\Response; use Amp\Http\Client\Response;
use Amp\NullCancellationToken;
use Amp\Promise; use Amp\Promise;
use Nsq\Config\ClientConfig; use Nsq\Config\ClientConfig;
use Nsq\Config\LookupConfig; use Nsq\Config\LookupConfig;
@ -23,7 +24,7 @@ use function Amp\delay;
final class Lookup final class Lookup
{ {
/** /**
* @var array<string, array<string, Lookup\Producer[]>> * @psalm-var array<string, array<string, Lookup\Producer>>
*/ */
private array $producers = []; private array $producers = [];
@ -60,10 +61,10 @@ final class Lookup
*/ */
public function nodes(): Promise public function nodes(): Promise
{ {
return call(function () { return call(function (): \Generator {
$requestHandler = function (string $uri): \Generator { $requestHandler = function (string $uri): \Generator {
/** @var Response $response */ /** @var Response $response */
$response = yield $this->httpClient->request(new Request($uri.'/nodes')); $response = yield $this->httpClient->request(new Request($uri.'/nodes'), new NullCancellationToken());
try { try {
return Lookup\Response::fromJson(yield $response->getBody()->buffer()); return Lookup\Response::fromJson(yield $response->getBody()->buffer());
@ -101,6 +102,9 @@ final class Lookup
$this->logger->info('Lookup stopped.'); $this->logger->info('Lookup stopped.');
} }
/**
* @psalm-suppress InvalidPropertyAssignmentValue
*/
public function subscribe(string $topic, string $channel, callable $onMessage, ClientConfig $config = null): void public function subscribe(string $topic, string $channel, callable $onMessage, ClientConfig $config = null): void
{ {
if (null !== ($this->running[$topic][$channel] ?? null)) { if (null !== ($this->running[$topic][$channel] ?? null)) {
@ -109,7 +113,7 @@ final class Lookup
$this->running[$topic][$channel] = true; $this->running[$topic][$channel] = true;
asyncCall(function () use ($topic, $channel, $onMessage, $config) { asyncCall(function () use ($topic, $channel, $onMessage, $config): \Generator {
while (true) { while (true) {
if (null === ($this->running[$topic][$channel] ?? null)) { if (null === ($this->running[$topic][$channel] ?? null)) {
return; return;
@ -132,7 +136,7 @@ final class Lookup
} }
$this->keepConnection( $this->keepConnection(
new Consumer( Consumer::create(
$address, $address,
$topic, $topic,
$channel, $channel,
@ -173,7 +177,7 @@ final class Lookup
{ {
$this->consumers[$consumer->address][$consumer->topic][$consumer->channel] = $consumer; $this->consumers[$consumer->address][$consumer->topic][$consumer->channel] = $consumer;
asyncCall(function () use ($consumer) { asyncCall(function () use ($consumer): \Generator {
while (null !== ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) { while (null !== ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) {
try { try {
yield $consumer->connect(); yield $consumer->connect();
@ -219,12 +223,13 @@ final class Lookup
$this->topicWatchers[$topic] = true; $this->topicWatchers[$topic] = true;
asyncCall(function () use ($topic) { asyncCall(function () use ($topic): \Generator {
$requestHandler = function (string $uri) use ($topic): \Generator { $cancellationToken = new NullCancellationToken();
$requestHandler = function (string $uri) use ($topic, $cancellationToken): \Generator {
$this->logger->debug('Lookup', compact('topic')); $this->logger->debug('Lookup', compact('topic'));
/** @var Response $response */ /** @var Response $response */
$response = yield $this->httpClient->request(new Request($uri.'/lookup?topic='.$topic)); $response = yield $this->httpClient->request(new Request($uri.'/lookup?topic='.$topic), $cancellationToken);
try { try {
return Lookup\Response::fromJson(yield $response->getBody()->buffer()); return Lookup\Response::fromJson(yield $response->getBody()->buffer());

View File

@ -27,10 +27,10 @@ final class Producer extends Connection
); );
$context = compact('address'); $context = compact('address');
$this->onConnect(function () use ($context) { $this->onConnect(function () use ($context): void {
$this->logger->debug('Producer connected.', $context); $this->logger->debug('Producer connected.', $context);
}); });
$this->onClose(function () use ($context) { $this->onClose(function () use ($context): void {
$this->logger->debug('Producer disconnected.', $context); $this->logger->debug('Producer disconnected.', $context);
}); });
$this->logger->debug('Producer created.', $context); $this->logger->debug('Producer created.', $context);
@ -95,7 +95,7 @@ final class Producer extends Connection
* *
* @psalm-return Promise<bool> * @psalm-return Promise<bool>
*/ */
public function publish(string $topic, string | array $body, int $delay = 0): Promise public function publish(string $topic, string | array $body, int $delay = null): Promise
{ {
if (!$this->isConnected()) { if (!$this->isConnected()) {
return new Success(false); return new Success(false);

View File

@ -45,7 +45,7 @@ class GzipStream implements Stream
*/ */
public function read(): Promise public function read(): Promise
{ {
return call(function () { return call(function (): \Generator {
if (null === $this->inflate) { if (null === $this->inflate) {
return null; return null;
} }
@ -91,6 +91,9 @@ class GzipStream implements Stream
return $this->stream->write($compressed); return $this->stream->write($compressed);
} }
/**
* {@inheritDoc}
*/
public function close(): void public function close(): void
{ {
$this->stream->close(); $this->stream->close();