Skip to content

Commit

Permalink
use CompletableFuture
Browse files Browse the repository at this point in the history
  • Loading branch information
agrgr committed Jul 18, 2024
1 parent 73b8992 commit ae84624
Showing 1 changed file with 13 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

@Slf4j
public class InfoCommandUtils {
Expand All @@ -21,104 +21,36 @@ public static String sendInfoCommand(IAerospikeClient client, Node node, String
public static String sendInfoCommand(IAerospikeClient client, InfoPolicy infoPolicy, Node node, String command) {
InfoListenerWithStringValue listener = new InfoListenerWithStringValue() {

volatile String stringValue = "";
volatile String infoCommand = "";
volatile boolean isComplete = false;
volatile AerospikeException exception;
private final CompletableFuture<String> stringValueFuture = new CompletableFuture<>();

@Override
public synchronized String getStringValue() {
return stringValue;
}

@Override
public synchronized boolean isComplete() {
return isComplete;
}

@Override
public synchronized AerospikeException getException() {
return exception;
}

@Override
public synchronized String getInfoCommand() {
return infoCommand;
public CompletableFuture<String> getValueFuture() {
return stringValueFuture;
}

@Override
public void onSuccess(Map<String, String> map) {
stringValue = map.get(command);
isComplete = true;
stringValueFuture.complete(map.get(command));
}

@Override
public void onFailure(AerospikeException ae) {
exception = ae;
infoCommand = command;
isComplete = true;
throw ae;
}
};

client.info(client.getCluster().eventLoops.next(), listener, infoPolicy, node, command);
waitForCompletionOrTimeout(listener);
failIfExceptionFound(listener);

return listener.getStringValue() == null ? "" : listener.getStringValue();
}

private static void failIfExceptionFound(InfoListenerWithStringValue listener) {
if (listener.getException() != null) {
throw new AerospikeException(String.format("Info command %s failed", listener.getInfoCommand()),
listener.getException());
}
}

private static void waitForCompletionOrTimeout(InfoListenerWithStringValue listener) {
// Create a CountDownLatch with initial count 1
CountDownLatch latch = new CountDownLatch(1);

// Start a separate thread to wait for isComplete()
Thread waitingThread = getWaitingThread(listener, latch);

String value;
try {
// Wait for completion or timeout
boolean timeoutOver = latch.await(1, TimeUnit.SECONDS); // timeout is 1 second
if (!timeoutOver) {
waitingThread.interrupt(); // Interrupt waiting thread if timeout occurs
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Interrupted
log.error("Interrupted while waiting for info command to complete");
value = listener.getValueFuture().join();
} catch (CompletionException ce) {
throw new AerospikeException(String.format("Info command %s failed", command), ce);
}
}

private static Thread getWaitingThread(InfoListenerWithStringValue listener, CountDownLatch latch) {
Thread waitingThread = new Thread(() -> {
while (!listener.isComplete()) {
try {
//noinspection ResultOfMethodCallIgnored
latch.await(1, TimeUnit.MILLISECONDS); // Wait briefly before re-checking
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return; // Interrupted, exit thread
}
}
latch.countDown(); // Release latch when isComplete() is true
});
waitingThread.start();
return waitingThread;
return value == null ? "" : value;
}

interface InfoListenerWithStringValue extends InfoListener {

String getStringValue();

@SuppressWarnings("BooleanMethodIsAlwaysInverted")
boolean isComplete();

AerospikeException getException();

String getInfoCommand();
CompletableFuture<String> getValueFuture();
}
}

0 comments on commit ae84624

Please sign in to comment.