Skip to content

Commit

Permalink
HTTP/2 client queued writes should fail when stream creation fails.
Browse files Browse the repository at this point in the history
Motivation:

HTTP/2 client queues writes when the stream has an asynchronous boundary. When the stream creation fails, the failure should be recorded so that queued writes can be guarded against the failure.

Changes:

Record stream creation failure, when a delayed writes observes the failure, the write operation should be failed and not proceed further.
  • Loading branch information
vietj committed Mar 4, 2025
1 parent 879aee4 commit e612c87
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 14 deletions.
31 changes: 21 additions & 10 deletions src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public synchronized void onPushPromiseRead(ChannelHandlerContext ctx, int stream
static abstract class Stream extends VertxHttp2Stream<Http2ClientConnection> {

private final boolean push;
protected Http2Exception createFailure;
private HttpResponseHead response;
protected Object metric;
protected Object trace;
Expand Down Expand Up @@ -278,6 +279,10 @@ public Object trace() {

@Override
void doWriteData(ByteBuf chunk, boolean end, Handler<AsyncResult<Void>> handler) {
if (createFailure != null) {
handler.handle(context.failedFuture(createFailure));
return;
}
super.doWriteData(chunk, end, handler);
}

Expand Down Expand Up @@ -559,14 +564,14 @@ public void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boo
EventLoop eventLoop = ctx.nettyEventLoop();
synchronized (this) {
if (shouldQueue(eventLoop)) {
queueForWrite(eventLoop, () -> writeHeaders(request, buf, end, priority, connect, handler));
queueForWrite(eventLoop, () -> writeHeaders(request, buf, end, handler));
return;
}
}
writeHeaders(request, buf, end, priority, connect, handler);
writeHeaders(request, buf, end, handler);
}

private void writeHeaders(HttpRequestHead request, ByteBuf buf, boolean end, StreamPriority priority, boolean connect, Handler<AsyncResult<Void>> handler) {
private void writeHeaders(HttpRequestHead request, ByteBuf buf, boolean end, Handler<AsyncResult<Void>> handler) {
Http2Headers headers = new DefaultHttp2Headers();
headers.method(request.method.name());
boolean e;
Expand All @@ -593,13 +598,12 @@ private void writeHeaders(HttpRequestHead request, ByteBuf buf, boolean end, Str
if (conn.client.options().isDecompressionSupported() && headers.get(HttpHeaderNames.ACCEPT_ENCODING) == null) {
headers.set(HttpHeaderNames.ACCEPT_ENCODING, Http1xClientConnection.determineCompressionAcceptEncoding());
}
try {
createStream(request, headers);
} catch (Http2Exception ex) {
Http2Exception failure = createStream(request, headers);
if (failure != null) {
if (handler != null) {
handler.handle(context.failedFuture(ex));
handler.handle(context.failedFuture(failure));
}
handleException(ex);
handleException(failure);
return;
}
if (buf != null) {
Expand All @@ -610,7 +614,7 @@ private void writeHeaders(HttpRequestHead request, ByteBuf buf, boolean end, Str
}
}

private void createStream(HttpRequestHead head, Http2Headers headers) throws Http2Exception {
private Http2Exception createStream(HttpRequestHead head, Http2Headers headers) {
int id = this.conn.handler.encoder().connection().local().lastStreamCreated();
if (id == 0) {
id = 1;
Expand All @@ -619,7 +623,13 @@ private void createStream(HttpRequestHead head, Http2Headers headers) throws Htt
}
head.id = id;
head.remoteAddress = conn.remoteAddress();
Http2Stream stream = this.conn.handler.encoder().connection().local().createStream(id, false);
Http2Stream stream;
try {
stream = this.conn.handler.encoder().connection().local().createStream(id, false);
} catch (Http2Exception e) {
createFailure = e;
return e;
}
init(stream);
if (conn.metrics != null) {
metric = conn.metrics.requestBegin(headers.path().toString(), head);
Expand All @@ -633,6 +643,7 @@ private void createStream(HttpRequestHead head, Http2Headers headers) throws Htt
}
trace = tracer.sendRequest(context, SpanKind.RPC, conn.client.options().getTracingPolicy(), head, operation, headers_, HttpUtils.CLIENT_HTTP_REQUEST_TAG_EXTRACTOR);
}
return null;
}

@Override
Expand Down
63 changes: 59 additions & 4 deletions src/test/java/io/vertx/core/http/Http2Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@
import io.netty.channel.socket.DuplexChannel;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.JdkSSLEngineOptions;
import io.vertx.core.http.impl.Http2ServerConnection;
import io.vertx.core.net.OpenSSLEngineOptions;
import io.vertx.core.net.SSLEngineOptions;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.test.core.AsyncTestBase;
import io.vertx.test.core.Repeat;
import io.vertx.test.core.TestUtils;
import io.vertx.test.tls.Cert;
import org.junit.Ignore;
Expand All @@ -42,7 +41,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -1170,4 +1168,61 @@ public void testSendFileCancellation() throws Exception {

await();
}

@Repeat(times = 10)
@Test
public void testHttpClientDelayedWriteUponConnectionClose() throws Exception {

int numVerticles = 5;
int numWrites = 100;
int delayCloseMS = 50;

server.connectionHandler(conn -> {
vertx.setTimer(delayCloseMS, id -> {
conn.close();
});
});
server.requestHandler(req -> {
req.endHandler(v -> {
req.response().end();
});
});

startServer(testAddress);
waitFor(numVerticles);
vertx.deployVerticle(() -> new AbstractVerticle() {
int requestCount;
int ackCount;
@Override
public void start() {
request();
}
private void request() {
requestCount++;
client.request(requestOptions)
.compose(req -> {
req.setChunked(true);
for (int i = 0;i < numWrites;i++) {
req.write("Hello").onComplete(ar -> {
ackCount++;
});
}
req.end();
return req.response().compose(HttpClientResponse::body);
})
.onComplete(ar -> {
if (ar.succeeded()) {
request();
} else {
vertx.setTimer(100, id -> {
assertEquals(requestCount * numWrites, ackCount);
complete();
});
}
});
}
}, new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER).setInstances(numVerticles));

await();
}
}

0 comments on commit e612c87

Please sign in to comment.