Compare commits
5 Commits
1f2a80f820
...
fad67d327f
Author | SHA1 | Date | |
---|---|---|---|
fad67d327f
|
|||
9d2ef8e132 | |||
4fac0e066f | |||
ed2fdebce5 | |||
819f2c9e5d |
@ -13,9 +13,9 @@
|
||||
"require": {
|
||||
"php": "^8.0.1",
|
||||
"ext-json": "*",
|
||||
"nsq/nsq": "^0.6.2",
|
||||
"symfony/framework-bundle": "^5.0",
|
||||
"symfony/messenger": "^5.0"
|
||||
"nsq/nsq": "0.7",
|
||||
"symfony/framework-bundle": "^5.0 || ^6.0",
|
||||
"symfony/messenger": "^5.0 || ^6.0"
|
||||
},
|
||||
"require-dev": {
|
||||
"ergebnis/composer-normalize": "^2.13"
|
||||
@ -25,5 +25,10 @@
|
||||
"Nsq\\NsqBundle\\": "src/"
|
||||
}
|
||||
},
|
||||
"minimum-stability": "dev"
|
||||
"minimum-stability": "dev",
|
||||
"config": {
|
||||
"allow-plugins": {
|
||||
"ergebnis/composer-normalize": true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ class NsqExtension extends Extension
|
||||
*/
|
||||
public function load(array $configs, ContainerBuilder $container): void
|
||||
{
|
||||
$loader = new Loader\YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
|
||||
$loader->load('services.yml');
|
||||
$loader = new Loader\PhpFileLoader($container, new FileLocator(__DIR__.'/../config'));
|
||||
$loader->load('services.php');
|
||||
}
|
||||
}
|
||||
|
@ -1,10 +1,12 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Nsq\NsqBundle\Messenger;
|
||||
namespace Nsq\NsqBundle\EventListener;
|
||||
|
||||
use Nsq\NsqBundle\Messenger\NsqReceivedStamp;
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
|
||||
use function Amp\Promise\wait;
|
||||
|
||||
final class AckUnrecoverableMessageListener implements EventSubscriberInterface
|
||||
@ -21,7 +23,7 @@ final class AckUnrecoverableMessageListener implements EventSubscriberInterface
|
||||
|
||||
public function onMessageFailed(WorkerMessageFailedEvent $event): void
|
||||
{
|
||||
if ($event->willRetry()) {
|
||||
if (!$event->getThrowable() instanceof UnrecoverableExceptionInterface) {
|
||||
return;
|
||||
}
|
||||
|
@ -39,25 +39,19 @@ final class NsqSender implements SenderInterface
|
||||
$delayStamp = $envelope->last(DelayStamp::class);
|
||||
$delay = null !== $delayStamp ? $delayStamp->getDelay() : 0;
|
||||
|
||||
$promise = null;
|
||||
if (null === $envelope->last(NsqReceivedStamp::class)) {
|
||||
$encodedMessage = $this->serializer->encode($envelope);
|
||||
$encodedMessage = json_encode($encodedMessage, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);
|
||||
|
||||
if (null !== $envelope->last(NsqReceivedStamp::class)) {
|
||||
wait($producer->publish($this->topic, $encodedMessage, $delay));
|
||||
} else {
|
||||
$message = NsqReceivedStamp::getMessageFromEnvelope($envelope);
|
||||
|
||||
if (!$message->isProcessed()) {
|
||||
$promise = $message->requeue($delay);
|
||||
wait($message->requeue($delay));
|
||||
}
|
||||
}
|
||||
|
||||
if (null === $promise) {
|
||||
$encodedMessage = $this->serializer->encode($envelope->withoutAll(NsqReceivedStamp::class));
|
||||
$encodedMessage = json_encode($encodedMessage, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);
|
||||
|
||||
$promise = $producer->publish($this->topic, $encodedMessage, $delay);
|
||||
}
|
||||
|
||||
wait($promise);
|
||||
|
||||
return $envelope;
|
||||
}
|
||||
|
||||
|
@ -39,6 +39,7 @@ final class NsqTransportFactory implements TransportFactoryInterface
|
||||
}
|
||||
|
||||
$nsqOptions = $query + $options;
|
||||
$nsqOptions['rdyCount'] = 1;
|
||||
|
||||
$address = sprintf('tcp://%s:%s', $components['host'], $components['port'] ?? $query['port'] ?? 4150);
|
||||
$topic = $nsqOptions['topic'] ?? 'symfony-messenger';
|
||||
|
@ -1,9 +0,0 @@
|
||||
services:
|
||||
Nsq\NsqBundle\Messenger\NsqTransportFactory:
|
||||
tags:
|
||||
- 'messenger.transport_factory'
|
||||
- 'container.no_preload'
|
||||
|
||||
Nsq\NsqBundle\Messenger\AckUnrecoverableMessageListener:
|
||||
tags:
|
||||
- { name: kernel.event_subscriber }
|
18
src/config/services.php
Normal file
18
src/config/services.php
Normal file
@ -0,0 +1,18 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
use Symfony\Component\DependencyInjection\Loader\Configurator\ContainerConfigurator;
|
||||
|
||||
return static function (ContainerConfigurator $configurator): void {
|
||||
$services = $configurator->services();
|
||||
|
||||
$services->set(Nsq\NsqBundle\Messenger\NsqTransportFactory::class)
|
||||
->tag('messenger.transport_factory')
|
||||
->tag('container.no_preload')
|
||||
;
|
||||
|
||||
$services->set(\Nsq\NsqBundle\EventListener\AckUnrecoverableMessageListener::class)
|
||||
->tag('kernel.event_subscriber')
|
||||
;
|
||||
};
|
Reference in New Issue
Block a user