Compare commits

...

6 Commits

8 changed files with 43 additions and 29 deletions

View File

@ -13,9 +13,9 @@
"require": { "require": {
"php": "^8.0.1", "php": "^8.0.1",
"ext-json": "*", "ext-json": "*",
"nsq/nsq": "^0.6.2", "nsq/nsq": "0.7",
"symfony/framework-bundle": "^5.0", "symfony/framework-bundle": "^5.0 || ^6.0",
"symfony/messenger": "^5.0" "symfony/messenger": "^5.0 || ^6.0"
}, },
"require-dev": { "require-dev": {
"ergebnis/composer-normalize": "^2.13" "ergebnis/composer-normalize": "^2.13"
@ -25,5 +25,10 @@
"Nsq\\NsqBundle\\": "src/" "Nsq\\NsqBundle\\": "src/"
} }
}, },
"minimum-stability": "dev" "minimum-stability": "dev",
"config": {
"allow-plugins": {
"ergebnis/composer-normalize": true
}
}
} }

3
renovate.json Normal file
View File

@ -0,0 +1,3 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
}

View File

@ -14,7 +14,7 @@ class NsqExtension extends Extension
*/ */
public function load(array $configs, ContainerBuilder $container): void public function load(array $configs, ContainerBuilder $container): void
{ {
$loader = new Loader\YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config')); $loader = new Loader\PhpFileLoader($container, new FileLocator(__DIR__.'/../config'));
$loader->load('services.yml'); $loader->load('services.php');
} }
} }

View File

@ -1,10 +1,12 @@
<?php <?php
declare(strict_types=1); declare(strict_types=1);
namespace Nsq\NsqBundle\Messenger; namespace Nsq\NsqBundle\EventListener;
use Nsq\NsqBundle\Messenger\NsqReceivedStamp;
use Symfony\Component\EventDispatcher\EventSubscriberInterface; use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
use function Amp\Promise\wait; use function Amp\Promise\wait;
final class AckUnrecoverableMessageListener implements EventSubscriberInterface final class AckUnrecoverableMessageListener implements EventSubscriberInterface
@ -21,7 +23,7 @@ final class AckUnrecoverableMessageListener implements EventSubscriberInterface
public function onMessageFailed(WorkerMessageFailedEvent $event): void public function onMessageFailed(WorkerMessageFailedEvent $event): void
{ {
if ($event->willRetry()) { if (!$event->getThrowable() instanceof UnrecoverableExceptionInterface) {
return; return;
} }

View File

@ -39,25 +39,19 @@ final class NsqSender implements SenderInterface
$delayStamp = $envelope->last(DelayStamp::class); $delayStamp = $envelope->last(DelayStamp::class);
$delay = null !== $delayStamp ? $delayStamp->getDelay() : 0; $delay = null !== $delayStamp ? $delayStamp->getDelay() : 0;
$promise = null; if (null === $envelope->last(NsqReceivedStamp::class)) {
$encodedMessage = $this->serializer->encode($envelope);
$encodedMessage = json_encode($encodedMessage, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);
if (null !== $envelope->last(NsqReceivedStamp::class)) { wait($producer->publish($this->topic, $encodedMessage, $delay));
} else {
$message = NsqReceivedStamp::getMessageFromEnvelope($envelope); $message = NsqReceivedStamp::getMessageFromEnvelope($envelope);
if (!$message->isProcessed()) { if (!$message->isProcessed()) {
$promise = $message->requeue($delay); wait($message->requeue($delay));
} }
} }
if (null === $promise) {
$encodedMessage = $this->serializer->encode($envelope->withoutAll(NsqReceivedStamp::class));
$encodedMessage = json_encode($encodedMessage, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);
$promise = $producer->publish($this->topic, $encodedMessage, $delay);
}
wait($promise);
return $envelope; return $envelope;
} }

View File

@ -39,6 +39,7 @@ final class NsqTransportFactory implements TransportFactoryInterface
} }
$nsqOptions = $query + $options; $nsqOptions = $query + $options;
$nsqOptions['rdyCount'] = 1;
$address = sprintf('tcp://%s:%s', $components['host'], $components['port'] ?? $query['port'] ?? 4150); $address = sprintf('tcp://%s:%s', $components['host'], $components['port'] ?? $query['port'] ?? 4150);
$topic = $nsqOptions['topic'] ?? 'symfony-messenger'; $topic = $nsqOptions['topic'] ?? 'symfony-messenger';

View File

@ -1,9 +0,0 @@
services:
Nsq\NsqBundle\Messenger\NsqTransportFactory:
tags:
- 'messenger.transport_factory'
- 'container.no_preload'
Nsq\NsqBundle\Messenger\AckUnrecoverableMessageListener:
tags:
- { name: kernel.event_subscriber }

18
src/config/services.php Normal file
View File

@ -0,0 +1,18 @@
<?php
declare(strict_types=1);
use Symfony\Component\DependencyInjection\Loader\Configurator\ContainerConfigurator;
return static function (ContainerConfigurator $configurator): void {
$services = $configurator->services();
$services->set(Nsq\NsqBundle\Messenger\NsqTransportFactory::class)
->tag('messenger.transport_factory')
->tag('container.no_preload')
;
$services->set(\Nsq\NsqBundle\EventListener\AckUnrecoverableMessageListener::class)
->tag('kernel.event_subscriber')
;
};