diff --git a/README.md b/README.md index c56e742..cdb151b 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ Features - [X] Feature Negotiation - [ ] Discovery - [ ] Backoff -- [ ] TLS +- [X] TLS - [ ] Deflate - [X] Snappy - [X] Sampling diff --git a/src/Connection.php b/src/Connection.php index f6aaa61..10be707 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -60,6 +60,17 @@ abstract class Connection $response = yield $this->response($stream, $buffer); $serverConfig = ServerConfig::fromArray($response->toArray()); + if ($serverConfig->tls) { + yield $stream->setupTls(); + + /** @var Response $response */ + $response = yield $this->response($stream, $buffer); + + if (!$response->isOk()) { + throw new NsqException(); + } + } + if ($serverConfig->snappy) { $stream = new SnappyStream($stream, $buffer->flush()); diff --git a/src/Stream/SocketStream.php b/src/Stream/SocketStream.php index 2036f1e..4fd87e6 100644 --- a/src/Stream/SocketStream.php +++ b/src/Stream/SocketStream.php @@ -5,15 +5,16 @@ declare(strict_types=1); namespace Nsq\Stream; use Amp\Promise; +use Amp\Socket\ClientTlsContext; use Amp\Socket\ConnectContext; -use Amp\Socket\Socket; +use Amp\Socket\EncryptableSocket; use Nsq\Stream; use function Amp\call; use function Amp\Socket\connect; class SocketStream implements Stream { - public function __construct(private Socket $socket) + public function __construct(private EncryptableSocket $socket) { } @@ -37,6 +38,11 @@ class SocketStream implements Stream $context = $context->withTcpNoDelay(); } + $context = $context->withTlsContext( + (new ClientTlsContext('')) + ->withoutPeerVerification() + ); + return new self(yield connect($uri, $context)); }); } @@ -61,4 +67,12 @@ class SocketStream implements Stream { $this->socket->close(); } + + /** + * @return Promise + */ + public function setupTls(): Promise + { + return $this->socket->setupTls(); + } }