Graceful close connection
This commit is contained in:
@@ -17,6 +17,7 @@ use Nsq\Stream\NullStream;
|
|||||||
use Nsq\Stream\SnappyStream;
|
use Nsq\Stream\SnappyStream;
|
||||||
use Nsq\Stream\SocketStream;
|
use Nsq\Stream\SocketStream;
|
||||||
use Psr\Log\LoggerInterface;
|
use Psr\Log\LoggerInterface;
|
||||||
|
use function Amp\asyncCall;
|
||||||
use function Amp\call;
|
use function Amp\call;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -124,17 +125,41 @@ abstract class Connection
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public function close(): void
|
public function close(bool $graceful = true): void
|
||||||
{
|
{
|
||||||
// $this->stream->write(Command::cls());
|
if (!$this->isConnected()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
$this->stream->close();
|
$logger = $this->logger;
|
||||||
$this->stream = new NullStream();
|
[$stream, $this->stream] = [$this->stream, new NullStream()];
|
||||||
|
|
||||||
$this->logger->debug('{class} disconnected from {address}', [
|
if ($graceful) {
|
||||||
'class' => static::class,
|
$this->logger->debug('Graceful disconnect.', [
|
||||||
'address' => $this->address,
|
'class' => static::class,
|
||||||
]);
|
'address' => $this->address,
|
||||||
|
]);
|
||||||
|
|
||||||
|
asyncCall(static function () use ($stream, $logger): \Generator {
|
||||||
|
$promise = $stream->write(Command::cls());
|
||||||
|
$promise->onResolve(static function (?\Throwable $e) use ($stream, $logger) {
|
||||||
|
$stream->close();
|
||||||
|
|
||||||
|
if (null !== $e) {
|
||||||
|
$logger->warning($e->getMessage(), ['exception' => $e]);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
yield $promise;
|
||||||
|
});
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
$stream->close();
|
||||||
|
} catch (ClosedException) {
|
||||||
|
}
|
||||||
|
|
||||||
($this->onCloseCallback)();
|
($this->onCloseCallback)();
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user