From 92d8304a6a1cbe86fec9134a3e850e80494ff32d Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Wed, 10 Mar 2021 22:11:58 +0300 Subject: [PATCH] Remove Producer::defer --- README.md | 2 +- src/Producer.php | 21 ++++++++++++--------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 95a554a..c56e742 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ $producer->publish('topic', [ ]); // Publish a deferred message to a topic -$producer->defer('topic', 'Deferred message', delay: 5000); +$producer->publish('topic', 'Deferred message', delay: 5000); ``` ### Consumer diff --git a/src/Producer.php b/src/Producer.php index d503cd4..d96f4eb 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -46,8 +46,19 @@ final class Producer extends Connection * * @return Promise */ - public function publish(string $topic, string | array $body): Promise + public function publish(string $topic, string | array $body, int $delay = 0): Promise { + if (0 < $delay) { + return call( + function (array $bodies) use ($topic, $delay): \Generator { + foreach ($bodies as $body) { + yield $this->stream->write(Command::dpub($topic, $body, $delay)); + } + }, + (array) $body, + ); + } + $command = \is_array($body) ? Command::mpub($topic, $body) : Command::pub($topic, $body); @@ -55,14 +66,6 @@ final class Producer extends Connection return $this->stream->write($command); } - /** - * @return Promise - */ - public function defer(string $topic, string $body, int $delay): Promise - { - return $this->stream->write(Command::dpub($topic, $body, $delay)); - } - private function run(): void { $buffer = new Buffer();