Remove Producer::defer

This commit is contained in:
2021-03-10 22:11:58 +03:00
parent 3e4e8c3802
commit 92d8304a6a
2 changed files with 13 additions and 10 deletions

View File

@ -59,7 +59,7 @@ $producer->publish('topic', [
]); ]);
// Publish a deferred message to a topic // Publish a deferred message to a topic
$producer->defer('topic', 'Deferred message', delay: 5000); $producer->publish('topic', 'Deferred message', delay: 5000);
``` ```
### Consumer ### Consumer

View File

@ -46,8 +46,19 @@ final class Producer extends Connection
* *
* @return Promise<void> * @return Promise<void>
*/ */
public function publish(string $topic, string | array $body): Promise public function publish(string $topic, string | array $body, int $delay = 0): Promise
{ {
if (0 < $delay) {
return call(
function (array $bodies) use ($topic, $delay): \Generator {
foreach ($bodies as $body) {
yield $this->stream->write(Command::dpub($topic, $body, $delay));
}
},
(array) $body,
);
}
$command = \is_array($body) $command = \is_array($body)
? Command::mpub($topic, $body) ? Command::mpub($topic, $body)
: Command::pub($topic, $body); : Command::pub($topic, $body);
@ -55,14 +66,6 @@ final class Producer extends Connection
return $this->stream->write($command); return $this->stream->write($command);
} }
/**
* @return Promise<void>
*/
public function defer(string $topic, string $body, int $delay): Promise
{
return $this->stream->write(Command::dpub($topic, $body, $delay));
}
private function run(): void private function run(): void
{ {
$buffer = new Buffer(); $buffer = new Buffer();