Skip to content

Commit

Permalink
Different way of emitting events to the sink - retry handler created …
Browse files Browse the repository at this point in the history
…for each call.
  • Loading branch information
piotrczarnas committed Jul 27, 2024
1 parent 5e4cf40 commit c420bd9
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public class TableStatusCacheImpl implements TableStatusCache {
private boolean started;
private Sinks.Many<CurrentTableStatusKey> loadTableStatusRequestSink;
private Disposable subscription;
private Sinks.EmitFailureHandler emitFailureHandlerPublisher;
private int queuedOperationsCount;
private final Object lock = new Object();
private CompletableFuture<Integer> queueEmptyFuture;
Expand All @@ -86,12 +85,19 @@ public TableStatusCacheImpl(DqoCacheConfigurationProperties dqoCacheConfiguratio
this.dqoQueueConfigurationProperties = dqoQueueConfigurationProperties;
this.checkResultsDataService = checkResultsDataService;
this.userDomainIdentityFactory = userDomainIdentityFactory;
this.emitFailureHandlerPublisher = Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(
this.dqoQueueConfigurationProperties.getPublishBusyLoopingDurationSeconds()));
this.queueEmptyFuture = new CompletableFuture<>();
this.queueEmptyFuture.complete(0);
}

/**
* Creates a failure handler with a new duration.
* @return Failure handler.
*/
protected Sinks.EmitFailureHandler createFailureHandler() {
return Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(
this.dqoQueueConfigurationProperties.getPublishBusyLoopingDurationSeconds()));
}

/**
* The operation that is called to create a new table entry for the cache and queue a table load operation.
* @param tableStatusKey Table status key.
Expand All @@ -100,7 +106,7 @@ public TableStatusCacheImpl(DqoCacheConfigurationProperties dqoCacheConfiguratio
protected CurrentTableStatusCacheEntry loadEntryCore(CurrentTableStatusKey tableStatusKey) {
CurrentTableStatusCacheEntry currentTableStatusCacheEntry = new CurrentTableStatusCacheEntry(tableStatusKey, CurrentTableStatusEntryStatus.LOADING_QUEUED);
if (this.loadTableStatusRequestSink != null) {
this.loadTableStatusRequestSink.emitNext(tableStatusKey, this.emitFailureHandlerPublisher);
this.loadTableStatusRequestSink.emitNext(tableStatusKey, createFailureHandler());
incrementAwaitingOperationsCount();
}
return currentTableStatusCacheEntry;
Expand Down Expand Up @@ -165,7 +171,7 @@ public void invalidateTableStatus(CurrentTableStatusKey tableStatusKey, boolean

currentTableStatusCacheEntry.setStatus(CurrentTableStatusEntryStatus.REFRESH_QUEUED);
if (this.loadTableStatusRequestSink != null) {
this.loadTableStatusRequestSink.emitNext(tableStatusKey, this.emitFailureHandlerPublisher);
this.loadTableStatusRequestSink.emitNext(tableStatusKey, createFailureHandler());
incrementAwaitingOperationsCount();
}
}
Expand Down Expand Up @@ -281,10 +287,11 @@ public void start() {
int concurrency = Runtime.getRuntime().availableProcessors();
this.subscription = requestLoadFlux.subscribeOn(Schedulers.boundedElastic())
.flatMap(list -> Flux.fromIterable(list)) // single thread forwarder
.parallel(concurrency)
.flatMap(tableKey -> {
onRequestLoadTableStatus(tableKey);
return Mono.empty();
}, concurrency, concurrency * 2)
})
.subscribe();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public class LabelsIndexerImpl implements LabelsIndexer {
private boolean started;
private Sinks.Many<LabelRefreshKey> loadObjectRequestSink;
private Disposable subscription;
private Sinks.EmitFailureHandler emitFailureHandlerPublisher;
private int queuedOperationsCount;
private final Object lock = new Object();
private CompletableFuture<Integer> queueEmptyFuture;
Expand Down Expand Up @@ -97,12 +96,19 @@ public LabelsIndexerImpl(DqoCacheConfigurationProperties dqoCacheConfigurationPr
this.userHomeContextFactory = userHomeContextFactory;
this.userDomainIdentityFactory = userDomainIdentityFactory;
this.globalLabelsContainer = globalLabelsContainer;
this.emitFailureHandlerPublisher = Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(
this.dqoQueueConfigurationProperties.getPublishBusyLoopingDurationSeconds()));
this.queueEmptyFuture = new CompletableFuture<>();
this.queueEmptyFuture.complete(0);
}

/**
* Creates a failure handler with a new duration.
* @return Failure handler.
*/
protected Sinks.EmitFailureHandler createFailureHandler() {
return Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(
this.dqoQueueConfigurationProperties.getPublishBusyLoopingDurationSeconds()));
}

/**
* The operation that is called to request loading labels for a new entry (connection or table).
* @param targetKey Entry key.
Expand All @@ -111,7 +117,7 @@ public LabelsIndexerImpl(DqoCacheConfigurationProperties dqoCacheConfigurationPr
protected LabelsLoadEntry loadEntryCore(LabelRefreshKey targetKey) {
LabelsLoadEntry labelsLoadEntry = new LabelsLoadEntry(targetKey, LabelRefreshStatus.LOADING_QUEUED);
if (this.loadObjectRequestSink != null) {
this.loadObjectRequestSink.emitNext(targetKey, this.emitFailureHandlerPublisher);
this.loadObjectRequestSink.emitNext(targetKey, createFailureHandler());
incrementAwaitingOperationsCount();
}
return labelsLoadEntry;
Expand All @@ -137,7 +143,7 @@ public void invalidateObject(LabelRefreshKey targetLabelsKey, boolean replacingC
}

currentTableStatusCacheEntry.setStatus(LabelRefreshStatus.REFRESH_QUEUED);
this.loadObjectRequestSink.emitNext(targetLabelsKey, this.emitFailureHandlerPublisher);
this.loadObjectRequestSink.emitNext(targetLabelsKey, createFailureHandler());
incrementAwaitingOperationsCount();
}

Expand Down Expand Up @@ -342,10 +348,11 @@ public void start() {
int concurrency = Runtime.getRuntime().availableProcessors();
this.subscription = requestLoadFlux
.flatMap((List<LabelRefreshKey> targetKeys) -> Mono.just(targetKeys)) // single thread forwarder
.parallel(concurrency)
.flatMap((List<LabelRefreshKey> targetKeys) -> {
onRequestLoadLabelsForObjects(targetKeys);
return Mono.empty();
}, concurrency, concurrency * 2)
})
.subscribe();
}

Expand Down

0 comments on commit c420bd9

Please sign in to comment.