diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java index 790e4083d..fafa414c1 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java @@ -814,12 +814,24 @@ else if (msg instanceof HttpRequest) { } else { request.release(); + // HTTP/1.1 TLS Upgrade (RFC-2817) on empty requests (GET/HEAD/OPTIONS) + if (!isHttp2() && request.headers().contains(HttpHeaderNames.UPGRADE)) { + //force auto read to enable more accurate close selection now inbound is done + channel().config().setAutoRead(true); + onInboundComplete(); + } } if (isHttp2()) { //force auto read to enable more accurate close selection now inbound is done channel().config().setAutoRead(true); onInboundComplete(); } + else if (!isHttp2() && request.headers().contains(HttpHeaderNames.UPGRADE)) { + // HTTP/1.1 TLS Upgrade (RFC-2817) requests (GET/HEAD/OPTIONS) with empty / non-empty payload + // No need to call onInboundComplete(), it will be triggered by onInboundNext(...) + // since this is the last HTTP content + stopReadTimeout(); + } } } else if (msg instanceof LastHttpContent) { diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index 21c3e1b69..e5ca0883d 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -72,7 +72,9 @@ import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.compression.Brotli; import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; @@ -82,6 +84,7 @@ import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.logging.LogLevel; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; @@ -101,7 +104,10 @@ import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.netty.BaseHttpTest; @@ -1662,6 +1668,104 @@ void testIssue632() throws Exception { assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); } + @Test + void testIssue3538() throws Exception { + disposableServer = + createServer() + .protocol(HttpProtocol.H2C, HttpProtocol.HTTP11) + .route(r -> r.get("/", (req, res) -> { + final EchoAction action = new EchoAction(); + + req + .receiveContent().switchIfEmpty(Mono.just(LastHttpContent.EMPTY_LAST_CONTENT)) + .subscribe(action); + + return res.sendObject(action); + } + )) + .bindNow(); + assertThat(disposableServer).isNotNull(); + + final ByteBuf content = createHttpClientForContextWithPort() + .protocol(HttpProtocol.HTTP11) + .headers(h -> + h.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE) + .add(HttpHeaderNames.UPGRADE, "TLS/1.2")) + .get() + .uri("/") + .responseContent() + .blockLast(Duration.ofSeconds(30)); + + assertThat(content).isNull(); + } + + @Test + void testIssue3538GetWithPayload() throws Exception { + disposableServer = + createServer() + .protocol(HttpProtocol.H2C, HttpProtocol.HTTP11) + .route(r -> r.get("/", (req, res) -> { + final EchoAction action = new EchoAction(); + + req + .receiveContent().switchIfEmpty(Mono.just(LastHttpContent.EMPTY_LAST_CONTENT)) + .subscribe(action); + + return res.sendObject(action); + } + )) + .bindNow(); + assertThat(disposableServer).isNotNull(); + + // The H2C max content length is 0 by default (no content is expected), + // so the request is rejected with HTTP/413 Content Too Large + StepVerifier.create(createHttpClientForContextWithPort() + .protocol(HttpProtocol.HTTP11) + .headers(h -> + h.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE) + .add(HttpHeaderNames.UPGRADE, "TLS/1.2")) + .request(HttpMethod.GET) + .send((req, res) -> res.sendString(Mono.just("testIssue3538"))) + .uri("/") + .response((r, buf) -> Mono.just(r.status().code()))) + .expectNextMatches(status -> status == 413) + .expectComplete() + .verify(Duration.ofSeconds(30)); + } + + @Test + void testIssue3538GetWithPayloadAndH2cMaxContentLength() throws Exception { + disposableServer = + createServer() + .protocol(HttpProtocol.H2C, HttpProtocol.HTTP11) + .httpRequestDecoder(spec -> spec.h2cMaxContentLength(100)) + .route(r -> r.get("/", (req, res) -> { + final EchoAction action = new EchoAction(); + + req + .receiveContent().switchIfEmpty(Mono.just(LastHttpContent.EMPTY_LAST_CONTENT)) + .subscribe(action); + + return res.sendObject(action); + } + )) + .bindNow(); + assertThat(disposableServer).isNotNull(); + + final ByteBuf content = createHttpClientForContextWithPort() + .protocol(HttpProtocol.HTTP11) + .headers(h -> + h.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE) + .add(HttpHeaderNames.UPGRADE, "TLS/1.2")) + .request(HttpMethod.GET) + .send((req, res) -> res.sendString(Mono.just("testIssue3538"))) + .uri("/") + .responseContent() + .blockLast(Duration.ofSeconds(30)); + + assertThat(content).isNotNull(); + } + @Test void testIssue694() { disposableServer = @@ -3513,4 +3617,30 @@ void testDeleteMethod(boolean chunked) { .expectComplete() .verify(Duration.ofSeconds(5)); } + + private static class EchoAction implements Publisher, Consumer { + private final Publisher sender; + private volatile FluxSink emitter; + + EchoAction() { + this.sender = Flux.create(emitter -> this.emitter = emitter); + } + + @Override + public void accept(HttpContent message) { + if (message.content().readableBytes() > 0) { + emitter.next(new DefaultHttpContent(message.content().retain())); + } + + if (message instanceof LastHttpContent) { + emitter.complete(); + } + } + + @Override + public void subscribe(Subscriber s) { + sender.subscribe(s); + } + + } }