type = $buffer->consumeUint32(); $this->buffer = $buffer; }

    /**
     * Verifies that this frame is a plain "OK" response.
     *
     * Checks are performed in order: an error frame is rethrown with the
     * frame payload as the exception message, any other non-response frame
     * is rejected, and finally the payload itself must equal self::OK.
     *
     * @throws Exception when the frame is an error frame, is not a response
     *                   frame, or carries a payload other than self::OK
     */
    public function okOrFail(): void
    {
        $frameType = $this->type;

        if ($frameType === self::TYPE_ERROR) {
            // The payload of an error frame is used verbatim as the message.
            throw new Exception($this->buffer->bytes());
        }

        if ($frameType !== self::TYPE_RESPONSE) {
            $reason = sprintf('"%s" type expected, but "%s" received.', self::TYPE_RESPONSE, $frameType);

            throw new Exception($reason);
        }

        // Payload is only read once the frame type is known to be a response.
        $payload = $this->buffer->bytes();

        if ($payload === self::OK) {
            return;
        }

        throw new Exception(sprintf('OK response expected, but "%s" received.', $payload));
    }

    /**
     * Tells whether this frame is a heartbeat.
     *
     * @return bool true only for a response frame whose payload equals
     *              self::HEARTBEAT
     */
    public function isHeartBeat(): bool
    {
        // Guard first so the payload is not inspected for other frame types.
        if ($this->type !== self::TYPE_RESPONSE) {
            return false;
        }

        return $this->buffer->bytes() === self::HEARTBEAT;
    }

    /**
     * Decodes the payload of a message frame into a Message.
     *
     * The payload is read in this order: a 64-bit integer timestamp, a
     * 16-bit unsigned attempts counter, Bytes::BYTES_ID bytes of message id,
     * and everything that remains as the body.
     *
     * @param Consumer $reader passed through unchanged to the Message
     *
     * @return Message the decoded message
     *
     * @throws Exception when this frame is not a message frame
     */
    public function toMessage(Consumer $reader): Message
    {
        if ($this->type !== self::TYPE_MESSAGE) {
            throw new Exception(sprintf('Expecting "%s" type, but NSQ return: "%s"', self::TYPE_MESSAGE, $this->type));
        }

        // Work on a fresh buffer built from the payload so the reads below
        // do not consume from $this->buffer.
        $frame = new ByteBuffer($this->buffer->bytes());

        // Arguments are evaluated left to right, which keeps the reads in
        // the order the payload is laid out.
        return new Message(
            $frame->consumeInt64(),
            $frame->consumeUint16(),
            $frame->consume(Bytes::BYTES_ID),
            $frame->flush(),
            $reader
        );
    }
}