This commit is contained in:
Konstantin Grachev
2020-06-30 23:00:15 +03:00
committed by GitHub
parent 762bd0b866
commit cf0845cb8e
15 changed files with 1533 additions and 443 deletions

View File

@@ -28,6 +28,7 @@ services:
image: automagistre/tenant-php:${VERSION}
networks:
- default
- nsq
environment:
APP_SECRET: ${APP_SECRET}
DATABASE_PASSWORD: ${DB_PASSWORD}
@@ -55,6 +56,15 @@ services:
mode: replicated
replicas: 1
events_consumer:
<<: *php-fpm
command: console events:consume
healthcheck:
disable: true
deploy:
mode: replicated
replicas: 1
migration:
<<: *php-fpm
command: wait-for-it.sh db:5432 -- console doctrine:migrations:migrate --no-interaction --allow-no-migration
@@ -108,3 +118,6 @@ networks:
default:
driver: overlay
name: tenant_${TENANT}
nsq:
driver: overlay
name: nsq

View File

@@ -10,7 +10,9 @@
"ext-intl": "*",
"ext-json": "*",
"ext-mbstring": "*",
"ext-pcntl": "*",
"ext-uuid": "*",
"amphp/socket": "^1.1",
"beberlei/doctrineextensions": "^1.2",
"cron/cron-bundle": "^2.4",
"datto/json-rpc": "^6.1",
@@ -27,6 +29,7 @@
"ocramius/doctrine-batch-utils": "^2.0",
"odolbeau/phone-number-bundle": "^3.0",
"oro/doctrine-extensions": "^1.3",
"phpinnacle/buffer": "^1.0",
"premier/enum": "^2.0",
"ramsey/uuid-doctrine": "^1.6",
"sensio/framework-extra-bundle": "^5.3",

1259
composer.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -56,6 +56,16 @@ services:
environment:
VCAP_APP_PORT: 80
nsqd:
image: nsqio/nsq:v1.2.0
command: /nsqd
nsqadmin:
image: nsqio/nsq:v1.2.0
command: /nsqadmin --nsqd-http-address=nsqd:4151 --http-address=0.0.0.0:80
labels:
ru.grachevko.dhu: 'nsq.${TENANT}.automagistre.local'
networks:
default:
name: automagistre

View File

@@ -17,3 +17,7 @@ parameters:
symfony:
container_xml_path: %rootDir%/../../../var/cache/test/App_KernelTestDebugContainer.xml
checkMissingIterableValueType: false
ignoreErrors:
-
message: '#Unable to resolve the template type#'
path: %currentWorkingDirectory%/src

View File

@@ -40,6 +40,11 @@
<file name="src/Shared/Identifier/ODM/IdentifierType.php"/>
</errorLevel>
</PropertyNotSetInConstructor>
<PossiblyFalseOperand>
<errorLevel type="suppress">
<file name="src/Nsq/Command.php"/>
</errorLevel>
</PossiblyFalseOperand>
</issueHandlers>
<plugins>

View File

