Skip to content

Commit

Permalink
Cancel request reading upon shutdown
Browse files Browse the repository at this point in the history
Fixes #367 and #370.
  • Loading branch information
trowski committed Dec 16, 2024
1 parent c74232a commit 027bd02
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 10 deletions.
42 changes: 32 additions & 10 deletions src/Driver/Http1Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ public function handleClient(
$this->insertTimeout();

$headerSizeLimit = $this->headerSizeLimit;
$cancellation = $this->deferredCancellation->getCancellation();

try {
$buffer = $readableStream->read();
$buffer = $readableStream->read($cancellation);
if ($buffer === null) {
$this->removeTimeout();
return;
Expand Down Expand Up @@ -141,7 +142,7 @@ public function handleClient(
);
}

$chunk = $readableStream->read();
$chunk = $readableStream->read($cancellation);
if ($chunk === null) {
return;
}
Expand Down Expand Up @@ -413,7 +414,8 @@ public function handleClient(
$this->suspendTimeout();

$this->currentBuffer = $buffer;
$this->handleRequest($request);
$this->pendingResponse = async($this->handleRequest(...), $request);
$this->pendingResponse->await();
$this->pendingResponseCount--;

continue;
Expand Down Expand Up @@ -486,7 +488,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void {
);
}

$chunk = $this->readableStream->read();
$chunk = $this->readableStream->read($cancellation);
if ($chunk === null) {
return;
}
Expand Down Expand Up @@ -514,7 +516,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void {

if ($chunkLengthRemaining === 0) {
while (!isset($buffer[1])) {
$chunk = $readableStream->read();
$chunk = $readableStream->read($cancellation);
if ($chunk === null) {
return;
}
Expand Down Expand Up @@ -546,7 +548,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void {
);
}

$chunk = $this->readableStream->read();
$chunk = $this->readableStream->read($cancellation);
if ($chunk === null) {
return;
}
Expand Down Expand Up @@ -599,7 +601,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void {
$remaining -= $bodyBufferSize;
}

$body = $readableStream->read();
$body = $readableStream->read($cancellation);
if ($body === null) {
return;
}
Expand Down Expand Up @@ -635,7 +637,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void {
$bufferLength = \strlen($buffer);

if (!$bufferLength) {
$chunk = $readableStream->read();
$chunk = $readableStream->read($cancellation);
if ($chunk === null) {
return;
}
Expand All @@ -647,7 +649,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void {
// These first two (extreme) edge cases prevent errors where the packet boundary ends after
// the \r and before the \n at the end of a chunk.
if ($bufferLength === $chunkLengthRemaining || $bufferLength === $chunkLengthRemaining + 1) {
$chunk = $readableStream->read();
$chunk = $readableStream->read($cancellation);
if ($chunk === null) {
return;
}
Expand Down Expand Up @@ -704,7 +706,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void {
$bodySize += $bodyBufferSize;
}

$chunk = $readableStream->read();
$chunk = $readableStream->read($cancellation);
if ($chunk === null) {
return;
}
Expand Down Expand Up @@ -756,6 +758,12 @@ static function (int $bodySize) use (&$bodySizeLimit): void {
}
} catch (StreamException) {
// Client disconnected, finally block will clean up.
} catch (CancelledException) {
// Server shutting down.
if ($this->bodyQueue === null || !$this->pendingResponseCount) {
// Send a service unavailable response only if another response has not already been sent.
$this->sendServiceUnavailableResponse($request ?? null)->await();
}
} finally {
$this->pendingResponse->finally(function (): void {
$this->removeTimeout();
Expand Down Expand Up @@ -1023,6 +1031,19 @@ private function upgrade(Request $request, Response $response): void
}
}

/**
* Creates a service unavailable response from the error handler and sends that response to the client.
*
* @return Future<void>
*/
private function sendServiceUnavailableResponse(?Request $request): Future
{
$response = $this->errorHandler->handleError(HttpStatus::SERVICE_UNAVAILABLE, request: $request);
$response->setHeader("connection", "close");

return $this->lastWrite = async($this->send(...), $this->lastWrite, $response);
}

/**
* Creates an error response from the error handler and sends that response to the client.
*
Expand Down Expand Up @@ -1062,6 +1083,7 @@ public function stop(): void

$this->pendingResponse->await();
$this->lastWrite?->await();
$this->deferredCancellation->cancel();
}

public function getApplicationLayerProtocols(): array
Expand Down
36 changes: 36 additions & 0 deletions test/Driver/Http1DriverTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -1209,4 +1209,40 @@ public function testTimeoutSuspendedDuringRequestHandler(): void

self::assertStringStartsWith('HTTP/1.1 202', $output->buffer());
}

public function testShutdownDuringRequestRead(): void
{
$driver = new Http1Driver(
new ClosureRequestHandler(fn () => self::fail('Request handler not expected to be called')),
$this->createMock(ErrorHandler::class),
new NullLogger,
);

$input = new Queue();
$input->pushAsync(
// Insufficient request headers
"POST /post HTTP/1.1\r\n" .
"Host: localhost\r\n" .
"Content-Length: 100\r\n"
);

$output = new WritableBuffer();

async(fn () => $driver->handleClient(
$this->createClientMock(),
new ReadableIterableStream($input->iterate()),
$output,
));

delay(0.1); // Allow parser generator to run.

$driver->stop();

delay(0.1); // Give time for cancellation to be processed.

$input->complete();
$output->close();

self::assertStringStartsWith('HTTP/1.0 503', $output->buffer());
}
}

0 comments on commit 027bd02

Please sign in to comment.