From f33a9dc226117587b095c9a12552140f03789047 Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Mon, 18 Jan 2021 16:46:31 +0300 Subject: [PATCH] Initial commit --- .gitignore | 2 + LICENSE | 21 +++ composer.json | 29 +++++ src/DependencyInjection/NsqExtension.php | 20 +++ src/Messenger/NsqReceivedStamp.php | 21 +++ src/Messenger/NsqTransport.php | 155 +++++++++++++++++++++++ src/Messenger/NsqTransportFactory.php | 52 ++++++++ src/NsqBundle.php | 10 ++ src/Resources/config/services.yml | 3 + 9 files changed, 313 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 composer.json create mode 100644 src/DependencyInjection/NsqExtension.php create mode 100644 src/Messenger/NsqReceivedStamp.php create mode 100644 src/Messenger/NsqTransport.php create mode 100644 src/Messenger/NsqTransportFactory.php create mode 100644 src/NsqBundle.php create mode 100644 src/Resources/config/services.yml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4fbb073 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/vendor/ +/composer.lock diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..0a55468 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 nsqphp + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..0bfb952 --- /dev/null +++ b/composer.json @@ -0,0 +1,29 @@ +{ + "name": "nsq/nsq-bundle", + "type": "library", + "description": "Symfony Integration for NsqPHP", + "homepage": "https://github.com/nsqphp/NsqBundle", + "license": "MIT", + "authors": [ + { + "name": "Konstantin Grachev", + "email": "me@grachevko.ru" + } + ], + "require": { + "php": ">=7.4", + "ext-json": "*", + "nsq/nsq": "0.1", + "symfony/framework-bundle": "^5.0", + "symfony/messenger": "^5.0" + }, + "require-dev": { + "ergebnis/composer-normalize": "^2.13" + }, + "autoload": { + "psr-4": { + "NsqPHP\\NsqBundle\\": "src/" + } + }, + "minimum-stability": "dev" +} diff --git a/src/DependencyInjection/NsqExtension.php b/src/DependencyInjection/NsqExtension.php new file mode 100644 index 0000000..fc070bd --- /dev/null +++ b/src/DependencyInjection/NsqExtension.php @@ -0,0 +1,20 @@ +load('services.yml'); + } +} diff --git a/src/Messenger/NsqReceivedStamp.php b/src/Messenger/NsqReceivedStamp.php new file mode 100644 index 0000000..7bb8224 --- /dev/null +++ b/src/Messenger/NsqReceivedStamp.php @@ -0,0 +1,21 @@ +envelope = $envelope; + } +} diff --git a/src/Messenger/NsqTransport.php b/src/Messenger/NsqTransport.php new file mode 100644 index 0000000..79f240e --- /dev/null +++ b/src/Messenger/NsqTransport.php @@ -0,0 +1,155 @@ +connection = $connection; + $this->topic = $topic; + $this->channel = $channel; + $this->serializer = $serializer ?? new PhpSerializer(); + } + + /** + * {@inheritdoc} + */ + public function send(Envelope $envelope): Envelope + { + $nsqEnvelope = $this->getNsqEnvelope($envelope); + + $encodedMessage = $this->serializer->encode($envelope->withoutAll(NsqReceivedStamp::class)); + + $this->getPublisher()->pub($this->topic, json_encode($encodedMessage, JSON_THROW_ON_ERROR)); + + if (null !== $nsqEnvelope) { + $nsqEnvelope->ack(); + } + + return $envelope; + } + + /** + * {@inheritdoc} + */ + public function get(): iterable + { + $generator = $this->generator; + if (null === $generator) { + $this->generator = $generator = $this->getSubscriber()->subscribe($this->topic, $this->channel); + } else { + $generator->next(); + } + + /** @var NsqEnvelope|null $nsqEnvelope */ + $nsqEnvelope = $generator->current(); + + if (null === $nsqEnvelope) { + return []; + } + + try { + $encodedEnvelope = json_decode($nsqEnvelope->message->body, true, 512, JSON_THROW_ON_ERROR); + } catch (JsonException $e) { + $nsqEnvelope->ack(); + + throw new MessageDecodingFailedException('', 0, $e); + } + + try { + $envelope = $this->serializer->decode($encodedEnvelope); + } catch (MessageDecodingFailedException $e) { + $nsqEnvelope->ack(); + + throw $e; + } + + return [ + $envelope->with( + new NsqReceivedStamp($nsqEnvelope), + new TransportMessageIdStamp($nsqEnvelope->message->id), + ), + ]; + } + + /** + * {@inheritdoc} + */ + public function ack(Envelope $envelope): void + { + $message = $this->getNsqEnvelope($envelope); + if (!$message instanceof NsqEnvelope) { + throw new LogicException('Returned envelop doesn\'t related to NsqMessage.'); + } + + $message->ack(); + } + + /** + * {@inheritdoc} + */ + public function reject(Envelope $envelope): void + { + $message = $this->getNsqEnvelope($envelope); + if (!$message instanceof NsqEnvelope) { + throw new LogicException('Returned envelop doesn\'t related to NsqMessage.'); + } + + $message->ack(); + } + + private function getNsqEnvelope(Envelope $envelope): ?NsqEnvelope + { + $stamp = $envelope->last(NsqReceivedStamp::class); + if (!$stamp instanceof NsqReceivedStamp) { + return null; + } + + return $stamp->envelope; + } + + private function getPublisher(): Writer + { + return $this->publisher ??= new Writer($this->connection); + } + + private function getSubscriber(): Subscriber + { + return $this->publisher ??= new Subscriber(new Reader($this->connection)); + } +} diff --git a/src/Messenger/NsqTransportFactory.php b/src/Messenger/NsqTransportFactory.php new file mode 100644 index 0000000..ab84134 --- /dev/null +++ b/src/Messenger/NsqTransportFactory.php @@ -0,0 +1,52 @@ +