@@ -8,6 +8,7 @@ use App\JSONRPC\Test\JsonRPCClient;
use App\Shared\Doctrine\ORM\Listeners\MetadataCacheCompilerPass;
use App\Shared\Identifier\IdentifierFormatter;
use App\Shared\Identifier\IdentifierMapCompilerPass;
use App\SimpleBus\DI\AsyncEventMiddlewareCompilerPass;
use function assert;
use function class_exists;
use function dirname;
@@ -96,6 +97,7 @@ final class Kernel extends SymfonyKernel implements CompilerPassInterface
{
$container->addCompilerPass(new MetadataCacheCompilerPass(), PassConfig::TYPE_OPTIMIZE);
$container->addCompilerPass(new IdentifierMapCompilerPass(), PassConfig::TYPE_OPTIMIZE);
$container->addCompilerPass(new AsyncEventMiddlewareCompilerPass(), PassConfig::TYPE_OPTIMIZE);
}
/**

135
src/Nsq/Command.php Normal file
View File

@@ -0,0 +1,135 @@
<?php
namespace App\Nsq;
use function array_map;
use function count;
use function implode;
use function pack;
use const PHP_EOL;
use Sentry\Util\JSON;
use function sprintf;
use function strlen;
class Command
{
private const MAGIC_V2 = ' V2';
public static function magic(): string
{
return self::MAGIC_V2;
}
/**
* Update client metadata on the server and negotiate features.
*/
public static function identify(array $arr): string
{
$body = Json::encode($arr, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT);
$size = pack('N', strlen($body));
return 'IDENTIFY '.PHP_EOL.$size.$body;
}
/**
* Subscribe to a topic/channel.
*/
public static function sub(string $topic, string $channel): string
{
return sprintf('SUB %s %s', $topic, $channel).PHP_EOL;
}
/**
* Publish a message to a topic.
*/
public static function pub(string $topic, string $body): string
{
$size = pack('N', strlen($body));
return 'PUB '.$topic.PHP_EOL.$size.$body;
}
/**
* Publish multiple messages to a topic (atomically).
*/
public static function mpub(string $topic, array $bodies): string
{
$num = pack('N', count($bodies));
$mb = implode('', array_map(static function ($body): string {
return pack('N', strlen($body)).$body;
}, $bodies));
$size = pack('N', strlen($num.$mb));
return 'MPUB '.$topic.PHP_EOL.$size.$num.$mb;
}
/**
* Publish a deferred message to a topic.
*/
public static function dpub(string $topic, int $deferTime, string $body): string
{
$size = pack('N', strlen($body));
return sprintf('DPUB %s %s', $topic, $deferTime).PHP_EOL.$size.$body;
}
/**
* Update RDY state (indicate you are ready to receive N messages).
*/
public static function rdy(int $count): string
{
return 'RDY '.$count.PHP_EOL;
}
/**
* Finish a message (indicate successful processing).
*/
public static function fin(string $id): string
{
return 'FIN '.$id.PHP_EOL;
}
/**
* Re-queue a message (indicate failure to process)
* The re-queued message is placed at the tail of the queue, equivalent to having just published it,
* but for various implementation specific reasons that behavior should not be explicitly relied upon and may change in the future.
* Similarly, a message that is in-flight and times out behaves identically to an explicit REQ.
*/
public static function req(string $id, int $timeout): string
{
return sprintf('REQ %s %s', $id, $timeout).PHP_EOL;
}
/**
* Reset the timeout for an in-flight message.
*/
public static function touch(string $id): string
{
return 'TOUCH '.$id.PHP_EOL;
}
/**
* Cleanly close your connection (no more messages are sent).
*/
public static function cls(): string
{
return 'CLS'.PHP_EOL;
}
/**
* Response for heartbeat.
*/
public static function nop(): string
{
return 'NOP'.PHP_EOL;
}
public static function auth(string $secret): string
{
$size = pack('N', strlen($secret));
return 'AUTH'.PHP_EOL.$size.$secret;
}
}

79
src/Nsq/Envelop.php Normal file
View File

@@ -0,0 +1,79 @@
<?php
declare(strict_types=1);
namespace App\Nsq;
use Amp\Promise;
use function call_user_func;
/**
* @psalm-immutable
*/
final class Envelop
{
public int $timestamp;
public int $attempts;
public string $id;
public string $body;
/**
* @var callable
*/
private $acknowledge;
/**
* @var callable
*/
private $requeue;
/**
* @var callable
*/
private $touch;
public function __construct(
int $timestamp,
int $attempts,
string $id,
string $body,
callable $ack,
callable $req,
callable $touch
) {
$this->timestamp = $timestamp;
$this->attempts = $attempts;
$this->id = $id;
$this->body = $body;
$this->acknowledge = $ack;
$this->requeue = $req;
$this->touch = $touch;
}
/**
* @psalm-return Promise<void>
*/
public function ack(): Promise
{
return call_user_func($this->acknowledge);
}
/**
* @psalm-return Promise<void>
*/
public function retry(int $timeout): Promise
{
return call_user_func($this->requeue, $timeout);
}
/**
* @psalm-return Promise<void>
*/
public function touch(): Promise
{
return call_user_func($this->touch);
}
}

