diff --git a/src/Connection.php b/src/Connection.php index 3521f6f..36def8e 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -46,9 +46,9 @@ abstract class Connection protected LoggerInterface $logger, ) { $this->stream = new NullStream(); - $this->onConnectCallback = static function () { + $this->onConnectCallback = static function (): void { }; - $this->onCloseCallback = static function () { + $this->onCloseCallback = static function (): void { }; } @@ -153,16 +153,13 @@ abstract class Connection ]); asyncCall(static function () use ($stream, $logger): \Generator { - $promise = $stream->write(Command::cls()); - $promise->onResolve(static function (?\Throwable $e) use ($stream, $logger) { - $stream->close(); + try { + yield $stream->write(Command::cls()); + } catch (\Throwable $e) { + $logger->warning($e->getMessage(), ['exception' => $e]); + } - if (null !== $e) { - $logger->warning($e->getMessage(), ['exception' => $e]); - } - }); - - yield $promise; + $stream->close(); }); return; @@ -179,7 +176,7 @@ abstract class Connection public function onConnect(callable $callback): static { $previous = $this->onConnectCallback; - $this->onConnectCallback = static function () use ($previous, $callback) { + $this->onConnectCallback = static function () use ($previous, $callback): void { $previous(); $callback(); }; @@ -190,7 +187,7 @@ abstract class Connection public function onClose(callable $callback): static { $previous = $this->onCloseCallback; - $this->onCloseCallback = static function () use ($previous, $callback) { + $this->onCloseCallback = static function () use ($previous, $callback): void { $previous(); $callback(); }; @@ -198,11 +195,14 @@ abstract class Connection return $this; } + /** + * @psalm-return Promise + */ protected function read(): Promise { - return call(function () { + return call(function (): \Generator { try { - return $this->stream->read(); + return yield $this->stream->read(); } catch (\Throwable $e) { $this->logger->error($e->getMessage(), ['exception' => $e]); @@ -213,9 +213,12 @@ abstract class Connection }); } + /** + * @psalm-return Promise + */ protected function write(string $data): Promise { - return call(function () use ($data) { + return call(function () use ($data): \Generator { try { return yield $this->stream->write($data); } catch (\Throwable $e) { diff --git a/src/Consumer.php b/src/Consumer.php index 06fe752..57528af 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -47,10 +47,10 @@ final class Consumer extends Connection $this->onMessage = $onMessage; $context = compact('address', 'topic', 'channel'); - $this->onConnect(function () use ($context) { + $this->onConnect(function () use ($context): void { $this->logger->debug('Consumer connected.', $context); }); - $this->onClose(function () use ($context) { + $this->onClose(function () use ($context): void { $this->logger->debug('Consumer disconnected.', $context); }); $this->logger->debug('Consumer created.', $context); @@ -151,13 +151,12 @@ final class Consumer extends Connection } if ($this->rdy === $count) { - return call(static function (): void { - }); + return new Success(true); } $this->rdy = $count; - return call(function () use ($count) { + return call(function () use ($count): \Generator { try { yield $this->write(Command::rdy($count)); @@ -181,7 +180,7 @@ final class Consumer extends Connection return new Success(false); } - return call(function () use ($id) { + return call(function () use ($id): \Generator { try { yield $this->write(Command::fin($id)); @@ -208,7 +207,7 @@ final class Consumer extends Connection return new Success(false); } - return call(function () use ($id, $timeout) { + return call(function () use ($id, $timeout): \Generator { try { yield $this->write(Command::req($id, $timeout)); @@ -232,7 +231,7 @@ final class Consumer extends Connection return new Success(false); } - return call(function () use ($id) { + return call(function () use ($id): \Generator { try { yield $this->write(Command::touch($id)); diff --git a/src/Lookup.php b/src/Lookup.php index 1b45884..58567c0 100644 --- a/src/Lookup.php +++ b/src/Lookup.php @@ -10,6 +10,7 @@ use Amp\Http\Client\DelegateHttpClient; use Amp\Http\Client\HttpClientBuilder; use Amp\Http\Client\Request; use Amp\Http\Client\Response; +use Amp\NullCancellationToken; use Amp\Promise; use Nsq\Config\ClientConfig; use Nsq\Config\LookupConfig; @@ -23,7 +24,7 @@ use function Amp\delay; final class Lookup { /** - * @var array> + * @psalm-var array> */ private array $producers = []; @@ -60,10 +61,10 @@ final class Lookup */ public function nodes(): Promise { - return call(function () { + return call(function (): \Generator { $requestHandler = function (string $uri): \Generator { /** @var Response $response */ - $response = yield $this->httpClient->request(new Request($uri.'/nodes')); + $response = yield $this->httpClient->request(new Request($uri.'/nodes'), new NullCancellationToken()); try { return Lookup\Response::fromJson(yield $response->getBody()->buffer()); @@ -101,6 +102,9 @@ final class Lookup $this->logger->info('Lookup stopped.'); } + /** + * @psalm-suppress InvalidPropertyAssignmentValue + */ public function subscribe(string $topic, string $channel, callable $onMessage, ClientConfig $config = null): void { if (null !== ($this->running[$topic][$channel] ?? null)) { @@ -109,7 +113,7 @@ final class Lookup $this->running[$topic][$channel] = true; - asyncCall(function () use ($topic, $channel, $onMessage, $config) { + asyncCall(function () use ($topic, $channel, $onMessage, $config): \Generator { while (true) { if (null === ($this->running[$topic][$channel] ?? null)) { return; @@ -132,7 +136,7 @@ final class Lookup } $this->keepConnection( - new Consumer( + Consumer::create( $address, $topic, $channel, @@ -173,7 +177,7 @@ final class Lookup { $this->consumers[$consumer->address][$consumer->topic][$consumer->channel] = $consumer; - asyncCall(function () use ($consumer) { + asyncCall(function () use ($consumer): \Generator { while (null !== ($this->consumers[$consumer->address][$consumer->topic][$consumer->channel] ?? null)) { try { yield $consumer->connect(); @@ -219,12 +223,13 @@ final class Lookup $this->topicWatchers[$topic] = true; - asyncCall(function () use ($topic) { - $requestHandler = function (string $uri) use ($topic): \Generator { + asyncCall(function () use ($topic): \Generator { + $cancellationToken = new NullCancellationToken(); + $requestHandler = function (string $uri) use ($topic, $cancellationToken): \Generator { $this->logger->debug('Lookup', compact('topic')); /** @var Response $response */ - $response = yield $this->httpClient->request(new Request($uri.'/lookup?topic='.$topic)); + $response = yield $this->httpClient->request(new Request($uri.'/lookup?topic='.$topic), $cancellationToken); try { return Lookup\Response::fromJson(yield $response->getBody()->buffer()); diff --git a/src/Producer.php b/src/Producer.php index 0f191f8..46233a3 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -27,10 +27,10 @@ final class Producer extends Connection ); $context = compact('address'); - $this->onConnect(function () use ($context) { + $this->onConnect(function () use ($context): void { $this->logger->debug('Producer connected.', $context); }); - $this->onClose(function () use ($context) { + $this->onClose(function () use ($context): void { $this->logger->debug('Producer disconnected.', $context); }); $this->logger->debug('Producer created.', $context); @@ -95,7 +95,7 @@ final class Producer extends Connection * * @psalm-return Promise */ - public function publish(string $topic, string | array $body, int $delay = 0): Promise + public function publish(string $topic, string | array $body, int $delay = null): Promise { if (!$this->isConnected()) { return new Success(false); diff --git a/src/Stream/GzipStream.php b/src/Stream/GzipStream.php index b5c400d..122bf08 100644 --- a/src/Stream/GzipStream.php +++ b/src/Stream/GzipStream.php @@ -45,7 +45,7 @@ class GzipStream implements Stream */ public function read(): Promise { - return call(function () { + return call(function (): \Generator { if (null === $this->inflate) { return null; } @@ -91,6 +91,9 @@ class GzipStream implements Stream return $this->stream->write($compressed); } + /** + * {@inheritDoc} + */ public function close(): void { $this->stream->close();