Remove unnecessary Producer::run and Consumer::run

This commit is contained in:
2021-09-18 22:53:02 +03:00
parent 97b3d8206c
commit 2c07fb1aca
2 changed files with 73 additions and 83 deletions

View File

@@ -77,12 +77,6 @@ final class Consumer extends Connection
return call(function (): \Generator { return call(function (): \Generator {
yield parent::connect(); yield parent::connect();
$this->run();
});
}
private function run(): void
{
$buffer = new Buffer(); $buffer = new Buffer();
asyncCall(function () use ($buffer): \Generator { asyncCall(function () use ($buffer): \Generator {
@@ -135,6 +129,7 @@ final class Consumer extends Connection
$this->close(false); $this->close(false);
}); });
}); });
});
} }
/** /**

View File

@@ -57,7 +57,33 @@ final class Producer extends Connection
return call(function (): \Generator { return call(function (): \Generator {
yield parent::connect(); yield parent::connect();
$this->run(); $buffer = new Buffer();
asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->read()) {
$buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->write(Command::nop());
}
// Ok received
break;
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
default:
throw new NsqException('Unreachable statement.');
}
}
}
$this->close(false);
});
}); });
} }
@@ -87,35 +113,4 @@ final class Producer extends Connection
return $this->write($command); return $this->write($command);
} }
private function run(): void
{
$buffer = new Buffer();
asyncCall(function () use ($buffer): \Generator {
while (null !== $chunk = yield $this->read()) {
$buffer->append($chunk);
while ($frame = Parser::parse($buffer)) {
switch (true) {
case $frame instanceof Frame\Response:
if ($frame->isHeartBeat()) {
yield $this->write(Command::nop());
}
// Ok received
break;
case $frame instanceof Frame\Error:
$this->handleError($frame);
break;
default:
throw new NsqException('Unreachable statement.');
}
}
}
$this->close(false);
});
}
} }