Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP/3: Ensure HttpClient sends full request when the send function does not change NettyOutbound #3536

Merged
merged 2 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 138 additions & 28 deletions reactor-netty-http/src/http3Test/java/reactor/netty/http/Http3Tests.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.ssl.SniCompletionEvent;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.incubator.codec.http3.Http3DataFrame;
import io.netty.incubator.codec.http3.Http3HeadersFrame;
import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
import io.netty.incubator.codec.quic.QuicChannel;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -40,6 +46,7 @@
import reactor.core.publisher.Signal;
import reactor.core.scheduler.Schedulers;
import reactor.netty.ByteBufFlux;
import reactor.netty.ByteBufMono;
import reactor.netty.DisposableServer;
import reactor.netty.LogTracker;
import reactor.netty.NettyPipeline;
Expand All @@ -58,6 +65,7 @@
import javax.net.ssl.SNIHostName;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -66,6 +74,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand Down Expand Up @@ -365,34 +374,6 @@ void testGetRequest() {
.verify(Duration.ofSeconds(5));
}

@Test
void testHttpClientNoSecurityHttp3Fails() {
disposableServer =
createServer()
.handle((req, res) -> res.sendString(Mono.just("Hello")))
.bindNow();

createClient(disposableServer.port())
.noSSL()
.get()
.uri("/")
.responseContent()
.aggregate()
.asString()
.as(StepVerifier::create)
.verifyErrorMessage(HTTP3_WITHOUT_TLS_CLIENT);
}

@Test
void testHttpServerNoSecurityHttp3Fails() {
createServer()
.noSSL()
.handle((req, res) -> res.sendString(Mono.just("Hello")))
.bind()
.as(StepVerifier::create)
.verifyErrorMessage(HTTP3_WITHOUT_TLS_SERVER);
}

@Test
void testHttp3ForMemoryLeaks() {
disposableServer =
Expand Down Expand Up @@ -428,6 +409,64 @@ void testHttp3ForMemoryLeaks() {
System.gc();
}

@Test
void testHttpClientNoSecurityHttp3Fails() {
disposableServer =
createServer()
.handle((req, res) -> res.sendString(Mono.just("Hello")))
.bindNow();

createClient(disposableServer.port())
.noSSL()
.get()
.uri("/")
.responseContent()
.aggregate()
.asString()
.as(StepVerifier::create)
.verifyErrorMessage(HTTP3_WITHOUT_TLS_CLIENT);
}

@Test
void testHttpServerNoSecurityHttp3Fails() {
createServer()
.noSSL()
.handle((req, res) -> res.sendString(Mono.just("Hello")))
.bind()
.as(StepVerifier::create)
.verifyErrorMessage(HTTP3_WITHOUT_TLS_SERVER);
}

@Test
void testIssue3524Flux() {
// sends the message and then last http content
testRequestBody(sender -> sender.send((req, out) -> out.sendString(Flux.just("te", "st"))), 3);
}

@Test
void testIssue3524Mono() {
// sends "full" request
testRequestBody(sender -> sender.send((req, out) -> out.sendString(Mono.just("test"))), 1);
}

@Test
void testIssue3524MonoEmpty() {
// sends "full" request
testRequestBody(sender -> sender.send((req, out) -> Mono.empty()), 0);
}

@Test
void testIssue3524NoBody() {
// sends "full" request
testRequestBody(sender -> sender.send((req, out) -> out), 0);
}

@Test
void testIssue3524Object() {
// sends "full" request
testRequestBody(sender -> sender.send((req, out) -> out.sendObject(Unpooled.wrappedBuffer("test".getBytes(Charset.defaultCharset())))), 1);
}

@Test
void testMaxActiveStreamsCustomPool() throws Exception {
ConnectionProvider provider = ConnectionProvider.create("testMaxActiveStreamsCustomPool", 1);
Expand Down Expand Up @@ -604,6 +643,77 @@ void testMetrics() throws Exception {
}
}

@Test
void testMonoRequestBodySentAsFullRequest_Flux() {
// sends the message and then last http content
testRequestBody(sender -> sender.send(ByteBufFlux.fromString(Mono.just("test"))), 2);
}

@Test
void testMonoRequestBodySentAsFullRequest_Mono() {
// sends "full" request
testRequestBody(sender -> sender.send(ByteBufMono.fromString(Mono.just("test"))), 1);
}

@Test
void testMonoRequestBodySentAsFullRequest_MonoEmpty() {
// sends "full" request
testRequestBody(sender -> sender.send(Mono.empty()), 0);
}

@SuppressWarnings("FutureReturnValueIgnored")
private void testRequestBody(Function<HttpClient.RequestSender, HttpClient.ResponseReceiver<?>> sendFunction, int expectedMsg) {
disposableServer =
createServer().handle((req, res) -> req.receive()
.then(res.send()))
.bindNow(Duration.ofSeconds(30));

AtomicInteger counterHeaders = new AtomicInteger();
AtomicInteger counterData = new AtomicInteger();
sendFunction.apply(
createClient(disposableServer.port())
.port(disposableServer.port())
.doOnRequest((req, conn) -> {
ChannelPipeline pipeline = conn.channel().pipeline();
ChannelHandlerContext ctx = pipeline.context(NettyPipeline.LoggingHandler);
if (ctx != null) {
pipeline.addAfter(ctx.name(), "testRequestBody",
new ChannelOutboundHandlerAdapter() {
boolean done;

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.channel().closeFuture().addListener(f -> done = true);
super.handlerAdded(ctx);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (!done) {
if (msg instanceof Http3HeadersFrame) {
counterHeaders.getAndIncrement();
}
else if (msg instanceof Http3DataFrame) {
counterData.getAndIncrement();
}
}
//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
}
});
}
})
.post()
.uri("/"))
.responseContent()
.aggregate()
.asString()
.block(Duration.ofSeconds(30));

assertThat(counterHeaders.get()).isEqualTo(1);
assertThat(counterData.get()).isEqualTo(expectedMsg);
}

@Test
void testPostRequest() {
doTestPostRequest(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import static reactor.netty.ReactorNetty.format;
import static reactor.netty.ReactorNetty.getChannelContext;
import static reactor.netty.ReactorNetty.setChannelContext;
import static reactor.netty.http.client.HttpClientState.STREAM_CONFIGURED;

/**
* An HTTP/3 implementation for pooled {@link ConnectionProvider}.
Expand Down Expand Up @@ -404,7 +403,6 @@ public void operationComplete(Future<QuicStreamChannel> future) {

ChannelOperations<?, ?> ops = ChannelOperations.get(ch);
if (ops != null) {
obs.onStateChange(ops, STREAM_CONFIGURED);
sink.success(ops);
}
}
Expand Down
Loading