Extract getMessage method from NsqTransport to NsqReceivedStamp
This commit is contained in:
@@ -4,7 +4,9 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Nsq\NsqBundle\Messenger;
|
namespace Nsq\NsqBundle\Messenger;
|
||||||
|
|
||||||
|
use LogicException;
|
||||||
use Nsq\Message;
|
use Nsq\Message;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Stamp\StampInterface;
|
use Symfony\Component\Messenger\Stamp\StampInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -18,4 +20,16 @@ final class NsqReceivedStamp implements StampInterface
|
|||||||
{
|
{
|
||||||
$this->message = $message;
|
$this->message = $message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static function getMessageFromEnvelope(Envelope $envelope): Message
|
||||||
|
{
|
||||||
|
/** @var self|null $stamp */
|
||||||
|
$stamp = $envelope->last(self::class);
|
||||||
|
|
||||||
|
if (null === $stamp) {
|
||||||
|
throw new LogicException('Envelop doesn\'t related to NsqMessage.');
|
||||||
|
}
|
||||||
|
|
||||||
|
return $stamp->message;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -6,7 +6,6 @@ namespace Nsq\NsqBundle\Messenger;
|
|||||||
|
|
||||||
use Generator;
|
use Generator;
|
||||||
use JsonException;
|
use JsonException;
|
||||||
use LogicException;
|
|
||||||
use Nsq\Message;
|
use Nsq\Message;
|
||||||
use Nsq\Producer;
|
use Nsq\Producer;
|
||||||
use Nsq\Subscriber;
|
use Nsq\Subscriber;
|
||||||
@@ -114,10 +113,7 @@ final class NsqTransport implements TransportInterface
|
|||||||
*/
|
*/
|
||||||
public function ack(Envelope $envelope): void
|
public function ack(Envelope $envelope): void
|
||||||
{
|
{
|
||||||
$message = $this->getMessage($envelope);
|
$message = NsqReceivedStamp::getMessageFromEnvelope($envelope);
|
||||||
if (!$message instanceof Message) {
|
|
||||||
throw new LogicException('Returned envelop doesn\'t related to NsqMessage.');
|
|
||||||
}
|
|
||||||
|
|
||||||
$message->finish();
|
$message->finish();
|
||||||
}
|
}
|
||||||
@@ -127,21 +123,8 @@ final class NsqTransport implements TransportInterface
|
|||||||
*/
|
*/
|
||||||
public function reject(Envelope $envelope): void
|
public function reject(Envelope $envelope): void
|
||||||
{
|
{
|
||||||
$message = $this->getMessage($envelope);
|
$message = NsqReceivedStamp::getMessageFromEnvelope($envelope);
|
||||||
if (!$message instanceof Message) {
|
|
||||||
throw new LogicException('Returned envelop doesn\'t related to NsqMessage.');
|
|
||||||
}
|
|
||||||
|
|
||||||
$message->finish();
|
$message->finish();
|
||||||
}
|
}
|
||||||
|
|
||||||
private function getMessage(Envelope $envelope): ?Message
|
|
||||||
{
|
|
||||||
$stamp = $envelope->last(NsqReceivedStamp::class);
|
|
||||||
if (!$stamp instanceof NsqReceivedStamp) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
return $stamp->message;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user