Skip to content

Commit

Permalink
HTTP/3: Ensure HttpClient sends full request when the send function d…
Browse files Browse the repository at this point in the history
…oes not change NettyOutbound (#3536)
  • Loading branch information
violetagg authored Dec 9, 2024
1 parent 52d8efb commit bf66a1a
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 30 deletions.
166 changes: 138 additions & 28 deletions reactor-netty-http/src/http3Test/java/reactor/netty/http/Http3Tests.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.ssl.SniCompletionEvent;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.incubator.codec.http3.Http3DataFrame;
import io.netty.incubator.codec.http3.Http3HeadersFrame;
import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
import io.netty.incubator.codec.quic.QuicChannel;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -40,6 +46,7 @@
import reactor.core.publisher.Signal;
import reactor.core.scheduler.Schedulers;
import reactor.netty.ByteBufFlux;
import reactor.netty.ByteBufMono;
import reactor.netty.DisposableServer;
import reactor.netty.LogTracker;
import reactor.netty.NettyPipeline;
Expand All @@ -58,6 +65,7 @@
import javax.net.ssl.SNIHostName;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -66,6 +74,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand Down Expand Up @@ -365,34 +374,6 @@ void testGetRequest() {
.verify(Duration.ofSeconds(5));
}

@Test
void testHttpClientNoSecurityHttp3Fails() {
disposableServer =
createServer()
.handle((req, res) -> res.sendString(Mono.just("Hello")))
.bindNow();

createClient(disposableServer.port())
.noSSL()
.get()
.uri("/")
.responseContent()
.aggregate()
.asString()
.as(StepVerifier::create)
.verifyErrorMessage(HTTP3_WITHOUT_TLS_CLIENT);
}

@Test
void testHttpServerNoSecurityHttp3Fails() {
createServer()
.noSSL()
.handle((req, res) -> res.sendString(Mono.just("Hello")))
.bind()
.as(StepVerifier::create)
.verifyErrorMessage(HTTP3_WITHOUT_TLS_SERVER);
}

@Test
void testHttp3ForMemoryLeaks() {
disposableServer =
Expand Down Expand Up @@ -428,6 +409,64 @@ void testHttp3ForMemoryLeaks() {
System.gc();
}

@Test
void testHttpClientNoSecurityHttp3Fails() {
disposableServer =
createServer()
.handle((req, res) -> res.sendString(Mono.just("Hello")))
.bindNow();

createClient(disposableServer.port())
.noSSL()
.get()
.uri("/")
.responseContent()
.aggregate()
.asString()
.as(StepVerifier::create)
.verifyErrorMessage(HTTP3_WITHOUT_TLS_CLIENT);
}

@Test
void testHttpServerNoSecurityHttp3Fails() {
createServer()
.noSSL()
.handle((req, res) -> res.sendString(Mono.just("Hello")))
.bind()
.as(StepVerifier::create)
.verifyErrorMessage(HTTP3_WITHOUT_TLS_SERVER);
}

@Test
void testIssue3524Flux() {
// sends the message and then last http content
testRequestBody(sender -> sender.send((req, out) -> out.sendString(Flux.just("te", "st"))), 3);
}

@Test
void testIssue3524Mono() {
// sends "full" request
testRequestBody(sender -> sender.send((req, out) -> out.sendString(Mono.just("test"))), 1);
}

@Test
void testIssue3524MonoEmpty() {
// sends "full" request
testRequestBody(sender -> sender.send((req, out) -> Mono.empty()), 0);
}

@Test
void testIssue3524NoBody() {
// sends "full" request
testRequestBody(sender -> sender.send((req, out) -> out), 0);
}

@Test
void testIssue3524Object() {
// sends "full" request
testRequestBody(sender -> sender.send((req, out) -> out.sendObject(Unpooled.wrappedBuffer("test".getBytes(Charset.defaultCharset())))), 1);
}

@Test
void testMaxActiveStreamsCustomPool() throws Exception {
ConnectionProvider provider = ConnectionProvider.create("testMaxActiveStreamsCustomPool", 1);
Expand Down Expand Up @@ -604,6 +643,77 @@ void testMetrics() throws Exception {
}
}

@Test
void testMonoRequestBodySentAsFullRequest_Flux() {
// sends the message and then last http content
testRequestBody(sender -> sender.send(ByteBufFlux.fromString(Mono.just("test"))), 2);
}

@Test
void testMonoRequestBodySentAsFullRequest_Mono() {
// sends "full" request
testRequestBody(sender -> sender.send(ByteBufMono.fromString(Mono.just("test"))), 1);
}

@Test
void testMonoRequestBodySentAsFullRequest_MonoEmpty() {
// sends "full" request
testRequestBody(sender -> sender.send(Mono.empty()), 0);
}

@SuppressWarnings("FutureReturnValueIgnored")
private void testRequestBody(Function<HttpClient.RequestSender, HttpClient.ResponseReceiver<?>> sendFunction, int expectedMsg) {
disposableServer =
createServer().handle((req, res) -> req.receive()
.then(res.send()))
.bindNow(Duration.ofSeconds(30));

AtomicInteger counterHeaders = new AtomicInteger();
AtomicInteger counterData = new AtomicInteger();
sendFunction.apply(
createClient(disposableServer.port())
.port(disposableServer.port())
.doOnRequest((req, conn) -> {
ChannelPipeline pipeline = conn.channel().pipeline();
ChannelHandlerContext ctx = pipeline.context(NettyPipeline.LoggingHandler);
if (ctx != null) {
pipeline.addAfter(ctx.name(), "testRequestBody",
new ChannelOutboundHandlerAdapter() {
boolean done;

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.channel().closeFuture().addListener(f -> done = true);
super.handlerAdded(ctx);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (!done) {
if (msg instanceof Http3HeadersFrame) {
counterHeaders.getAndIncrement();
}
else if (msg instanceof Http3DataFrame) {
counterData.getAndIncrement();
}
}
//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
}
});
}
})
.post()
.uri("/"))
.responseContent()
.aggregate()
.asString()
.block(Duration.ofSeconds(30));

assertThat(counterHeaders.get()).isEqualTo(1);
assertThat(counterData.get()).isEqualTo(expectedMsg);
}

@Test
void testPostRequest() {
doTestPostRequest(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import static reactor.netty.ReactorNetty.format;
import static reactor.netty.ReactorNetty.getChannelContext;
import static reactor.netty.ReactorNetty.setChannelContext;
import static reactor.netty.http.client.HttpClientState.STREAM_CONFIGURED;

/**
* An HTTP/3 implementation for pooled {@link ConnectionProvider}.
Expand Down Expand Up @@ -404,7 +403,6 @@ public void operationComplete(Future<QuicStreamChannel> future) {

ChannelOperations<?, ?> ops = ChannelOperations.get(ch);
if (ops != null) {
obs.onStateChange(ops, STREAM_CONFIGURED);
sink.success(ops);
}
}
Expand Down

0 comments on commit bf66a1a

Please sign in to comment.