diff --git a/src/Connection.php b/src/Connection.php index 394b1c2..7b41152 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -5,10 +5,13 @@ declare(strict_types=1); namespace Nsq; use Composer\InstalledVersions; +use Nsq\Exception\ConnectionFail; +use Nsq\Exception\UnexpectedResponse; use PHPinnacle\Buffer\ByteBuffer; use Psr\Log\LoggerAwareTrait; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; +use Socket\Raw\Exception; use Socket\Raw\Factory; use Socket\Raw\Socket; use Throwable; @@ -65,7 +68,12 @@ abstract class Connection public function connect(): void { - $this->socket = (new Factory())->createClient($this->address); + try { + $this->socket = (new Factory())->createClient($this->address); + } catch (Exception $e) { + throw ConnectionFail::fromThrowable($e); + } + $this->send(' V2'); $body = json_encode($this->features, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT); @@ -121,12 +129,12 @@ abstract class Connection try { $socket->write($buffer); - } catch (Throwable $e) { + } catch (Exception $e) { $this->closed = true; $this->logger->error($e->getMessage(), ['exception' => $e]); - throw $e; + throw ConnectionFail::fromThrowable($e); } return $this; @@ -162,7 +170,7 @@ abstract class Connection $response = $this->receive(0.1); if (null === $response) { - throw new Exception('Response was expected, but null received.'); + throw new UnexpectedResponse('Response was expected, but null received.'); } return $response; @@ -171,7 +179,7 @@ abstract class Connection private function socket(): Socket { if ($this->closed) { - throw new Exception('This connection is closed, create new one.'); + throw new ConnectionFail('This connection is closed, create new one.'); } if (null === $this->socket) { diff --git a/src/Exception.php b/src/Exception.php deleted file mode 100644 index a9d429f..0000000 --- a/src/Exception.php +++ /dev/null @@ -1,9 +0,0 @@ -getMessage(), (int) $throwable->getCode(), $throwable); + } +} diff --git a/src/Exception/MessageAlreadyFinished.php b/src/Exception/MessageAlreadyFinished.php new file mode 100644 index 0000000..e91b938 --- /dev/null +++ b/src/Exception/MessageAlreadyFinished.php @@ -0,0 +1,26 @@ +finished) { - throw new Exception('Can\'t finish message as it already finished.'); + throw MessageAlreadyFinished::finish($this); } $this->consumer->fin($this->id); @@ -58,7 +60,7 @@ final class Message public function requeue(int $timeout): void { if ($this->finished) { - throw new Exception('Can\'t requeue message as it already finished.'); + throw MessageAlreadyFinished::requeue($this); } $this->consumer->req($this->id, $timeout); @@ -68,7 +70,7 @@ final class Message public function touch(): void { if ($this->finished) { - throw new Exception('Can\'t touch message as it already finished.'); + throw MessageAlreadyFinished::touch($this); } $this->consumer->touch($this->id); diff --git a/src/Response.php b/src/Response.php index 2ca643c..e09a9ab 100644 --- a/src/Response.php +++ b/src/Response.php @@ -4,6 +4,8 @@ declare(strict_types=1); namespace Nsq; +use Nsq\Exception\NsqError; +use Nsq\Exception\UnexpectedResponse; use PHPinnacle\Buffer\ByteBuffer; final class Response @@ -27,15 +29,15 @@ final class Response public function okOrFail(): void { if (self::TYPE_ERROR === $this->type) { - throw new Exception($this->buffer->bytes()); + throw new NsqError($this->buffer->bytes()); } if (self::TYPE_RESPONSE !== $this->type) { - throw new Exception(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type)); + throw new UnexpectedResponse(sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $this->type)); } if (self::OK !== $this->buffer->bytes()) { - throw new Exception(sprintf('OK response expected, but "%s" received.', $this->buffer->bytes())); + throw new UnexpectedResponse(sprintf('OK response expected, but "%s" received.', $this->buffer->bytes())); } } @@ -47,7 +49,7 @@ final class Response public function toMessage(Consumer $reader): Message { if (self::TYPE_MESSAGE !== $this->type) { - throw new Exception(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type)); + throw new UnexpectedResponse(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type)); } $buffer = new ByteBuffer($this->buffer->bytes()); diff --git a/src/Subscriber.php b/src/Subscriber.php index 529ccb3..cda1a69 100644 --- a/src/Subscriber.php +++ b/src/Subscriber.php @@ -5,6 +5,7 @@ declare(strict_types=1); namespace Nsq; use Generator; +use InvalidArgumentException; use function get_debug_type; use function sprintf; @@ -41,7 +42,7 @@ final class Subscriber $newTimeout = yield null; if (!\is_float($newTimeout)) { - throw new Exception(sprintf('Timeout must be float, "%s" given.', get_debug_type($newTimeout))); + throw new InvalidArgumentException(sprintf('Timeout must be float, "%s" given.', get_debug_type($newTimeout))); } $timeout = $newTimeout; diff --git a/tests/MessageTest.php b/tests/MessageTest.php index d767a36..b67864f 100644 --- a/tests/MessageTest.php +++ b/tests/MessageTest.php @@ -3,7 +3,7 @@ declare(strict_types=1); use Nsq\Consumer; -use Nsq\Exception; +use Nsq\Exception\MessageAlreadyFinished; use Nsq\Message; use PHPUnit\Framework\TestCase; @@ -20,7 +20,7 @@ final class MessageTest extends TestCase self::assertTrue($message->isFinished()); - $this->expectException(Exception::class); + $this->expectException(MessageAlreadyFinished::class); $this->expectExceptionMessage('Can\'t finish message as it already finished.'); $message->finish(); @@ -37,7 +37,7 @@ final class MessageTest extends TestCase self::assertTrue($message->isFinished()); - $this->expectException(Exception::class); + $this->expectException(MessageAlreadyFinished::class); $this->expectExceptionMessage('Can\'t requeue message as it already finished.'); $message->requeue(5); @@ -52,7 +52,7 @@ final class MessageTest extends TestCase $message->finish(); - $this->expectException(Exception::class); + $this->expectException(MessageAlreadyFinished::class); $this->expectExceptionMessage('Can\'t touch message as it already finished.'); $message->touch(); diff --git a/tests/ProducerTest.php b/tests/ProducerTest.php index f930986..e7f728e 100644 --- a/tests/ProducerTest.php +++ b/tests/ProducerTest.php @@ -2,6 +2,7 @@ declare(strict_types=1); +use Nsq\Exception\NsqError; use Nsq\Producer; use PHPUnit\Framework\TestCase; @@ -12,7 +13,7 @@ final class ProducerTest extends TestCase */ public function testPubFail(string $topic, string $body, string $exceptionMessage): void { - $this->expectException(Exception::class); + $this->expectException(NsqError::class); $this->expectExceptionMessage($exceptionMessage); $producer = new Producer('tcp://localhost:4150'); diff --git a/tests/SubscriberTest.php b/tests/SubscriberTest.php index 3136750..c413ed1 100644 --- a/tests/SubscriberTest.php +++ b/tests/SubscriberTest.php @@ -34,7 +34,7 @@ final class SubscriberTest extends TestCase public function testInvalidChangeInterval(): void { - $this->expectException(\Nsq\Exception::class); + $this->expectException(InvalidArgumentException::class); $this->expectExceptionMessage('Timeout must be float, "string" given.'); $generator = $this->subscriber->subscribe(__FUNCTION__, __FUNCTION__);