setFormatter(new ConsoleFormatter()); $logger = new Logger('publisher', [$handler], [new PsrLogMessageProcessor()]); $consumer = new Consumer( 'tcp://localhost:4150', topic: 'local', channel: 'local', onMessage: static function (Message $message) use ($logger): Promise { return call(function () use ($message, $logger): Generator { $logger->info('Received: {body}', ['body' => $message->body]); yield $message->finish(); }); }, clientConfig: new ClientConfig( deflate: false, snappy: true, ), logger: $logger, ); yield $consumer->connect(); });