diff --git a/src/Driver/Internal/Http3/Http3Parser.php b/src/Driver/Internal/Http3/Http3Parser.php index 8d59bb0c..4cc97ddc 100644 --- a/src/Driver/Internal/Http3/Http3Parser.php +++ b/src/Driver/Internal/Http3/Http3Parser.php @@ -4,6 +4,7 @@ use Amp\ByteStream\ReadableStream; use Amp\Cancellation; +use Amp\CancelledException; use Amp\DeferredCancellation; use Amp\Http\StructuredFields\Boolean; use Amp\Http\StructuredFields\Number; @@ -19,6 +20,7 @@ class Http3Parser { private ?QuicSocket $qpackDecodeStream = null; private ?QuicSocket $qpackEncodeStream = null; + private bool $receivedControlStream = false; private Queue $queue; /** @var array */ private array $datagramReceivers = []; @@ -42,13 +44,13 @@ public static function decodeVarint(string $string, int &$off): int --$off; return -1; } - return ($int << 8) + \ord($string[$off++]); + return (($int & 0x3F) << 8) + \ord($string[$off++]); case 0x80: if (\strlen($string) < $off + 3) { --$off; return -1; } - return ($int << 24) + (\ord($string[$off++]) << 16) + (\ord($string[$off++]) << 8) + \ord($string[$off++]); + return (($int & 0x3F) << 24) + (\ord($string[$off++]) << 16) + (\ord($string[$off++]) << 8) + \ord($string[$off++]); default: if (\strlen($string) < $off-- + 7) { return -1; @@ -297,8 +299,8 @@ private static function processHeaders(array $decoded): ?array return [$headers, $pseudo]; } - // I'm unable to suppress https://github.com/vimeo/psalm/issues/10669 - /* @return ConcurrentIterator */ + // Omitting it due to https://github.com/vimeo/psalm/issues/10002 + /* @return ConcurrentIterator */ public function process(): ConcurrentIterator { EventLoop::queue(function () { @@ -332,6 +334,11 @@ public function process(): ConcurrentIterator // unidirectional stream switch (Http3StreamType::tryFrom($type)) { case Http3StreamType::Control: + if ($this->receivedControlStream) { + throw new Http3ConnectionException("There must be only one control stream", Http3Error::H3_STREAM_CREATION_ERROR); + } + $this->receivedControlStream = true; + if (![$frame, $contents] = $this->readFullFrame($stream, $buf, $off, 0x1000)) { if (!$stream->getConnection()->isClosed()) { throw new Http3ConnectionException("The control stream was closed", Http3Error::H3_CLOSED_CRITICAL_STREAM); @@ -345,7 +352,7 @@ public function process(): ConcurrentIterator $this->parseSettings($contents); while (true) { - if (![$frame, $contents] = $this->readFullFrame($stream, $buf, $off, 0x100)) { + if (![$frame, $contents] = $this->readFullFrame($stream, $buf, $off, 0x1000)) { if (!$stream->getConnection()->isClosed()) { throw new Http3ConnectionException("The control stream was closed", Http3Error::H3_CLOSED_CRITICAL_STREAM); } @@ -385,14 +392,14 @@ public function process(): ConcurrentIterator // We don't do anything with these streams yet, but we must not close them according to RFC 9204 Section 4.2 case Http3StreamType::QPackEncode: if ($this->qpackEncodeStream) { - return; + throw new Http3ConnectionException("There must be only one QPACK encoding stream", Http3Error::H3_STREAM_CREATION_ERROR); } $this->qpackEncodeStream = $stream; break; case Http3StreamType::QPackDecode: if ($this->qpackDecodeStream) { - return; + throw new Http3ConnectionException("There must be only one QPACK decoding stream", Http3Error::H3_STREAM_CREATION_ERROR); } $this->qpackDecodeStream = $stream; break; @@ -455,22 +462,25 @@ private function datagramReceiver(): void $this->datagramReceiveEmpty = new DeferredCancellation; $cancellation = $this->datagramReceiveEmpty->getCancellation(); EventLoop::queue(function () use ($cancellation) { - while (null !== $buf = $this->connection->receive($cancellation)) { - $off = 0; - $quarterStreamId = self::decodeVarint($buf, $off); - if (isset($this->datagramReceivers[$quarterStreamId])) { - $this->datagramReceivers[$quarterStreamId]->resume(\substr($buf, $off)); - unset($this->datagramReceivers[$quarterStreamId]); + try { + while (null !== $buf = $this->connection->receive($cancellation)) { + $off = 0; + $quarterStreamId = self::decodeVarint($buf, $off); + if (isset($this->datagramReceivers[$quarterStreamId])) { + $this->datagramReceivers[$quarterStreamId]->resume(\substr($buf, $off)); + unset($this->datagramReceivers[$quarterStreamId]); + + if (!$this->datagramReceivers) { + return; + } - if (!$this->datagramReceivers) { - return; + // We need to await a tick to allow datagram receivers to request a new datagram to avoid needlessly discarding datagram frames + $suspension = EventLoop::getSuspension(); + EventLoop::queue($suspension->resume(...)); + $suspension->suspend(); } - - // We need to await a tick to allow datagram receivers to request a new datagram to avoid needlessly discarding datagram frames - $suspension = EventLoop::getSuspension(); - EventLoop::queue($suspension->resume(...)); - $suspension->suspend(); } + } catch (CancelledException) { } }); } @@ -489,7 +499,9 @@ public function receiveDatagram(QuicSocket $stream, ?Cancellation $cancellation if (!isset($this->datagramCloseHandlerInstalled[$quarterStreamId])) { $this->datagramCloseHandlerInstalled[$quarterStreamId] = true; $stream->onClose(function () use ($quarterStreamId) { - $this->datagramReceivers[$quarterStreamId]->resume(); + if (isset($this->datagramReceivers[$quarterStreamId])) { + $this->datagramReceivers[$quarterStreamId]->resume(); + } unset($this->datagramReceivers[$quarterStreamId], $this->datagramCloseHandlerInstalled[$quarterStreamId]); if (!$this->datagramReceivers) { $this->datagramReceiveEmpty->cancel(); diff --git a/src/Driver/Internal/Http3/Http3Writer.php b/src/Driver/Internal/Http3/Http3Writer.php index ff614d5e..cb40d9d0 100644 --- a/src/Driver/Internal/Http3/Http3Writer.php +++ b/src/Driver/Internal/Http3/Http3Writer.php @@ -48,6 +48,26 @@ public function sendData(QuicSocket $stream, string $payload): void self::sendKnownFrame($stream, Http3Frame::DATA, $payload); } + public function sendPriorityRequest(int $streamId, string $structuredPriorityData): void + { + self::sendKnownFrame($this->controlStream, Http3Frame::PRIORITY_UPDATE_Request, self::encodeVarint($streamId) . $structuredPriorityData); + } + + public function sendPriorityPush(int $streamId, string $structuredPriorityData): void + { + self::sendKnownFrame($this->controlStream, Http3Frame::PRIORITY_UPDATE_Push, self::encodeVarint($streamId) . $structuredPriorityData); + } + + public function sendMaxPushId(int $pushId): void + { + self::sendKnownFrame($this->controlStream, Http3Frame::MAX_PUSH_ID, self::encodeVarint($pushId)); + } + + public function sendCancelPush(int $pushId): void + { + self::sendKnownFrame($this->controlStream, Http3Frame::CANCEL_PUSH, self::encodeVarint($pushId)); + } + public function sendGoaway(int $highestStreamId): void { self::sendKnownFrame($this->controlStream, Http3Frame::GOAWAY, self::encodeVarint($highestStreamId)); diff --git a/src/Driver/Internal/Http3/QPack.php b/src/Driver/Internal/Http3/QPack.php index a8ca5ff4..10fdafa4 100644 --- a/src/Driver/Internal/Http3/QPack.php +++ b/src/Driver/Internal/Http3/QPack.php @@ -140,7 +140,7 @@ private static function decodeDynamicInteger(string $input, int $maxBits, int &$ $int += ($c & 0x7f) << $bitshift; $bitshift += 7; - if ($int > 2147483647) { + if ($int > 0x7FFFFFFF) { throw new QPackException(Http3Error::QPACK_DECOMPRESSION_FAILED, 'Invalid integer, too large'); } } while ($c & 0x80); diff --git a/test/Driver/Http3/Http3ParserTest.php b/test/Driver/Http3/Http3ParserTest.php index 1f47278d..0bc049db 100644 --- a/test/Driver/Http3/Http3ParserTest.php +++ b/test/Driver/Http3/Http3ParserTest.php @@ -2,14 +2,19 @@ namespace Amp\Http\Server\Test\Driver\Http3; +use Amp\CancelledException; +use Amp\DeferredCancellation; +use Amp\Http\Server\Driver\Internal\Http3\Http3ConnectionException; use Amp\Http\Server\Driver\Internal\Http3\Http3Frame; use Amp\Http\Server\Driver\Internal\Http3\Http3Parser; +use Amp\Http\Server\Driver\Internal\Http3\Http3StreamType; use Amp\Http\Server\Driver\Internal\Http3\Http3Writer; use Amp\Http\Server\Driver\Internal\Http3\QPack; use Amp\Pipeline\ConcurrentIterator; use Amp\Quic\Pair\PairConnection; use Amp\Quic\QuicSocket; use PHPUnit\Framework\TestCase; +use Revolt\EventLoop; class Http3ParserTest extends TestCase { private int $paddingIndex = 0; @@ -25,8 +30,8 @@ public function insertPaddingFrame(QuicSocket $stream, $size = 1): void Http3Writer::sendFrame($stream, self::PADDING_INDICES[$this->paddingIndex++ % count(self::PADDING_INDICES)], str_repeat("a", $size)); } - /** @return list{PairConnection, PairConnection, Http3Parser, Http3Writer, ConcurrentIterator, \Generator, QuicSocket, QuicSocket} */ - public function runParsingRequest($endEarly = false, $sendSingleBytes = false) + /** @return list{PairConnection, PairConnection, Http3Parser, Http3Writer, ConcurrentIterator, \Generator, QuicSocket, QuicSocket, QPack} */ + public function runParsingRequest($endEarly = false, $sendSingleBytes = false): array { [$server, $client] = PairConnection::createPair(); @@ -72,39 +77,58 @@ public function runParsingRequest($endEarly = false, $sendSingleBytes = false) $this->assertSame(["header1" => ["a", "b"]], $headers); $this->assertSame([":method" => "GET"], $pseudo); - var_dump("!"); - if ($endEarly) { $req->end(); $generator->next(); $this->assertFalse($generator->valid()); } - return [$server, $client, $parser, $writer, $processor, $generator, $req, $stream]; + return [$server, $client, $parser, $writer, $processor, $generator, $req, $stream, $qpack]; } - public function testParsingRequest() + public function testParsingRequest(): void { $this->runParsingRequest(endEarly: true); } - public function testRequestWithData() + public function testRequestWithData(): void { - [$server, $client, $parser, $writer, $processor, $generator, $req, $stream] = $this->runParsingRequest(); + [$server, $client, $parser, $writer, $processor, $generator, $req, $stream, $qpack] = $this->runParsingRequest(); + + Http3Writer::sendFrame($req, Http3Frame::PUSH_PROMISE->value, Http3Writer::encodeVarint(2) . $qpack->encode([["header", "value"]])); + $generator->next(); + $this->assertSame(Http3Frame::PUSH_PROMISE, $generator->key()); + $this->assertSame([2, [["header" => ["value"]], []]], $generator->current()); + + Http3Writer::sendFrame($req, Http3Frame::PUSH_PROMISE->value, Http3Writer::encodeVarint(3) . $qpack->encode([["header", "other"]])); + $generator->next(); + $this->assertSame(Http3Frame::PUSH_PROMISE, $generator->key()); + $this->assertSame([3, [["header" => ["other"]], []]], $generator->current()); + $writer->sendData($req, "some"); $generator->next(); $this->assertSame(Http3Frame::DATA, $generator->key()); $this->assertSame("some", $generator->current()); + Http3Writer::sendFrame($req, Http3Frame::PUSH_PROMISE->value, Http3Writer::encodeVarint(4) . $qpack->encode([[":header", "other"]])); + $generator->next(); + $this->assertSame(Http3Frame::PUSH_PROMISE, $generator->key()); + $this->assertSame([4, [[], [":header" => "other"]]], $generator->current()); + $req->resetSending(); $generator->next(); $this->assertNull($generator->key()); } - public function testSendingSingleBytes() + public function testSendingSingleBytes(): void { - [$server, $client, $parser, $writer, $processor, $generator, $req, $stream] = $this->runParsingRequest(sendSingleBytes: true); + [$server, $client, $parser, $writer, $processor, $generator, $req, $stream, $qpack] = $this->runParsingRequest(sendSingleBytes: true); + + $sendFuture = \Amp\async(function () use ($writer, $qpack, $req) { + $this->insertPaddingFrame($req); + $this->insertPaddingFrame($req); + + $writer->sendHeaderFrame($req, $qpack->encode([["header", "value"]])); - $sendFuture = \Amp\async(function () use ($writer, $req) { $this->insertPaddingFrame($req); $this->insertPaddingFrame($req); @@ -114,6 +138,11 @@ public function testSendingSingleBytes() $writer->sendData($req, "d"); }); + // multiple leading headers are allowed + $generator->next(); + $this->assertSame(Http3Frame::HEADERS, $generator->key()); + $this->assertSame([["header" => ["value"]], []], $generator->current()); + $generator->next(); $this->assertSame(Http3Frame::DATA, $generator->key()); $this->assertSame("a", $generator->current()); @@ -140,4 +169,150 @@ public function testSendingSingleBytes() $this->assertNull($generator->key()); $this->assertNull($sendFuture->await()); } + + function testTrailers() { + [$server, $client, $parser, $writer, $processor, $generator, $req, $stream, $qpack] = $this->runParsingRequest(); + + $writer->sendData($req, "abc"); + + $generator->next(); + $this->assertSame(Http3Frame::DATA, $generator->key()); + $this->assertSame("abc", $generator->current()); + + $writer->sendHeaderFrame($req, $qpack->encode([["header", "value"]])); + + $generator->next(); + $this->assertSame(Http3Frame::HEADERS, $generator->key()); + $this->assertSame([["header" => ["value"]], []], $generator->current()); + + Http3Writer::sendFrame($req, Http3Frame::PUSH_PROMISE->value, Http3Writer::encodeVarint(2) . $qpack->encode([["header", "value"]])); + $generator->next(); + $this->assertSame(Http3Frame::PUSH_PROMISE, $generator->key()); + $this->assertSame([2, [["header" => ["value"]], []]], $generator->current()); + + Http3Writer::sendFrame($req, Http3Frame::PUSH_PROMISE->value, Http3Writer::encodeVarint(3) . $qpack->encode([["header", "other"]])); + $generator->next(); + $this->assertSame(Http3Frame::PUSH_PROMISE, $generator->key()); + $this->assertSame([3, [["header" => ["other"]], []]], $generator->current()); + + $writer->sendData($req, "abc"); + + try { + $generator->next(); + } catch (Http3ConnectionException $e) { + } + + if (!isset($e)) { + $this->fail("Message did not reject disallowed DATA frame after trailing headers"); + } + } + + function testUnidirectionalStreams() { + [$server, $client] = PairConnection::createPair(); + + $parser = new Http3Parser($server, 0x1000, new QPack); + + $writer = new Http3Writer($client, $sentSettings = [10 => 20, 11 => 30]); + + $writer->sendPriorityPush(1, "foo"); + + $processor = $parser->process(); + + $processor->continue(); + $this->assertSame([Http3Frame::SETTINGS, $sentSettings], $processor->getValue()); + + $processor->continue(); + $this->assertSame([Http3Frame::PRIORITY_UPDATE_Push, 1, "foo"], $processor->getValue()); + + $writer->sendPriorityRequest(2, "bar"); + $processor->continue(); + $this->assertSame([Http3Frame::PRIORITY_UPDATE_Request, 2, "bar"], $processor->getValue()); + + $writer->sendMaxPushId(3); + $processor->continue(); + $this->assertSame([Http3Frame::MAX_PUSH_ID, 3], $processor->getValue()); + + $writer->sendCancelPush(4); + $processor->continue(); + $this->assertSame([Http3Frame::CANCEL_PUSH, 4], $processor->getValue()); + + $writer->sendGoaway(5); + $processor->continue(); + $this->assertSame([Http3Frame::GOAWAY, 5], $processor->getValue()); + + // Accepted and handled internally + $qpackEncode = $client->openStream(); + $qpackEncode->endReceiving(); + $qpackEncode->write(Http3Writer::encodeVarint(Http3StreamType::QPackEncode->value)); + $qpackDecode = $client->openStream(); + $qpackDecode->endReceiving(); + $qpackDecode->write(Http3Writer::encodeVarint(Http3StreamType::QPackDecode->value)); + + $custom = $client->openStream(); + $custom->endReceiving(); + $custom->write(Http3Writer::encodeVarint(5) . "body"); + + $processor->continue(); + [$type, $buf, $stream] = $processor->getValue(); + $this->assertSame(5, $type); + $this->assertSame("body", $buf); + + $custom->write("more data"); + $this->assertSame("more data", $stream->read()); + + $stream = $client->openStream(); + $stream->endReceiving(); + $stream->write(Http3Writer::encodeVarint(Http3StreamType::Control->value)); + try { + $processor->continue(); + } catch (Http3ConnectionException $e) { + } + if (!isset($e)) { + $this->fail("There must be only one control stream"); + } + } + + public function testParsePriority() { + $this->assertSame([7, true], Http3Parser::parsePriority("u=7, i")); + $this->assertSame([3, true], Http3Parser::parsePriority("u=8, i")); + $this->assertSame([3, false], Http3Parser::parsePriority("u=-1, i=1")); + $this->assertSame([0, false], Http3Parser::parsePriority("u=0, i=foo")); + } + + public function testDatagram() { + [$server, $client] = PairConnection::createPair(); + + $clientStream = $client->openStream(); + $clientStream->write(""); // force open + $serverStream = $server->accept(); + + $clientStream2 = $client->openStream(); + $clientStream2->write(""); // force open + $serverStream2 = $server->accept(); + + $parser = new Http3Parser($server, 0x1000, new QPack); + $writer = new Http3Writer($client, []); + + $writer->writeDatagram($clientStream, "some data"); + $writer->writeDatagram($clientStream, "more data"); + $this->assertSame("some data", $parser->receiveDatagram($serverStream)); + $this->assertSame("more data", $parser->receiveDatagram($serverStream)); + + EventLoop::queue(fn() => $writer->writeDatagram($clientStream2, "second")); + $this->assertSame("second", $parser->receiveDatagram($serverStream2)); + + $cancel = new DeferredCancellation; + EventLoop::queue($cancel->cancel(...)); + try { + $parser->receiveDatagram($serverStream2, $cancel->getCancellation()); + } catch (CancelledException $e) { + } + if (!isset($e)) { + $this->fail("The datagram wasn't cancelled"); + } + + EventLoop::queue(fn() => $clientStream->close()); + $this->assertNull($parser->receiveDatagram($serverStream)); + $this->assertNull($parser->receiveDatagram($serverStream)); + } }