Skip to content

Commit

Permalink
Merge #3663 into 1.3.0-M1
Browse files Browse the repository at this point in the history
Signed-off-by: Violeta Georgieva <[email protected]>
  • Loading branch information
violetagg committed Mar 4, 2025
2 parents 38643f7 + b5c5e14 commit af5c2f4
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,11 @@ protected void onOutboundComplete() {
"zero-length header"));
}
//"FutureReturnValueIgnored" this is deliberate
channel().writeAndFlush(newFullBodyMessage());
HttpMessage msg = Objects.equals(method(), HttpMethod.GET) ||
Objects.equals(method(), HttpMethod.HEAD) ||
Objects.equals(method(), HttpMethod.DELETE) ?
newFullBodyMessage() : newFullBodyMessage(Unpooled.EMPTY_BUFFER);
channel().writeAndFlush(msg);
}
else if (markSentBody()) {
//"FutureReturnValueIgnored" this is deliberate
Expand Down Expand Up @@ -906,9 +910,18 @@ final Mono<Void> send() {
if (!channel().isActive()) {
return Mono.error(AbortedException.beforeSend());
}
return FutureMono.deferFuture(() -> markSentHeaderAndBody() ?
channel().writeAndFlush(newFullBodyMessage()) :
channel().newSucceededFuture());
return FutureMono.deferFuture(() -> {
if (markSentHeaderAndBody()) {
HttpMessage msg = Objects.equals(method(), HttpMethod.GET) ||
Objects.equals(method(), HttpMethod.HEAD) ||
Objects.equals(method(), HttpMethod.DELETE) ?
newFullBodyMessage() : newFullBodyMessage(Unpooled.EMPTY_BUFFER);
return channel().writeAndFlush(msg);
}
else {
return channel().newSucceededFuture();
}
});
}

final void setNettyResponse(HttpResponse nettyResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.LastHttpContent;
Expand Down Expand Up @@ -100,6 +101,8 @@
import java.util.function.Function;
import java.util.function.Predicate;

import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpMethod.POST;
import static org.assertj.core.api.Assertions.assertThat;
import static reactor.netty.ConnectionObserver.State.CONNECTED;

Expand Down Expand Up @@ -1027,7 +1030,7 @@ void testProtocolVersion(HttpServer server, HttpClient client) {
@ParameterizedCompatibleCombinationsTest
void testMonoRequestBodySentAsFullRequest_Flux(HttpServer server, HttpClient client) {
// sends the message and then last http content
testRequestBody(server, client, sender -> sender.send(ByteBufFlux.fromString(Mono.just("test"))), 2);
testRequestBody(server, client, POST, sender -> sender.send(ByteBufFlux.fromString(Mono.just("test"))), 2, null, false);
}

@ParameterizedCompatibleCombinationsTest
Expand All @@ -1039,13 +1042,13 @@ void testMonoRequestBodySentAsFullRequest_Mono(HttpServer server, HttpClient cli
@ParameterizedCompatibleCombinationsTest
void testMonoRequestBodySentAsFullRequest_MonoEmpty(HttpServer server, HttpClient client) {
// sends "full" request
testRequestBody(server, client, sender -> sender.send(Mono.empty()), 1);
testRequestBody(server, client, POST, sender -> sender.send(Mono.empty()), 1, "0", false);
}

@ParameterizedCompatibleCombinationsTest
void testIssue3524Flux(HttpServer server, HttpClient client) {
// sends the message and then last http content
testRequestBody(server, client, sender -> sender.send((req, out) -> out.sendString(Flux.just("te", "st"))), 3);
testRequestBody(server, client, POST, sender -> sender.send((req, out) -> out.sendString(Flux.just("te", "st"))), 3, null, false);
}

@ParameterizedCompatibleCombinationsTest
Expand All @@ -1055,15 +1058,27 @@ void testIssue3524Mono(HttpServer server, HttpClient client) {
}

@ParameterizedCompatibleCombinationsTest
void testIssue3524MonoEmpty(HttpServer server, HttpClient client) {
void testIssue3524MonoEmptyGet(HttpServer server, HttpClient client) {
// sends "full" request
testRequestBody(server, client, sender -> sender.send((req, out) -> Mono.empty()), 1, true);
testRequestBody(server, client, GET, sender -> sender.send((req, out) -> Mono.empty()), 1, null, true);
}

@ParameterizedCompatibleCombinationsTest
void testIssue3524NoBody(HttpServer server, HttpClient client) {
void testIssue3524MonoEmptyPost(HttpServer server, HttpClient client) {
// sends "full" request
testRequestBody(server, client, sender -> sender.send((req, out) -> out), 1, true);
testRequestBody(server, client, POST, sender -> sender.send((req, out) -> Mono.empty()), 1, "0", false);
}

@ParameterizedCompatibleCombinationsTest
void testIssue3524NoBodyGet(HttpServer server, HttpClient client) {
// sends "full" request
testRequestBody(server, client, GET, sender -> sender.send((req, out) -> out), 1, null, true);
}

@ParameterizedCompatibleCombinationsTest
void testIssue3524NoBodyPost(HttpServer server, HttpClient client) {
// sends "full" request
testRequestBody(server, client, POST, sender -> sender.send((req, out) -> out), 1, "0", false);
}

@ParameterizedCompatibleCombinationsTest
Expand All @@ -1075,12 +1090,13 @@ void testIssue3524Object(HttpServer server, HttpClient client) {

private void testRequestBody(HttpServer server, HttpClient client,
Function<HttpClient.RequestSender, HttpClient.ResponseReceiver<?>> sendFunction, int expectedMsg) {
testRequestBody(server, client, sendFunction, expectedMsg, false);
testRequestBody(server, client, POST, sendFunction, expectedMsg, "4", false);
}

@SuppressWarnings("FutureReturnValueIgnored")
private void testRequestBody(HttpServer server, HttpClient client,
Function<HttpClient.RequestSender, HttpClient.ResponseReceiver<?>> sendFunction, int expectedMsg, boolean contentHeadersDoNotExist) {
private void testRequestBody(HttpServer server, HttpClient client, HttpMethod method,
Function<HttpClient.RequestSender, HttpClient.ResponseReceiver<?>> sendFunction, int expectedMsg,
@Nullable String contentLength, boolean contentHeadersDoNotExist) {
disposableServer =
server.handle((req, res) -> req.receive()
.then(res.send()))
Expand Down Expand Up @@ -1130,7 +1146,7 @@ else if (msg instanceof ByteBuf) {
});
}
})
.post()
.request(method)
.uri("/"))
.responseContent()
.aggregate()
Expand All @@ -1142,6 +1158,14 @@ else if (msg instanceof ByteBuf) {
assertThat(requestHeaders.get().get(HttpHeaderNames.CONTENT_LENGTH)).isNull();
assertThat(requestHeaders.get().get(HttpHeaderNames.TRANSFER_ENCODING)).isNull();
}
else if (contentLength != null) {
assertThat(requestHeaders.get().get(HttpHeaderNames.CONTENT_LENGTH)).isNotNull().isEqualTo(contentLength);
assertThat(requestHeaders.get().get(HttpHeaderNames.TRANSFER_ENCODING)).isNull();
}
else {
assertThat(requestHeaders.get().get(HttpHeaderNames.CONTENT_LENGTH)).isNull();
assertThat(requestHeaders.get().get(HttpHeaderNames.TRANSFER_ENCODING)).isNotNull();
}
}

static final class IdleTimeoutTestChannelInboundHandler extends ChannelInboundHandlerAdapter {
Expand Down

0 comments on commit af5c2f4

Please sign in to comment.