From 3de5b3327b7c36312bccee58a3f5bb8be255e994 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 3 Dec 2024 19:37:09 +0200 Subject: [PATCH] Handle the incoming LastHttpContent with no content as if it is EMPTY_LAST_CONTENT When the incoming LastHttpContent is EMPTY_LAST_CONTENT, Reactor Netty directly completes the inbound. Handle the incoming LastHttpContent with no content in the same way instead of buffering it in FluxReceive. Adapt the tests - instead of counting the received content on the server, directly count the outbound data on the client. --- .../http/server/HttpServerOperations.java | 16 +++++++++-- .../java/reactor/netty/http/Http2Tests.java | 28 +++++++++++++++++-- 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java index 8bf2238a4a..db7259e7ac 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java @@ -796,7 +796,13 @@ else if (msg == EMPTY_LAST_CONTENT) { handleLastHttpContent(); } else if (msgClass == DefaultLastHttpContent.class) { - super.onInboundNext(ctx, msg); + DefaultLastHttpContent lastHttpContent = (DefaultLastHttpContent) msg; + if (lastHttpContent.content().readableBytes() > 0) { + super.onInboundNext(ctx, msg); + } + else { + lastHttpContent.release(); + } handleLastHttpContent(); } else if (msgClass == DefaultHttpContent.class) { @@ -831,7 +837,13 @@ else if (msg instanceof HttpRequest) { } } else if (msg instanceof LastHttpContent) { - super.onInboundNext(ctx, msg); + LastHttpContent lastHttpContent = (LastHttpContent) msg; + if (lastHttpContent.content().readableBytes() > 0) { + super.onInboundNext(ctx, msg); + } + else { + lastHttpContent.release(); + } handleLastHttpContent(); } else { diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/Http2Tests.java b/reactor-netty-http/src/test/java/reactor/netty/http/Http2Tests.java index 2cca2a77f0..3b64aedfd8 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/Http2Tests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/Http2Tests.java @@ -17,7 +17,12 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2DataFrame; import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.SelfSignedCertificate; @@ -375,25 +380,42 @@ void testMonoRequestBodySentAsFullRequest_Mono() { doTestMonoRequestBodySentAsFullRequest(ByteBufMono.fromString(Mono.just("test")), 1); } + @SuppressWarnings("FutureReturnValueIgnored") private void doTestMonoRequestBodySentAsFullRequest(Publisher body, int expectedMsg) { Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()); Http2SslContextSpec clientCtx = Http2SslContextSpec.forClient() .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE)); - AtomicInteger counter = new AtomicInteger(); disposableServer = createServer() .protocol(HttpProtocol.H2) .secure(spec -> spec.sslContext(serverCtx)) - .handle((req, res) -> req.receiveContent() - .doOnNext(httpContent -> counter.getAndIncrement()) + .handle((req, res) -> req.receive() .then(res.send())) .bindNow(Duration.ofSeconds(30)); + AtomicInteger counter = new AtomicInteger(); createClient(disposableServer.port()) .protocol(HttpProtocol.H2) .secure(spec -> spec.sslContext(clientCtx)) + .doOnRequest((req, conn) -> { + ChannelPipeline pipeline = conn.channel().parent().pipeline(); + ChannelHandlerContext ctx = pipeline.context(Http2FrameCodec.class); + if (ctx != null) { + pipeline.addAfter(ctx.name(), "testMonoRequestBodySentAsFullRequest", + new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (msg instanceof Http2DataFrame) { + counter.getAndIncrement(); + } + //"FutureReturnValueIgnored" this is deliberate + ctx.write(msg, promise); + } + }); + } + }) .post() .uri("/") .send(body)