Skip to content

Commit

Permalink
IGNITE-23569 Fix TcpClientChannel.close reliability (#4737)
Browse files Browse the repository at this point in the history
* Make sure all cleanup steps are performed
* Handle exception in `sock.close()` - we don't care about network issues when closing the channel
  • Loading branch information
ptupitsyn authored Nov 18, 2024
1 parent b60a4e7 commit 6884b67
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,36 +222,42 @@ public void close() {
* Close the channel with cause.
*/
private void close(@Nullable Throwable cause, boolean graceful) {
if (closed.compareAndSet(false, true)) {
if (cause != null && (cause instanceof TimeoutException || cause.getCause() instanceof TimeoutException)) {
metrics.connectionsLostTimeoutIncrement();
} else if (!graceful) {
metrics.connectionsLostIncrement();
}
if (!closed.compareAndSet(false, true)) {
return;
}

// Disconnect can happen before we initialize the timer.
var timer = heartbeatTimer;
if (cause != null && (cause instanceof TimeoutException || cause.getCause() instanceof TimeoutException)) {
metrics.connectionsLostTimeoutIncrement();
} else if (!graceful) {
metrics.connectionsLostIncrement();
}

if (timer != null) {
timer.cancel();
}
// Disconnect can happen before we initialize the timer.
var timer = heartbeatTimer;

if (sock != null) {
sock.close();
}
if (timer != null) {
timer.cancel();
}

for (TimeoutObjectImpl pendingReq : pendingReqs.values()) {
pendingReq.future().completeExceptionally(
for (TimeoutObjectImpl pendingReq : pendingReqs.values()) {
pendingReq.future().completeExceptionally(
new IgniteClientConnectionException(CONNECTION_ERR, "Channel is closed", endpoint(), cause));
}

for (CompletableFuture<PayloadInputChannel> handler : notificationHandlers.values()) {
try {
handler.completeExceptionally(
new IgniteClientConnectionException(CONNECTION_ERR, "Channel is closed", endpoint(), cause));
} catch (Throwable ignored) {
// Ignore.
}
}

for (CompletableFuture<PayloadInputChannel> handler : notificationHandlers.values()) {
try {
handler.completeExceptionally(
new IgniteClientConnectionException(CONNECTION_ERR, "Channel is closed", endpoint(), cause));
} catch (Exception ignored) {
// Ignore.
}
if (sock != null) {
try {
sock.close();
} catch (Throwable t) {
log.warn("Failed to close the channel [remoteAddress=" + cfg.getAddress() + "]: " + t.getMessage(), t);
}
}
}
Expand Down Expand Up @@ -696,7 +702,7 @@ private ChannelFuture handshakeReqAsync(ProtocolVersion proposedVer) {
srvVer, ProtocolBitmaskFeature.allFeaturesAsEnumSet(), serverIdleTimeout, clusterNode, clusterIds, clusterName);

return null;
} catch (Exception e) {
} catch (Throwable e) {
log.warn("Failed to handle handshake response [remoteAddress=" + cfg.getAddress() + "]: " + e.getMessage(), e);

throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void testHandshakesFailed() {
public void testHandshakesFailedTimeout() throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
Function<Integer, Boolean> shouldDropConnection = requestIdx -> false;
Function<Integer, Integer> responseDelay = idx -> counter.incrementAndGet() == 1 ? 500 : 0;
Function<Integer, Integer> responseDelay = idx -> counter.incrementAndGet() == 1 ? 600 : 0;
server = new TestServer(
1000,
new FakeIgnite(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void testNoResponseFromServerWithinConnectTimeoutThrowsException() {
@SuppressWarnings("ThrowableNotThrown")
@Test
public void testNoResponseFromServerWithinOperationTimeoutThrowsException() {
Function<Integer, Integer> responseDelay = x -> x > 2 ? 100 : 0;
Function<Integer, Integer> responseDelay = x -> x > 2 ? 600 : 0;

try (var srv = new TestServer(300, new FakeIgnite(), x -> false, responseDelay, null, UUID.randomUUID(), null, null)) {
Builder builder = IgniteClient.builder()
Expand Down

0 comments on commit 6884b67

Please sign in to comment.