221
src/Nsq/Nsq.php Normal file
View File

@@ -0,0 +1,221 @@
<?php
declare(strict_types=1);
namespace App\Nsq;
use function Amp\call;
use Amp\CancellationTokenSource;
use Amp\Promise;
use function Amp\Promise\rethrow;
use Amp\Socket\EncryptableSocket;
use Amp\Socket\SocketPool;
use Amp\Socket\UnlimitedSocketPool;
use Generator;
use LogicException;
use PHPinnacle\Buffer\ByteBuffer;
use Sentry\SentryBundle\SentryBundle;
use function sprintf;
use Throwable;
final class Nsq
{
private const OK = 'OK';
private const HEARTBEAT = '_heartbeat_';
private const TYPE_RESPONSE = 0;
private const TYPE_ERROR = 1;
private const TYPE_MESSAGE = 2;
private const BYTES_SIZE = 4;
private const BYTES_TYPE = 4;
private const BYTES_TIMESTAMP = 8;
private const BYTES_ATTEMPTS = 2;
private const BYTES_ID = 16;
private SocketPool $pool;
private array $config;
public function __construct(?SocketPool $pool, array $config = [])
{
$this->pool = $pool ?? new UnlimitedSocketPool();
$this->config = [
'localAddr' => $config['localAddr'] ?? 'tcp://nsqd:4150',
];
}
/**
* @psalm-return Promise<void>
*/
public function pub(string $topic, string $message): Promise
{
return call(function () use ($topic, $message): Generator {
/** @var EncryptableSocket $socket */
$socket = yield $this->pool->checkout(sprintf('%s#%s', $this->config['localAddr'], $topic));
yield $socket->write(Command::magic());
yield $socket->write(Command::pub($topic, $message));
$buffer = yield $socket->read();
$this->pool->checkin($socket);
if (null === $buffer) {
throw new LogicException('NSQ return unexpected null.');
}
$buffer = new ByteBuffer($buffer);
$size = $buffer->consumeUint32();
$type = $buffer->consumeUint32();
if (self::TYPE_ERROR === $type) {
throw new LogicException(sprintf('NSQ return error: "%s"', $buffer->consume($size)));
}
if (self::TYPE_RESPONSE !== $type) {
throw new LogicException(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_RESPONSE, $type));
}
$response = $buffer->consume($size - self::BYTES_TYPE);
if (self::OK !== $response) {
throw new LogicException(sprintf('NSQ return unexpected response: "%s"', $response));
}
});
}
public function subscribe(string $topic, string $channel, callable $callable): Stopper
{
$tokenSource = new CancellationTokenSource();
$stopper = new Stopper(static function () use ($tokenSource): void {
$tokenSource->cancel();
});
rethrow(call(function () use ($topic, $channel, $callable, $stopper, $tokenSource): Generator {
$uri = sprintf('%s#%s-%s', $this->config['localAddr'], $topic, $channel);
$buffer = new ByteBuffer();
/** @var EncryptableSocket $socket */
$socket = null;
$establishConnection = function () use ($uri, $tokenSource, &$socket, $topic, $channel): Generator {
$socket = yield $this->pool->checkout($uri, null, $tokenSource->getToken());
yield $socket->write(Command::magic());
yield $socket->write(Command::sub($topic, $channel));
};
yield call($establishConnection);
while (!$stopper->isStopped()) {
yield $socket->write(Command::rdy(1));
$size = 4;
$sizeRead = false;
while ($buffer->size() < $size) {
$chunk = yield $socket->read();
if (null === $chunk && $stopper->isStopped()) {
break 2;
}
if (null === $chunk) {
$buffer->empty();
$this->pool->checkin($socket);
yield call($establishConnection);
continue 2;
}
$buffer->append($chunk);
/** @phpstan-ignore-next-line */
if (false === $sizeRead && $buffer->size() >= self::BYTES_SIZE) {
$size = $buffer->consumeUint32();
$sizeRead = true;
}
}
$type = $buffer->consumeUint32();
if (self::TYPE_RESPONSE === $type) {
$response = $buffer->consume($size - self::BYTES_TYPE);
if (self::OK === $response) {
continue;
}
if (self::HEARTBEAT === $response) {
yield $socket->write(Command::nop());
continue;
}
throw new LogicException(sprintf('Unsupported response: "%s"', $response));
}
if (self::TYPE_ERROR === $type) {
throw new LogicException($buffer->consume($size - self::BYTES_TYPE));
}
if (self::TYPE_MESSAGE !== $type) {
throw new LogicException(sprintf('Unsupported type: "%s"', $type));
}
$timestamp = $buffer->consumeInt64();
$attempts = $buffer->consumeUint16();
$id = $buffer->consume(self::BYTES_ID);
$body = $buffer->consume($size - self::BYTES_TYPE - self::BYTES_TIMESTAMP - self::BYTES_ATTEMPTS - self::BYTES_ID);
$finished = false;
$message = new Envelop(
$timestamp,
$attempts,
$id,
$body,
static function () use ($socket, $id, &$finished): Promise {
if ($finished) {
throw new LogicException('Can\'t ack, message already finished.');
}
$finished = true;
return call(static function () use ($socket, $id): Generator {
yield $socket->write(Command::fin($id));
});
},
static function (int $timeout) use ($socket, $id, &$finished): Promise {
if ($finished) {
throw new LogicException('Can\'t retry, message already finished.');
}
$finished = true;
return call(static function () use ($socket, $id, $timeout): Generator {
yield $socket->write(Command::req($id, $timeout));
});
},
static function () use ($socket, $id): Promise {
return call(static function () use ($socket, $id): Generator {
yield $socket->write(Command::touch($id));
});
},
);
try {
yield from $callable($message);
} catch (Throwable $e) {
SentryBundle::getCurrentHub()->captureException($e);
$this->pool->checkin($socket);
throw $e;
}
}
$this->pool->checkin($socket);
}));
return $stopper;
}
}

