Skip to content

Commit 2f38c3c

Browse files
committed
Add the track of the completion of the topology refresh cancel
1 parent 83a470e commit 2f38c3c

File tree

2 files changed

+15
-29
lines changed

2 files changed

+15
-29
lines changed

src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java

+12-18
Original file line numberDiff line numberDiff line change
@@ -94,29 +94,33 @@ protected void activateTopologyRefreshIfNeeded() {
9494
/**
9595
* Suspend (cancel) periodic topology refresh.
9696
*/
97-
public void suspendTopologyRefresh() {
97+
public CompletableFuture<Void> suspendTopologyRefresh() {
98+
CompletableFuture<Void> completionFuture = new CompletableFuture<>();
9899

99100
if (clusterTopologyRefreshActivated.compareAndSet(true, false)) {
100-
101101
ScheduledFuture<?> scheduledFuture = clusterTopologyRefreshFuture.get();
102102

103103
try {
104-
scheduledFuture.cancel(false);
105-
clusterTopologyRefreshFuture.set(null);
104+
if (scheduledFuture != null) {
105+
scheduledFuture.cancel(false);
106+
clusterTopologyRefreshFuture.set(null);
107+
}
108+
completionFuture.complete(null);
106109
} catch (Exception e) {
107110
logger.debug("Could not cancel Cluster topology refresh", e);
111+
completionFuture.completeExceptionally(e);
108112
}
113+
} else {
114+
completionFuture.complete(null);
109115
}
116+
117+
return completionFuture;
110118
}
111119

112120
public boolean isTopologyRefreshInProgress() {
113121
return clusterTopologyRefreshTask.get();
114122
}
115123

116-
public CompletableFuture<Void> getTopologyRefreshCompletionFuture() {
117-
return clusterTopologyRefreshTask.getCompletionFuture();
118-
}
119-
120124
@Override
121125
public void run() {
122126

@@ -322,16 +326,10 @@ private static class ClusterTopologyRefreshTask extends AtomicBoolean implements
322326

323327
private final Supplier<CompletionStage<?>> reloadTopologyAsync;
324328

325-
private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();
326-
327329
ClusterTopologyRefreshTask(Supplier<CompletionStage<?>> reloadTopologyAsync) {
328330
this.reloadTopologyAsync = reloadTopologyAsync;
329331
}
330332

331-
public CompletableFuture<Void> getCompletionFuture() {
332-
return completionFuture;
333-
}
334-
335333
public void run() {
336334

337335
if (compareAndSet(false, true)) {
@@ -354,16 +352,12 @@ void doRun() {
354352

355353
if (throwable != null) {
356354
logger.warn("Cannot refresh Redis Cluster topology", throwable);
357-
completionFuture.completeExceptionally(throwable);
358-
} else {
359-
completionFuture.complete(null);
360355
}
361356

362357
set(false);
363358
});
364359
} catch (Exception e) {
365360
logger.warn("Cannot refresh Redis Cluster topology", e);
366-
completionFuture.completeExceptionally(e);
367361
}
368362
}
369363

src/main/java/io/lettuce/core/cluster/RedisClusterClient.java

+3-11
Original file line numberDiff line numberDiff line change
@@ -968,8 +968,8 @@ public CompletionStage<Void> refreshPartitionsAsync() {
968968
*
969969
* @since 6.3
970970
*/
971-
public void suspendTopologyRefresh() {
972-
topologyRefreshScheduler.suspendTopologyRefresh();
971+
public CompletableFuture<Void> suspendTopologyRefresh() {
972+
return topologyRefreshScheduler.suspendTopologyRefresh();
973973
}
974974

975975
/**
@@ -1151,15 +1151,7 @@ public void setPartitions(Partitions partitions) {
11511151
@Override
11521152
public CompletableFuture<Void> shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit) {
11531153

1154-
suspendTopologyRefresh();
1155-
1156-
CompletableFuture<Void> topologyRefreshFuture = topologyRefreshScheduler.getTopologyRefreshCompletionFuture();
1157-
1158-
return topologyRefreshFuture.thenCompose(ignore -> super.shutdownAsync(quietPeriod, timeout, timeUnit))
1159-
.exceptionally(ex -> {
1160-
System.err.println("Error during topology refresh or shutdown: " + ex.getMessage());
1161-
return null;
1162-
});
1154+
return suspendTopologyRefresh().thenCompose(voidResult -> super.shutdownAsync(quietPeriod, timeout, timeUnit));
11631155
}
11641156

11651157
// -------------------------------------------------------------------------

0 commit comments

Comments
 (0)