Skip to content

Commit

Permalink
Add Priority handling for HTTP/3
Browse files Browse the repository at this point in the history
  • Loading branch information
bwoebi committed Jan 13, 2024
1 parent eb70ad6 commit a3dbb24
Show file tree
Hide file tree
Showing 15 changed files with 858 additions and 6 deletions.
15 changes: 15 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"amphp/http-client": "^5",
"amphp/log": "^2",
"amphp/php-cs-fixer-config": "^2",
"httpwg/structured-field-tests": "1.0",
"league/uri-components": "^2.4.2 | ^7.1",
"monolog/monolog": "^3",
"phpunit/phpunit": "^9",
Expand Down Expand Up @@ -82,6 +83,20 @@
"test": "@php -dzend.assertions=1 -dassert.exception=1 ./vendor/bin/phpunit",
"code-style": "@php ./vendor/bin/php-cs-fixer fix"
},
"repositories": [
{
"type": "package",
"package": {
"name": "httpwg/structured-field-tests",
"version": "1.0",
"source": {
"url": "https://github.com/httpwg/structured-field-tests",
"type": "git",
"reference": "origin/main"
}
}
}
],
"config": {
"allow-plugins": {
"ocramius/package-versions": false
Expand Down
26 changes: 23 additions & 3 deletions src/Driver/Http3Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace Amp\Http\Server\Driver;

use Amp\ByteStream\ClosedException;
use Amp\ByteStream\ReadableIterableStream;
use Amp\CancelledException;
use Amp\DeferredCancellation;
Expand Down Expand Up @@ -255,7 +254,7 @@ public function handleConnection(Client $client, QuicConnection|Socket $connecti
}

$trailerDeferred = new DeferredFuture;
$bodyQueue = new Queue();
$bodyQueue = new Queue;

try {
$trailers = new Trailers(
Expand Down Expand Up @@ -296,6 +295,10 @@ public function handleConnection(Client $client, QuicConnection|Socket $connecti
$expectedLength = null;
}

if (isset($headers["priority"])) {
$this->updatePriority($stream, $headers["priority"]);
}

$dataSuspension = null;
$body = new RequestBody(
new ReadableIterableStream($bodyQueue->pipe()),
Expand Down Expand Up @@ -439,6 +442,18 @@ function (int $bodySize) use (&$bodySizeLimit, &$dataSuspension) {
$parser->abort(new Http3ConnectionException("A push stream must not be initiated by the client", Http3Error::H3_STREAM_CREATION_ERROR));
break;

case Http3Frame::PRIORITY_UPDATE_Request:
[, $streamId, $structuredUpdate] = $frame;
// The RFC says we _should_ temporarily buffer unknown stream ids. We currently don't for simplicity. To eventually improve?
if ($stream = $connection->getStream($streamId)) {

Check failure on line 448 in src/Driver/Http3Driver.php

View workflow job for this annotation

GitHub Actions / PHP 8.2

PossiblyUndefinedMethod

src/Driver/Http3Driver.php:448:52: PossiblyUndefinedMethod: Method Amp\Socket\Socket::getStream does not exist (see https://psalm.dev/108)
$this->updatePriority($stream, $structuredUpdate);
}
break;

case Http3Frame::PRIORITY_UPDATE_Push:
$parser->abort(new Http3ConnectionException("No PRIORITY_UPDATE frame may be sent for unpromised push streams", Http3Error::H3_ID_ERROR));
break;

default:
$parser->abort(new Http3ConnectionException("An unexpected stream or frame was received", Http3Error::H3_FRAME_UNEXPECTED));
}
Expand All @@ -451,6 +466,12 @@ function (int $bodySize) use (&$bodySizeLimit, &$dataSuspension) {
}
}

public function updatePriority(QuicSocket $socket, array|string $headers)

Check failure on line 469 in src/Driver/Http3Driver.php

View workflow job for this annotation

GitHub Actions / PHP 8.2

MissingReturnType

src/Driver/Http3Driver.php:469:21: MissingReturnType: Method Amp\Http\Server\Driver\Http3Driver::updatePriority does not have a return type, expecting void (see https://psalm.dev/050)
{
[$urgency, $incremental] = Http3Parser::parsePriority($headers);
$socket->setPriority($urgency + 124 /* 127 is default for QUIC, 3 is default for HTTP */, $incremental);
}

public function getPendingRequestCount(): int
{
return $this->requestStreams->count();
Expand All @@ -473,7 +494,6 @@ public function stop(): void
$this->highestStreamId,
)) || true);


$outstanding = $this->requestStreams->count();
if ($outstanding === 0) {
$this->writer->close();
Expand Down
2 changes: 1 addition & 1 deletion src/Driver/Internal/Http3/Http3Frame.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ enum Http3Frame: int
case ORIGIN = 0x0c;
case MAX_PUSH_ID = 0x0d;
case PRIORITY_UPDATE_Request = 0xF0700;
case PRIORITY_UPDATE_Response = 0xF0701;
case PRIORITY_UPDATE_Push = 0xF0701;
}
35 changes: 33 additions & 2 deletions src/Driver/Internal/Http3/Http3Parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
namespace Amp\Http\Server\Driver\Internal\Http3;

use Amp\Http\Http2\Http2Parser;
use Amp\Http\Server\Driver\Internal\Http3\Rfc8941\Boolean;
use Amp\Http\Server\Driver\Internal\Http3\Rfc8941\Number;
use Amp\Pipeline\ConcurrentIterator;
use Amp\Pipeline\Queue;
use Amp\Quic\QuicConnection;
Expand Down Expand Up @@ -310,7 +312,7 @@ public function process(): ConcurrentIterator
return;
}

if ($frame !== Http3Frame::GOAWAY || $frame !== Http3Frame::MAX_PUSH_ID || $frame !== Http3Frame::CANCEL_PUSH) {
if ($frame !== Http3Frame::GOAWAY && $frame !== Http3Frame::MAX_PUSH_ID && $frame !== Http3Frame::CANCEL_PUSH && $frame !== Http3Frame::PRIORITY_UPDATE_Request && $frame !== Http3Frame::PRIORITY_UPDATE_Push) {
throw new Http3ConnectionException("An unexpected frame was received on the control stream", Http3Error::H3_FRAME_UNEXPECTED);
}

Expand All @@ -321,7 +323,11 @@ public function process(): ConcurrentIterator
}
return;
}
$this->queue->push([$frame, $id]);
if ($frame === Http3Frame::PRIORITY_UPDATE_Request || $frame === Http3Frame::PRIORITY_UPDATE_Push) {
$this->queue->push([$frame, $id, \substr($contents, $tmpOff)]);
} else {
$this->queue->push([$frame, $id]);
}
}

// no break
Expand Down Expand Up @@ -368,6 +374,31 @@ public function process(): ConcurrentIterator
return $this->queue->iterate();
}

// Note: format is shared with HTTP/2
public static function parsePriority(array|string $headers): ?array
{
$urgency = 3;
$incremental = false;
if ($priority = Rfc8941::parseDictionary($headers)) {
if (isset($priority["u"])) {
$number = $priority["u"];
if ($number instanceof Number) {
$value = $number->item;
if (\is_int($value) && $value >= 0 && $value <= 7) {
$urgency = $number->item;
}
}
}
if (isset($priority["i"])) {
$bool = $priority["i"];
if ($bool instanceof Boolean) {
$incremental = $bool->item;
}
}
}
return [$urgency, $incremental];
}

public function abort(Http3ConnectionException $exception)
{
if (!$this->queue->isComplete()) {
Expand Down
Loading

0 comments on commit a3dbb24

Please sign in to comment.