38
src/Nsq/Stopper.php Normal file
View File

@@ -0,0 +1,38 @@
<?php
declare(strict_types=1);
namespace App\Nsq;
use function call_user_func;
final class Stopper
{
private bool $stopped = false;
/**
* @var callable
*/
private $onStopped;
public function __construct(callable $onStopped)
{
$this->onStopped = $onStopped;
}
public function stop(): void
{
if ($this->isStopped()) {
return;
}
$this->stopped = true;
call_user_func($this->onStopped);
}
public function isStopped(): bool
{
return $this->stopped;
}
}

View File

@@ -0,0 +1,61 @@
<?php
declare(strict_types=1);
namespace App\SimpleBus;
use function Amp\Promise\wait;
use App\Nsq\Nsq;
use App\Tenant\Tenant;
use function get_class;
use const JSON_UNESCAPED_SLASHES;
use const PHP_SAPI;
use SimpleBus\Message\Bus\Middleware\MessageBusMiddleware;
use function sprintf;
use Symfony\Component\Serializer\SerializerInterface;
final class AsyncEventBusMiddleware implements MessageBusMiddleware
{
private Nsq $nsq;
private Tenant $tenant;
private bool $debug;
private SerializerInterface $serializer;
public function __construct(Nsq $nsq, Tenant $tenant, bool $debug, SerializerInterface $serializer)
{
$this->nsq = $nsq;
$this->tenant = $tenant;
$this->debug = $debug;
$this->serializer = $serializer;
}
/**
* {@inheritdoc}
*/
public function handle($message, callable $next): void
{
if (!$this->debug && 'cli' !== PHP_SAPI) {
$topic = sprintf('%s_events', $this->tenant->toIdentifier());
$body = $this->serializer->serialize(
[
'class' => get_class($message),
'body' => $message,
],
'json',
[
'json_encode_options' => JSON_UNESCAPED_SLASHES,
]
);
wait($this->nsq->pub($topic, $body));
// return;
}
$next($message);
}
}

