Skip to content

Commit

Permalink
Merge #3514 into 1.2.1
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Nov 19, 2024
2 parents f98c2a6 + 09f6e16 commit 3b74fcf
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -130,7 +130,8 @@ public Mono<Void> disposeLater(Duration quietPeriod, Duration timeout) {
}
}

return Mono.when(clMono, sslMono, slMono, cnclMono, cnslMono, cnsrvlMono);
return Mono.when(clMono, sslMono, slMono, cnclMono, cnslMono, cnsrvlMono)
.timeout(timeout, Mono.error(new IllegalStateException("LoopResources couldn't be disposed within " + timeout.toMillis() + "ms")));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1910,6 +1910,73 @@ public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg)
.isEqualTo("delay500delay1000");
}

@Test
public void testGracefulShutdownIssue3509() throws Exception {
CountDownLatch connectionOpenedLatch = new CountDownLatch(2);
CountDownLatch connectionClosedLatch = new CountDownLatch(1);
CountDownLatch responseReceivedLatch = new CountDownLatch(1);
LoopResources loop = LoopResources.create("testGracefulShutdownIssue3509");
AtomicBoolean stop = new AtomicBoolean(false);
Sinks.One<String> delay = Sinks.one();
this.disposableServer =
createServer().runOn(loop)
.doOnConnection(c -> {
c.onDispose().subscribe(null, null, connectionClosedLatch::countDown);
connectionOpenedLatch.countDown();
})
// Register a channel group, when invoking disposeNow()
// the implementation will wait for the active requests to finish
.channelGroup(new DefaultChannelGroup(new DefaultEventExecutor()))
.route(r ->
r.get("/delay", (req, res) -> res.sendString(delay.asMono()))
.get("/cpuIntensive", (req, res) -> {
// Simulate some long-running CPU-intensive work
while (!stop.get()) {
// this is deliberate
}
return res.sendString(Mono.just("cpuIntensive"));
}))
.bindNow(Duration.ofSeconds(30));

HttpClient client = createClient(this.disposableServer::address);

AtomicReference<String> result = new AtomicReference<>();
Flux.just("/delay", "/cpuIntensive")
.flatMap(s -> client.get().uri(s).responseContent().aggregate().asString())
.subscribe(s -> {
result.set(s);
responseReceivedLatch.countDown();
});

assertThat(connectionOpenedLatch.await(30, TimeUnit.SECONDS)).isTrue();

// Stop accepting incoming requests, wait at most 1s for the active requests to finish
try {
this.disposableServer.channel()
.closeFuture()
.addListener(f -> delay.tryEmitValue("delay"));
// Reduce the timeout for testing purposes, by default it is 3s
this.disposableServer.disposeNow(Duration.ofSeconds(1));
fail("Expectation is that the socket cannot be closed");
}
catch (IllegalStateException e) {
// The socket couldn't be stopped, continue with shutdown
this.disposableServer = null;
}

// Reduce the quiet period and the timeout for testing purposes
// by default the quiet period is 2s and the timeout is 15s
loop.disposeLater(Duration.ofMillis(100), Duration.ofSeconds(1))
.as(StepVerifier::create)
.expectError(IllegalStateException.class)
.verify(Duration.ofSeconds(5));

assertThat(connectionClosedLatch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(responseReceivedLatch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(result.get()).isNotNull().isEqualTo("delay");
stop.set(true);
}

@Test
void testHttpServerWithDomainSocketsNIOTransport() {
assertThatExceptionOfType(ChannelBindException.class)
Expand Down

0 comments on commit 3b74fcf

Please sign in to comment.