Skip to content

Commit

Permalink
found the reason why and added test case
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam committed Aug 30, 2024
1 parent 2ae4fcc commit cae9d88
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ public CompletableFuture<Void> close() {
for (K k : connections.keySet()) {
Sync<K, T, F> remove = connections.remove(k);
if (remove != null) {
remove.doWithConnection(e -> futures.add(e.closeAsync()));
CompletionStage<Void> closeFuture = remove.future.thenAccept(AsyncCloseable::closeAsync);
// always synchronously add the future, made it immutably in Futures.allOf()
futures.add(closeFuture.toCompletableFuture());
}
}

Expand Down Expand Up @@ -218,7 +220,6 @@ static class Sync<K, T extends AsyncCloseable, F extends CompletionStage<T>> {

@SuppressWarnings("unchecked")
public Sync(K key, F future) {

this.key = key;
this.future = (F) future.whenComplete((connection, throwable) -> {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package io.lettuce.core.internal;

import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncConnectionProviderTest {

@Test
public void testFutureListLength() throws InterruptedException, ExecutionException, TimeoutException {

CountDownLatch slowCreate = new CountDownLatch(1);
CountDownLatch slowShutdown = new CountDownLatch(1);

// create a provider with a slow connection creation
AsyncConnectionProvider<String, AsyncCloseable, CompletableFuture<AsyncCloseable>> provider = new AsyncConnectionProvider<>(
key -> {
return countDownFuture(slowCreate, new io.lettuce.core.api.AsyncCloseable() {

@Override
public CompletableFuture<Void> closeAsync() {
return CompletableFuture.completedFuture(null);
}

});
});

// add slow shutdown connection first
SlowCloseFuture slowCloseFuture = new SlowCloseFuture(slowShutdown);
provider.register("slowShutdown", new io.lettuce.core.api.AsyncCloseable() {

@Override
public CompletableFuture<Void> closeAsync() {
return slowCloseFuture;
}

});

// add slow creation connection
CompletableFuture<AsyncCloseable> createFuture = provider.getConnection("slowCreate");

// close the connection.
CompletableFuture<Void> closeFuture = provider.close();

// the connection has not been created yet, so the close futures array always has 1 element
// we block the iterator on the slowCloseFuture
// then we count down the creation, the close future will be added to the list
slowCreate.countDown();

// the close future is added to the list, we unblock the iterator
slowShutdown.countDown();

// assert close future is completed, and no exceptions are thrown
closeFuture.get(10, TimeUnit.SECONDS);
Assert.assertTrue(createFuture.isDone());
}

private <T> CompletableFuture<T> countDownFuture(CountDownLatch countDownLatch, T value) {
return CompletableFuture.runAsync(() -> {
try {
countDownLatch.await(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).thenApply(v -> value);
}

static class SlowCloseFuture extends CompletableFuture<Void> {

private final CountDownLatch countDownLatch;

SlowCloseFuture(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

@Override
public CompletableFuture<Void> toCompletableFuture() {
// we block the iterator on here
try {
countDownLatch.await(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return super.toCompletableFuture();
}

@Override
public Void get() {
return null;
}

}

}

0 comments on commit cae9d88

Please sign in to comment.