From c420bd9e50dfa64020385659005215f0a6b33771 Mon Sep 17 00:00:00 2001 From: Piotr Czarnas Date: Sat, 27 Jul 2024 16:25:02 +0200 Subject: [PATCH] Different way of emitting events to the sink - retry handler created for each call. --- .../statuscache/TableStatusCacheImpl.java | 19 +++++++++++++------ .../labels/labelloader/LabelsIndexerImpl.java | 19 +++++++++++++------ 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/dqops/src/main/java/com/dqops/data/checkresults/statuscache/TableStatusCacheImpl.java b/dqops/src/main/java/com/dqops/data/checkresults/statuscache/TableStatusCacheImpl.java index 6af9c90734..da6ba293a6 100644 --- a/dqops/src/main/java/com/dqops/data/checkresults/statuscache/TableStatusCacheImpl.java +++ b/dqops/src/main/java/com/dqops/data/checkresults/statuscache/TableStatusCacheImpl.java @@ -61,7 +61,6 @@ public class TableStatusCacheImpl implements TableStatusCache { private boolean started; private Sinks.Many loadTableStatusRequestSink; private Disposable subscription; - private Sinks.EmitFailureHandler emitFailureHandlerPublisher; private int queuedOperationsCount; private final Object lock = new Object(); private CompletableFuture queueEmptyFuture; @@ -86,12 +85,19 @@ public TableStatusCacheImpl(DqoCacheConfigurationProperties dqoCacheConfiguratio this.dqoQueueConfigurationProperties = dqoQueueConfigurationProperties; this.checkResultsDataService = checkResultsDataService; this.userDomainIdentityFactory = userDomainIdentityFactory; - this.emitFailureHandlerPublisher = Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds( - this.dqoQueueConfigurationProperties.getPublishBusyLoopingDurationSeconds())); this.queueEmptyFuture = new CompletableFuture<>(); this.queueEmptyFuture.complete(0); } + /** + * Creates a failure handler with a new duration. + * @return Failure handler. + */ + protected Sinks.EmitFailureHandler createFailureHandler() { + return Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds( + this.dqoQueueConfigurationProperties.getPublishBusyLoopingDurationSeconds())); + } + /** * The operation that is called to create a new table entry for the cache and queue a table load operation. * @param tableStatusKey Table status key. @@ -100,7 +106,7 @@ public TableStatusCacheImpl(DqoCacheConfigurationProperties dqoCacheConfiguratio protected CurrentTableStatusCacheEntry loadEntryCore(CurrentTableStatusKey tableStatusKey) { CurrentTableStatusCacheEntry currentTableStatusCacheEntry = new CurrentTableStatusCacheEntry(tableStatusKey, CurrentTableStatusEntryStatus.LOADING_QUEUED); if (this.loadTableStatusRequestSink != null) { - this.loadTableStatusRequestSink.emitNext(tableStatusKey, this.emitFailureHandlerPublisher); + this.loadTableStatusRequestSink.emitNext(tableStatusKey, createFailureHandler()); incrementAwaitingOperationsCount(); } return currentTableStatusCacheEntry; @@ -165,7 +171,7 @@ public void invalidateTableStatus(CurrentTableStatusKey tableStatusKey, boolean currentTableStatusCacheEntry.setStatus(CurrentTableStatusEntryStatus.REFRESH_QUEUED); if (this.loadTableStatusRequestSink != null) { - this.loadTableStatusRequestSink.emitNext(tableStatusKey, this.emitFailureHandlerPublisher); + this.loadTableStatusRequestSink.emitNext(tableStatusKey, createFailureHandler()); incrementAwaitingOperationsCount(); } } @@ -281,10 +287,11 @@ public void start() { int concurrency = Runtime.getRuntime().availableProcessors(); this.subscription = requestLoadFlux.subscribeOn(Schedulers.boundedElastic()) .flatMap(list -> Flux.fromIterable(list)) // single thread forwarder + .parallel(concurrency) .flatMap(tableKey -> { onRequestLoadTableStatus(tableKey); return Mono.empty(); - }, concurrency, concurrency * 2) + }) .subscribe(); } diff --git a/dqops/src/main/java/com/dqops/metadata/labels/labelloader/LabelsIndexerImpl.java b/dqops/src/main/java/com/dqops/metadata/labels/labelloader/LabelsIndexerImpl.java index 0b43ab8898..b41a05a6d4 100644 --- a/dqops/src/main/java/com/dqops/metadata/labels/labelloader/LabelsIndexerImpl.java +++ b/dqops/src/main/java/com/dqops/metadata/labels/labelloader/LabelsIndexerImpl.java @@ -69,7 +69,6 @@ public class LabelsIndexerImpl implements LabelsIndexer { private boolean started; private Sinks.Many loadObjectRequestSink; private Disposable subscription; - private Sinks.EmitFailureHandler emitFailureHandlerPublisher; private int queuedOperationsCount; private final Object lock = new Object(); private CompletableFuture queueEmptyFuture; @@ -97,12 +96,19 @@ public LabelsIndexerImpl(DqoCacheConfigurationProperties dqoCacheConfigurationPr this.userHomeContextFactory = userHomeContextFactory; this.userDomainIdentityFactory = userDomainIdentityFactory; this.globalLabelsContainer = globalLabelsContainer; - this.emitFailureHandlerPublisher = Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds( - this.dqoQueueConfigurationProperties.getPublishBusyLoopingDurationSeconds())); this.queueEmptyFuture = new CompletableFuture<>(); this.queueEmptyFuture.complete(0); } + /** + * Creates a failure handler with a new duration. + * @return Failure handler. + */ + protected Sinks.EmitFailureHandler createFailureHandler() { + return Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds( + this.dqoQueueConfigurationProperties.getPublishBusyLoopingDurationSeconds())); + } + /** * The operation that is called to request loading labels for a new entry (connection or table). * @param targetKey Entry key. @@ -111,7 +117,7 @@ public LabelsIndexerImpl(DqoCacheConfigurationProperties dqoCacheConfigurationPr protected LabelsLoadEntry loadEntryCore(LabelRefreshKey targetKey) { LabelsLoadEntry labelsLoadEntry = new LabelsLoadEntry(targetKey, LabelRefreshStatus.LOADING_QUEUED); if (this.loadObjectRequestSink != null) { - this.loadObjectRequestSink.emitNext(targetKey, this.emitFailureHandlerPublisher); + this.loadObjectRequestSink.emitNext(targetKey, createFailureHandler()); incrementAwaitingOperationsCount(); } return labelsLoadEntry; @@ -137,7 +143,7 @@ public void invalidateObject(LabelRefreshKey targetLabelsKey, boolean replacingC } currentTableStatusCacheEntry.setStatus(LabelRefreshStatus.REFRESH_QUEUED); - this.loadObjectRequestSink.emitNext(targetLabelsKey, this.emitFailureHandlerPublisher); + this.loadObjectRequestSink.emitNext(targetLabelsKey, createFailureHandler()); incrementAwaitingOperationsCount(); } @@ -342,10 +348,11 @@ public void start() { int concurrency = Runtime.getRuntime().availableProcessors(); this.subscription = requestLoadFlux .flatMap((List targetKeys) -> Mono.just(targetKeys)) // single thread forwarder + .parallel(concurrency) .flatMap((List targetKeys) -> { onRequestLoadLabelsForObjects(targetKeys); return Mono.empty(); - }, concurrency, concurrency * 2) + }) .subscribe(); }