Skip to content

Commit

Permalink
[Bug #314] Change ThrottledFuture#then to accept the completed future
Browse files Browse the repository at this point in the history
  • Loading branch information
lukaskabc committed Nov 20, 2024
1 parent 5f56201 commit 600bd2a
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import cz.cvut.kbss.termit.service.snapshot.SnapshotProvider;
import cz.cvut.kbss.termit.util.Configuration;
import cz.cvut.kbss.termit.util.Utils;
import cz.cvut.kbss.termit.util.throttle.CacheableFuture;
import cz.cvut.kbss.termit.util.throttle.ThrottledFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -366,7 +366,7 @@ public void refreshLastModified(RefreshLastModifiedEvent event) {
}

@Transactional
public CacheableFuture<Collection<ValidationResult>> validateContents(URI vocabulary) {
public ThrottledFuture<Collection<ValidationResult>> validateContents(URI vocabulary) {
final VocabularyContentValidator validator = context.getBean(VocabularyContentValidator.class);
final Collection<URI> importClosure = getTransitivelyImportedVocabularies(vocabulary);
importClosure.add(vocabulary);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
import cz.cvut.kbss.termit.util.TypeAwareClasspathResource;
import cz.cvut.kbss.termit.util.TypeAwareFileSystemResource;
import cz.cvut.kbss.termit.util.TypeAwareResource;
import cz.cvut.kbss.termit.util.throttle.CacheableFuture;
import cz.cvut.kbss.termit.util.throttle.Throttle;
import cz.cvut.kbss.termit.util.throttle.ThrottledFuture;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -384,7 +384,7 @@ public void remove(Vocabulary asset) {
*
* @param vocabulary Vocabulary to validate
*/
public CacheableFuture<Collection<ValidationResult>> validateContents(URI vocabulary) {
public ThrottledFuture<Collection<ValidationResult>> validateContents(URI vocabulary) {
return repositoryService.validateContents(vocabulary);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import cz.cvut.kbss.termit.util.Configuration;
import cz.cvut.kbss.termit.util.Constants;
import cz.cvut.kbss.termit.util.Utils;
import cz.cvut.kbss.termit.util.throttle.CacheableFuture;
import cz.cvut.kbss.termit.util.throttle.ThrottledFuture;
import cz.cvut.kbss.termit.workspace.EditableVocabularies;
import jakarta.annotation.Nonnull;
import jakarta.validation.Validator;
Expand Down Expand Up @@ -334,7 +334,7 @@ private void ensureNoTermRelationsExists(Vocabulary vocabulary) throws AssetRemo
}
}

public CacheableFuture<Collection<ValidationResult>> validateContents(URI vocabulary) {
public ThrottledFuture<Collection<ValidationResult>> validateContents(URI vocabulary) {
return vocabularyDao.validateContents(vocabulary);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* A future which can provide a cached result before its completion.
* @see Future
*/
public interface CacheableFuture<T> extends ChainableFuture<T> {
public interface CacheableFuture<T> extends Future<T> {

/**
* @return the cached result when available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
import java.util.concurrent.Future;
import java.util.function.Consumer;

public interface ChainableFuture<T> extends Future<T> {
public interface ChainableFuture<T, F extends ChainableFuture<T, F>> extends Future<T> {

/**
* Executes this action once the future is completed normally.
* Action is not executed on exceptional completion.
* Executes this action once the future is completed.
* Action is executed no matter if the future is completed successfully, exceptionally or cancelled.
* <p>
* If the future is already completed, action is executed synchronously.
* @param action action to be executed
* If the future is already completed, it is executed synchronously.
* @param action action receiving this future after completion
* @return this future
*/
ChainableFuture then(Consumer<T> action);
ChainableFuture<T, F> then(Consumer<F> action);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package cz.cvut.kbss.termit.util.throttle;

import cz.cvut.kbss.termit.exception.TermItException;
import cz.cvut.kbss.termit.util.Utils;
import cz.cvut.kbss.termit.util.longrunning.LongRunningTask;
import jakarta.annotation.Nonnull;
Expand All @@ -20,7 +19,7 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ThrottledFuture<T> implements CacheableFuture<T>, LongRunningTask {
public class ThrottledFuture<T> implements CacheableFuture<T>, ChainableFuture<T, ThrottledFuture<T>>, LongRunningTask {

private final ReentrantLock lock = new ReentrantLock();
private final ReentrantLock callbackLock = new ReentrantLock();
Expand All @@ -33,7 +32,7 @@ public class ThrottledFuture<T> implements CacheableFuture<T>, LongRunningTask {

private @Nullable Supplier<T> task;

private final List<Consumer<T>> onCompletion = new ArrayList<>();
private final List<Consumer<ThrottledFuture<T>>> onCompletion = new ArrayList<>();

private final AtomicReference<Instant> startedAt = new AtomicReference<>(null);

Expand Down Expand Up @@ -90,7 +89,16 @@ public ThrottledFuture<T> setCachedResult(@Nullable final T cachedResult) {

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
if(!future.cancel(mayInterruptIfRunning)) {
return false;
}

if (task != null) {
callbackLock.lock();
onCompletion.forEach(c -> c.accept(this));
callbackLock.unlock();
}
return true;
}

@Override
Expand Down Expand Up @@ -124,7 +132,7 @@ public T get(long timeout, @Nonnull TimeUnit unit)
* @return If the current task is already running, was canceled or already completed, returns a new future for the given task.
* Otherwise, replaces the current task and returns self.
*/
protected ThrottledFuture<T> update(Supplier<T> task, @Nonnull List<Consumer<T>> onCompletion) {
protected ThrottledFuture<T> update(Supplier<T> task, @Nonnull List<Consumer<ThrottledFuture<T>>> onCompletion) {
boolean locked = false;
try {
locked = lock.tryLock();
Expand Down Expand Up @@ -201,14 +209,16 @@ protected void run(@Nullable Consumer<ThrottledFuture<T>> startedCallback) {
T result = null;
if (task != null) {
result = task.get();
final T finalResult = result;
callbackLock.lock();
onCompletion.forEach(c -> c.accept(finalResult));
callbackLock.unlock();
}
future.complete(result);
} catch (Exception e) {
future.completeExceptionally(e);
} finally {
if (task != null) {
callbackLock.lock();
onCompletion.forEach(c -> c.accept(this));
callbackLock.unlock();
}
}
} finally {
if (locked) {
Expand Down Expand Up @@ -242,18 +252,11 @@ public boolean isRunning() {
}

@Override
public ThrottledFuture<T> then(Consumer<T> action) {
public ThrottledFuture<T> then(Consumer<ThrottledFuture<T>> action) {
try {
callbackLock.lock();
if (future.isDone() && !future.isCancelled() && !future.isCompletedExceptionally()) {
try {
action.accept(future.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TermItException(e);
} catch (ExecutionException e) {
throw new TermItException(e);
}
if (future.isDone()) {
action.accept(this);
} else {
onCompletion.add(action);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import cz.cvut.kbss.termit.service.business.VocabularyService;
import cz.cvut.kbss.termit.util.Configuration;
import cz.cvut.kbss.termit.util.Constants;
import cz.cvut.kbss.termit.util.throttle.CacheableFuture;
import cz.cvut.kbss.termit.util.throttle.ThrottledFuture;
import jakarta.annotation.Nonnull;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.MessageHeaders;
Expand Down Expand Up @@ -53,7 +53,7 @@ public void validateVocabulary(@DestinationVariable String localName,
final URI identifier = resolveIdentifier(namespace.orElse(config.getNamespace().getVocabulary()), localName);
final Vocabulary vocabulary = vocabularyService.getReference(identifier);

final CacheableFuture<Collection<ValidationResult>> future = vocabularyService.validateContents(vocabulary.getUri());
final ThrottledFuture<Collection<ValidationResult>> future = vocabularyService.validateContents(vocabulary.getUri());

future.getNow().ifPresentOrElse(validationResults ->
// if there is a result present (returned from cache), send it
Expand All @@ -66,14 +66,15 @@ public void validateVocabulary(@DestinationVariable String localName,
messageHeaders
), () ->
// otherwise reply will be sent once the future is resolved
future.then(results ->
future.then(completedFuture ->
completedFuture.getNow().ifPresent(results ->
sendToSession(
WebSocketDestinations.VOCABULARIES_VALIDATION,
results,
getHeaders(identifier,
Map.of("cached", false)),
messageHeaders
))
)))
);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ void transferUpdatesSecondFutureWithTask() {

@Test
void transferUpdatesSecondFutureWithCallbacks() {
final Consumer<String> firstCallback = (result) -> {};
final Consumer<String> secondCallback = (result) -> {};
final Consumer<ThrottledFuture<String>> firstCallback = (result) -> {};
final Consumer<ThrottledFuture<String>> secondCallback = (result) -> {};
final ThrottledFuture<String> firstFuture = ThrottledFuture.of(()->"").then(firstCallback);
final ThrottledFuture<String> secondFuture = ThrottledFuture.of(()->"").then(secondCallback);
final ThrottledFuture<String> mocked = mock(ThrottledFuture.class);
Expand All @@ -323,14 +323,14 @@ void transferUpdatesSecondFutureWithCallbacks() {

@Test
void callbacksAreClearedAfterTransferring() {
final Consumer<String> firstCallback = (result) -> {};
final Consumer<String> secondCallback = (result) -> {};
final Consumer<ThrottledFuture<String>> firstCallback = (result) -> {};
final Consumer<ThrottledFuture<String>> secondCallback = (result) -> {};
final ThrottledFuture<String> future = ThrottledFuture.of(()->"").then(firstCallback).then(secondCallback);
final ThrottledFuture<String> mocked = mock(ThrottledFuture.class);

future.transfer(mocked);

final ArgumentCaptor<List<Consumer<String>>> captor = ArgumentCaptor.forClass(List.class);
final ArgumentCaptor<List<Consumer<ThrottledFuture<String>>>> captor = ArgumentCaptor.forClass(List.class);

verify(mocked).update(notNull(), captor.capture());
// captor takes the original list from the future
Expand Down Expand Up @@ -386,8 +386,8 @@ void updateSetsTask() {

@Test
void updateAddsCallbacksToTheCurrentOnes() {
final Consumer<String> callback = result -> {};
final Consumer<String> originalCallback = result -> {};
final Consumer<ThrottledFuture<String>> callback = result -> {};
final Consumer<ThrottledFuture<String>> originalCallback = result -> {};
final ThrottledFuture<String> future = ThrottledFuture.of(() -> "").then(originalCallback);

future.update(()->"", List.of(callback));
Expand Down

0 comments on commit 600bd2a

Please sign in to comment.