From 734ace50b0ace70703bb4362935fefb38c45e0ad Mon Sep 17 00:00:00 2001 From: Ilya Nepomnyashchiy Date: Thu, 3 Feb 2022 09:51:38 -0800 Subject: [PATCH] TargetedSweeper no longer reads runtime prefs on construction (#5888) --- .../atlasdb/sweep/queue/SweepDelay.java | 31 ++++++++++--------- .../atlasdb/sweep/queue/TargetedSweeper.java | 2 +- .../queue/ScalingSweepTaskSchedulerTest.java | 8 ++--- .../atlasdb/sweep/queue/SweepDelayTest.java | 6 ++-- 4 files changed, 25 insertions(+), 22 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepDelay.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepDelay.java index edc7f88c037..5bdeb70a6d8 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepDelay.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepDelay.java @@ -16,12 +16,15 @@ package com.palantir.atlasdb.sweep.queue; +import com.google.common.base.Suppliers; import com.google.common.math.DoubleMath; import java.math.RoundingMode; import java.time.Duration; import java.util.concurrent.atomic.AtomicLong; import java.util.function.IntSupplier; import java.util.function.LongConsumer; +import java.util.function.LongSupplier; +import java.util.function.Supplier; /** * This class calculates the delay for the next iteration of targeted sweep from the current delay and the outcome @@ -48,42 +51,42 @@ class SweepDelay { static final double EXPONENTIAL_BACKOFF_MULTIPLIER = Math.exp(Math.log((double) MAX_BACKOFF / MIN_BACKOFF) / ITERATIONS_TO_REACH_MAX_BACKOFF); - private final long initialPause; - private final long maxPauseMillis; + private final LongSupplier initialPause; + private final LongSupplier maxPauseMillis; private final LongConsumer sweepDelayMetricsUpdater; - private final AtomicLong currentPause; + private final Supplier currentPause; private final AtomicLong insufficientConsistencyPause = new AtomicLong(MIN_BACKOFF); private final IntSupplier readBatchThreshold; - SweepDelay(long configPause, LongConsumer sweepDelayMetricsUpdater, IntSupplier readBatchThreshold) { - this.maxPauseMillis = Math.max(DEFAULT_MAX_PAUSE_MILLIS, configPause); - this.initialPause = Math.max(MIN_PAUSE_MILLIS, configPause); + SweepDelay(LongSupplier configPause, LongConsumer sweepDelayMetricsUpdater, IntSupplier readBatchThreshold) { + this.maxPauseMillis = () -> Math.max(DEFAULT_MAX_PAUSE_MILLIS, configPause.getAsLong()); + this.initialPause = () -> Math.max(MIN_PAUSE_MILLIS, configPause.getAsLong()); this.sweepDelayMetricsUpdater = sweepDelayMetricsUpdater; this.readBatchThreshold = readBatchThreshold; - this.currentPause = new AtomicLong(initialPause); + this.currentPause = Suppliers.memoize(() -> new AtomicLong(initialPause.getAsLong())); } long getInitialPause() { - return initialPause; + return initialPause.getAsLong(); } long getMaxPause() { - return maxPauseMillis; + return maxPauseMillis.getAsLong(); } long getNextPause(SweepIterationResult result) { return SweepIterationResults.caseOf(result) .success(this::updateCurrentPauseAndGet) - .unableToAcquireShard_(maxPauseMillis) + .unableToAcquireShard_(maxPauseMillis.getAsLong()) .insufficientConsistency_(getInsufficientConsistencyPauseAndCalculateNext()) - .otherError_(maxPauseMillis) + .otherError_(maxPauseMillis.getAsLong()) .disabled_(MIN_BACKOFF); } private long updateCurrentPauseAndGet(long numSwept) { resetInsufficientConsistencyBackoff(); long target = pauseTarget(numSwept); - long newPause = currentPause.updateAndGet(oldPause -> (4 * oldPause + target) / 5); + long newPause = currentPause.get().updateAndGet(oldPause -> (4 * oldPause + target) / 5); sweepDelayMetricsUpdater.accept(newPause); return newPause; } @@ -99,10 +102,10 @@ private long getInsufficientConsistencyPauseAndCalculateNext() { private long pauseTarget(long numSwept) { if (numSwept <= Math.min(BATCH_CELLS_LOW_THRESHOLD, readBatchThreshold.getAsInt() - 1)) { - return maxPauseMillis; + return maxPauseMillis.getAsLong(); } else if (numSwept >= readBatchThreshold.getAsInt()) { return MIN_PAUSE_MILLIS; } - return initialPause; + return initialPause.getAsLong(); } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/TargetedSweeper.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/TargetedSweeper.java index f9661d4d9ca..497b43a4cc1 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/TargetedSweeper.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/TargetedSweeper.java @@ -253,7 +253,7 @@ private BackgroundSweepScheduler(int numThreads, SweeperStrategy sweepStrategy) this.numThreads = numThreads; this.sweepStrategy = sweepStrategy; this.delay = new SweepDelay( - runtime.get().pauseMillis(), + () -> runtime.get().pauseMillis(), millis -> metrics.updateSweepDelayMetric(sweepStrategy, millis), () -> runtime.get().batchCellThreshold()); } diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/ScalingSweepTaskSchedulerTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/ScalingSweepTaskSchedulerTest.java index a403bab4fa2..0551cb3d068 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/ScalingSweepTaskSchedulerTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/ScalingSweepTaskSchedulerTest.java @@ -52,7 +52,7 @@ public class ScalingSweepTaskSchedulerTest { private final ScalingSweepTaskScheduler scheduler = createScheduler(delay); private final AtomicLong metrics = new AtomicLong(); private final ScalingSweepTaskScheduler schedulerWithDelay = - createScheduler(new SweepDelay(DELAY, metrics::set, () -> SweepQueueUtils.SWEEP_BATCH_SIZE)); + createScheduler(new SweepDelay(() -> DELAY, metrics::set, () -> SweepQueueUtils.SWEEP_BATCH_SIZE)); private boolean firstIteration = true; @@ -115,7 +115,7 @@ public void whenOtherErrorRescheduleAfterMaxPause() throws Exception { @Test public void whenVeryFewEntriesIncreasePause() throws Exception { - SweepDelay sweepDelay = new SweepDelay(100L, metrics::set, () -> SweepQueueUtils.SWEEP_BATCH_SIZE); + SweepDelay sweepDelay = new SweepDelay(() -> 100L, metrics::set, () -> SweepQueueUtils.SWEEP_BATCH_SIZE); ScalingSweepTaskScheduler schedulerWithRealDelay = createScheduler(sweepDelay); when(sweepIteration.call()).thenReturn(SUCCESS_TINY); @@ -127,7 +127,7 @@ public void whenVeryFewEntriesIncreasePause() throws Exception { @Test public void whenVeryManyEntriesDecreasePause() throws Exception { - SweepDelay sweepDelay = new SweepDelay(100L, metrics::set, () -> SweepQueueUtils.SWEEP_BATCH_SIZE); + SweepDelay sweepDelay = new SweepDelay(() -> 100L, metrics::set, () -> SweepQueueUtils.SWEEP_BATCH_SIZE); ScalingSweepTaskScheduler schedulerWithRealDelay = createScheduler(sweepDelay); when(sweepIteration.call()).thenReturn(SUCCESS_HUGE); @@ -139,7 +139,7 @@ public void whenVeryManyEntriesDecreasePause() throws Exception { @Test public void exceptionalIterationsDoNotAffectPause() throws Exception { - SweepDelay sweepDelay = new SweepDelay(100L, metrics::set, () -> SweepQueueUtils.SWEEP_BATCH_SIZE); + SweepDelay sweepDelay = new SweepDelay(() -> 100L, metrics::set, () -> SweepQueueUtils.SWEEP_BATCH_SIZE); ScalingSweepTaskScheduler schedulerWithRealDelay = createScheduler(sweepDelay); when(sweepIteration.call()) .thenReturn( diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/SweepDelayTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/SweepDelayTest.java index 3cae421b664..1c06c130a22 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/SweepDelayTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/SweepDelayTest.java @@ -39,7 +39,7 @@ public class SweepDelayTest { private final AtomicLong metrics = new AtomicLong(); private final AtomicInteger sweepBatchSize = new AtomicInteger(SWEEP_BATCH_SIZE); - private SweepDelay delay = new SweepDelay(INITIAL_DELAY, metrics::set, sweepBatchSize::get); + private SweepDelay delay = new SweepDelay(() -> INITIAL_DELAY, metrics::set, sweepBatchSize::get); @Test public void iterationWithNormalBatchReturnsInitialPause() { @@ -49,7 +49,7 @@ public void iterationWithNormalBatchReturnsInitialPause() { @Test public void configurationBelowMinimumIsSetToMinimum() { - SweepDelay negativeDelay = new SweepDelay(-5L, metrics::set, sweepBatchSize::get); + SweepDelay negativeDelay = new SweepDelay(() -> -5L, metrics::set, sweepBatchSize::get); assertThat(negativeDelay.getNextPause(SUCCESS)).isEqualTo(MIN_PAUSE_MILLIS); assertThat(metrics).hasValue(MIN_PAUSE_MILLIS); @@ -57,7 +57,7 @@ public void configurationBelowMinimumIsSetToMinimum() { @Test public void configurationAboveDefaultMaximumIsRespected() { - SweepDelay largeDelay = new SweepDelay(2 * DEFAULT_MAX_PAUSE_MILLIS, metrics::set, sweepBatchSize::get); + SweepDelay largeDelay = new SweepDelay(() -> 2 * DEFAULT_MAX_PAUSE_MILLIS, metrics::set, sweepBatchSize::get); assertThat(largeDelay.getNextPause(SUCCESS)).isEqualTo(2 * DEFAULT_MAX_PAUSE_MILLIS); assertThat(metrics).hasValue(2 * DEFAULT_MAX_PAUSE_MILLIS);