View File

@@ -0,0 +1,104 @@
<?php
declare(strict_types=1);
namespace App\SimpleBus\Command;
use Amp\Loop;
use App\Nsq\Envelop;
use App\Nsq\Nsq;
use App\Tenant\Tenant;
use function class_exists;
use Generator;
use function is_object;
use LogicException;
use LongRunning\Core\Cleaner;
use Sentry\Util\JSON;
use const SIGINT;
use const SIGTERM;
use SimpleBus\SymfonyBridge\Bus\EventBus;
use function sprintf;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Serializer\Normalizer\DenormalizerInterface;
final class EventsConsumerCommand extends Command
{
protected static $defaultName = 'events:consume';
private Nsq $nsq;
private Tenant $tenant;
private DenormalizerInterface $denormalizer;
private EventBus $eventBus;
private Cleaner $cleaner;
public function __construct(
Nsq $nsq,
Tenant $tenant,
DenormalizerInterface $denormalizer,
EventBus $eventBus,
Cleaner $cleaner
) {
parent::__construct();
$this->nsq = $nsq;
$this->tenant = $tenant;
$this->denormalizer = $denormalizer;
$this->eventBus = $eventBus;
$this->cleaner = $cleaner;
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$topic = sprintf('%s_events', $this->tenant->toIdentifier());
Loop::run(function () use ($topic, $io): void {
$stopper = $this->nsq->subscribe($topic, 'tenant', function (Envelop $envelop) use ($io): Generator {
$data = JSON::decode($envelop->body);
$class = $data['class'] ?? '';
if (!class_exists($class)) {
throw new LogicException(sprintf('Event class "%s" not exists. Body: "%s"', $class, $envelop->body));
}
$event = $this->denormalizer->denormalize($data['body'], $class);
if (!is_object($event)) {
throw new LogicException(sprintf('Event class "%s" not exists. Body: "%s"', $class, $envelop->body));
}
// $this->eventBus->handle($event); TODO only ack for now
$io->success(sprintf('Event: %s handled.', $class));
$this->cleaner->cleanUp();
yield $envelop->ack();
});
$onSignal = static function () use ($stopper, $io): void {
$io->note('Stop signal received');
$stopper->stop();
Loop::delay(1000, static function (): void {
Loop::stop();
});
};
Loop::onSignal(SIGINT, $onSignal);
Loop::onSignal(SIGTERM, $onSignal);
});
return 0;
}
}

View File

@@ -0,0 +1,24 @@
<?php
declare(strict_types=1);
namespace App\SimpleBus\DI;
use App\SimpleBus\AsyncEventBusMiddleware;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;
final class AsyncEventMiddlewareCompilerPass implements CompilerPassInterface
{
/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container): void
{
$container->getDefinition('simple_bus.event_bus')
->addArgument([
new Reference(AsyncEventBusMiddleware::class),
]);
}
}

18
src/SimpleBus/config.php Normal file
View File

@@ -0,0 +1,18 @@
<?php
declare(strict_types=1);
use Symfony\Component\DependencyInjection\Loader\Configurator\ContainerConfigurator;
return static function (ContainerConfigurator $configurator): void {
$services = $configurator->services();
$services
->defaults()
->autowire()
->autoconfigure();
$services
->set(App\SimpleBus\AsyncEventBusMiddleware::class)
->arg('$debug', '%kernel.debug%');
};