Return bool from public api of Producer and Consumer to indicate of success of process and prevent throwing exception from it

This commit is contained in:
2021-09-18 23:31:05 +03:00
parent b130b09a82
commit e1725ea140
3 changed files with 87 additions and 25 deletions

View File

@ -6,6 +6,7 @@ namespace Nsq;
use Amp\Failure;
use Amp\Promise;
use Amp\Success;
use Nsq\Config\ClientConfig;
use Nsq\Exception\ConsumerException;
use Nsq\Frame\Response;
@ -135,10 +136,14 @@ final class Consumer extends Connection
/**
* Update RDY state (indicate you are ready to receive N messages).
*
* @return Promise<void>
* @return Promise<bool>
*/
public function rdy(int $count): Promise
{
if (!$this->isConnected()) {
return new Success(false);
}
if ($this->rdy === $count) {
return call(static function (): void {
});
@ -146,19 +151,39 @@ final class Consumer extends Connection
$this->rdy = $count;
return $this->write(Command::rdy($count));
return call(function () use ($count) {
try {
yield $this->write(Command::rdy($count));
return true;
} catch (\Throwable) {
return false;
}
});
}
/**
* Finish a message (indicate successful processing).
*
* @return Promise<void>
* @return Promise<bool>
*
* @internal
*/
public function fin(string $id): Promise
{
return $this->write(Command::fin($id));
if (!$this->isConnected()) {
return new Success(false);
}
return call(function () use ($id) {
try {
yield $this->write(Command::fin($id));
return true;
} catch (\Throwable) {
return false;
}
});
}
/**
@ -167,24 +192,48 @@ final class Consumer extends Connection
* be explicitly relied upon and may change in the future. Similarly, a message that is in-flight and times out
* behaves identically to an explicit REQ.
*
* @return Promise<void>
* @return Promise<bool>
*
* @internal
*/
public function req(string $id, int $timeout): Promise
{
return $this->write(Command::req($id, $timeout));
if (!$this->isConnected()) {
return new Success(false);
}
return call(function () use ($id, $timeout) {
try {
yield $this->write(Command::req($id, $timeout));
return true;
} catch (\Throwable) {
return false;
}
});
}
/**
* Reset the timeout for an in-flight message.
*
* @return Promise<void>
* @return Promise<bool>
*
* @internal
*/
public function touch(string $id): Promise
{
return $this->write(Command::touch($id));
if (!$this->isConnected()) {
return new Success(false);
}
return call(function () use ($id) {
try {
yield $this->write(Command::touch($id));
return true;
} catch (\Throwable) {
return false;
}
});
}
}

View File

@ -37,7 +37,7 @@ final class Message
}
/**
* @return Promise<void>
* @psalm-return Promise<bool>
*/
public function finish(): Promise
{
@ -49,7 +49,7 @@ final class Message
/**
* @psalm-param positive-int|0 $timeout
*
* @return Promise<void>
* @psalm-return Promise<bool>
*/
public function requeue(int $timeout): Promise
{
@ -59,7 +59,7 @@ final class Message
}
/**
* @return Promise<void>
* @psalm-return Promise<bool>
*/
public function touch(): Promise
{

View File

@ -5,6 +5,7 @@ declare(strict_types=1);
namespace Nsq;
use Amp\Promise;
use Amp\Success;
use Nsq\Config\ClientConfig;
use Nsq\Exception\NsqException;
use Psr\Log\LoggerInterface;
@ -92,25 +93,37 @@ final class Producer extends Connection
*
* @psalm-param positive-int|0 $delay
*
* @return Promise<void>
* @return Promise<bool>
*/
public function publish(string $topic, string | array $body, int $delay = 0): Promise
{
if (0 < $delay) {
return call(
function (array $bodies) use ($topic, $delay): \Generator {
foreach ($bodies as $body) {
yield $this->write(Command::dpub($topic, $body, $delay));
}
},
(array) $body,
);
if (!$this->isConnected()) {
return new Success(false);
}
$command = \is_array($body)
? Command::mpub($topic, $body)
: Command::pub($topic, $body);
return call(
function (iterable $commands): \Generator {
try {
foreach ($commands as $command) {
yield $this->write($command);
}
return $this->write($command);
return true;
} catch (\Throwable) {
return false;
}
},
(static function () use ($topic, $body, $delay): \Generator {
if (\is_array($body) && null === $delay) {
yield Command::mpub($topic, $body);
} elseif (null !== $delay) {
foreach ((array) $body as $content) {
yield Command::dpub($topic, $content, $delay);
}
} else {
yield Command::pub($topic, $body);
}
})(),
);
}
}