Refactoring: Simplify Message methods, add isProcessed method.

This commit is contained in:
2021-06-09 16:33:18 +03:00
parent 2fc7e37120
commit d3e1788d23

View File

@ -6,7 +6,6 @@ namespace Nsq;
use Amp\Promise; use Amp\Promise;
use Nsq\Exception\MessageException; use Nsq\Exception\MessageException;
use function Amp\call;
final class Message final class Message
{ {
@ -32,20 +31,19 @@ final class Message
); );
} }
public function isProcessed(): bool
{
return $this->processed;
}
/** /**
* @return Promise<void> * @return Promise<void>
*/ */
public function finish(): Promise public function finish(): Promise
{ {
return call(function (): \Generator { $this->markAsProcessedOrFail();
if ($this->processed) {
throw MessageException::processed($this);
}
yield $this->consumer->fin($this->id); return $this->consumer->fin($this->id);
$this->processed = true;
});
} }
/** /**
@ -53,15 +51,9 @@ final class Message
*/ */
public function requeue(int $timeout): Promise public function requeue(int $timeout): Promise
{ {
return call(function () use ($timeout): \Generator { $this->markAsProcessedOrFail();
if ($this->processed) {
throw MessageException::processed($this);
}
yield $this->consumer->req($this->id, $timeout); return $this->consumer->req($this->id, $timeout);
$this->processed = true;
});
} }
/** /**
@ -69,14 +61,17 @@ final class Message
*/ */
public function touch(): Promise public function touch(): Promise
{ {
return call(function (): \Generator { $this->markAsProcessedOrFail();
if ($this->processed) {
throw MessageException::processed($this);
}
yield $this->consumer->touch($this->id); return $this->consumer->touch($this->id);
}
$this->processed = true; private function markAsProcessedOrFail(): void
}); {
if ($this->processed) {
throw MessageException::processed($this);
}
$this->processed = true;
} }
} }