diff --git a/src/Message.php b/src/Message.php index 3141f1b..99192d9 100644 --- a/src/Message.php +++ b/src/Message.php @@ -6,7 +6,6 @@ namespace Nsq; use Amp\Promise; use Nsq\Exception\MessageException; -use function Amp\call; final class Message { @@ -32,20 +31,19 @@ final class Message ); } + public function isProcessed(): bool + { + return $this->processed; + } + /** * @return Promise */ public function finish(): Promise { - return call(function (): \Generator { - if ($this->processed) { - throw MessageException::processed($this); - } + $this->markAsProcessedOrFail(); - yield $this->consumer->fin($this->id); - - $this->processed = true; - }); + return $this->consumer->fin($this->id); } /** @@ -53,15 +51,9 @@ final class Message */ public function requeue(int $timeout): Promise { - return call(function () use ($timeout): \Generator { - if ($this->processed) { - throw MessageException::processed($this); - } + $this->markAsProcessedOrFail(); - yield $this->consumer->req($this->id, $timeout); - - $this->processed = true; - }); + return $this->consumer->req($this->id, $timeout); } /** @@ -69,14 +61,17 @@ final class Message */ public function touch(): Promise { - return call(function (): \Generator { - if ($this->processed) { - throw MessageException::processed($this); - } + $this->markAsProcessedOrFail(); - yield $this->consumer->touch($this->id); + return $this->consumer->touch($this->id); + } - $this->processed = true; - }); + private function markAsProcessedOrFail(): void + { + if ($this->processed) { + throw MessageException::processed($this); + } + + $this->processed = true; } }