buffer = new ByteBuffer(); $this->inputStream = $this->outputStream = new NullStream(); $this->clientConfig = $clientConfig ?? new ClientConfig(); $this->logger = $logger ?? new NullLogger(); } public function __destruct() { $this->close(); } /** * @return Promise */ public function connect(): Promise { return call(function (): \Generator { $this->socket = $this->outputStream = yield connect($this->address); $this->inputStream = new NsqInputStream($this->socket); yield $this->outputStream->write(' V2'); yield $this->command('IDENTIFY', data: $this->clientConfig->toString()); /** @var Response $response */ $response = yield $this->readResponse(); $this->connectionConfig = ConnectionConfig::fromArray($response->toArray()); if ($this->connectionConfig->snappy) { $this->inputStream = new NsqInputStream( new SnappyInputStream($this->inputStream, $this->logger), ); $this->outputStream = new SnappyOutputStream($this->outputStream); $this->checkIsOK(); } if ($this->connectionConfig->deflate) { $this->inputStream = new NsqInputStream( new ZlibInputStream($this->socket, ZLIB_ENCODING_DEFLATE, [ 'level' => $this->connectionConfig->deflateLevel, ]), ); $this->outputStream = new ZlibOutputStream($this->socket, ZLIB_ENCODING_DEFLATE, [ 'level' => $this->connectionConfig->deflateLevel, ]); $this->checkIsOK(); } if ($this->connectionConfig->authRequired) { if (null === $this->clientConfig->authSecret) { yield $this->close(); throw new AuthenticationRequired(); } yield $this->command('AUTH', data: $this->clientConfig->authSecret); $response = yield $this->readResponse(); $this->logger->info('Authorization response: '.http_build_query($response->toArray())); } }); } /** * Cleanly close your connection (no more messages are sent). * * @return Promise */ public function close(): Promise { if (null === $this->socket) { return new Success(); } return call(function (): \Generator { yield $this->command('CLS'); if (null !== $this->socket) { $this->socket->close(); $this->socket = null; } }); } public function isClosed(): bool { return null === $this->socket; } /** * @param array|string $params * * @return Promise */ protected function command(string $command, array | string $params = [], string $data = null): Promise { if (null === $this->socket) { return new Failure(new NotConnected()); } $command = [] === $params ? $command : implode(' ', [$command, ...((array) $params)]); $buffer = $this->buffer->append($command.PHP_EOL); if (null !== $data) { $buffer->appendUint32(\strlen($data)); $buffer->append($data); } $this->logger->debug('Sending: {bytes}', ['bytes' => $buffer->bytes()]); return $this->outputStream->write($buffer->flush()); } /** * @return Promise */ protected function readFrame(): Promise { return call(function (): \Generator { $bytes = yield $this->inputStream->read(); $this->logger->debug('Receiving: {bytes}', ['bytes' => $bytes]); if (null === $bytes) { throw new NotConnected(); } $buffer = $this->buffer->append($bytes); return match ($type = $buffer->consumeUint32()) { 0 => new Response($buffer->flush()), 1 => new Error($buffer->flush()), 2 => new Message( timestamp: $buffer->consumeInt64(), attempts: $buffer->consumeUint16(), id: $buffer->consume(Bytes::BYTES_ID), body: $buffer->flush(), consumer: $this instanceof Consumer ? $this : throw new NsqException('what?'), ), default => throw new NsqException('Unexpected frame type: '.$type) }; }); } /** * @return Promise */ protected function checkIsOK(): Promise { return call(function (): \Generator { /** @var Response $response */ $response = yield $this->readResponse(); if ($response->isHeartBeat()) { yield $this->command('NOP'); return $this->checkIsOK(); } if (!$response->isOk()) { throw new BadResponse($response); } $this->logger->info('Ok checked.'); return call(static function (): void {}); }); } /** * @return Promise */ private function readResponse(): Promise { return call(function (): \Generator { $frame = yield $this->readFrame(); if ($frame instanceof Error) { if ($frame->type->terminateConnection) { $this->close(); } throw new NsqError($frame); } if (!$frame instanceof Response) { throw new NsqException('Unreachable statement.'); } return $frame; }); } }