Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
TargetedSweeper no longer reads runtime prefs on construction (#5888)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilyanep authored Feb 3, 2022
1 parent fd9a0ea commit 734ace5
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

package com.palantir.atlasdb.sweep.queue;

import com.google.common.base.Suppliers;
import com.google.common.math.DoubleMath;
import java.math.RoundingMode;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
* This class calculates the delay for the next iteration of targeted sweep from the current delay and the outcome
Expand All @@ -48,42 +51,42 @@ class SweepDelay {
static final double EXPONENTIAL_BACKOFF_MULTIPLIER =
Math.exp(Math.log((double) MAX_BACKOFF / MIN_BACKOFF) / ITERATIONS_TO_REACH_MAX_BACKOFF);

private final long initialPause;
private final long maxPauseMillis;
private final LongSupplier initialPause;
private final LongSupplier maxPauseMillis;
private final LongConsumer sweepDelayMetricsUpdater;
private final AtomicLong currentPause;
private final Supplier<AtomicLong> currentPause;
private final AtomicLong insufficientConsistencyPause = new AtomicLong(MIN_BACKOFF);
private final IntSupplier readBatchThreshold;

SweepDelay(long configPause, LongConsumer sweepDelayMetricsUpdater, IntSupplier readBatchThreshold) {
this.maxPauseMillis = Math.max(DEFAULT_MAX_PAUSE_MILLIS, configPause);
this.initialPause = Math.max(MIN_PAUSE_MILLIS, configPause);
SweepDelay(LongSupplier configPause, LongConsumer sweepDelayMetricsUpdater, IntSupplier readBatchThreshold) {
this.maxPauseMillis = () -> Math.max(DEFAULT_MAX_PAUSE_MILLIS, configPause.getAsLong());
this.initialPause = () -> Math.max(MIN_PAUSE_MILLIS, configPause.getAsLong());
this.sweepDelayMetricsUpdater = sweepDelayMetricsUpdater;
this.readBatchThreshold = readBatchThreshold;
this.currentPause = new AtomicLong(initialPause);
this.currentPause = Suppliers.memoize(() -> new AtomicLong(initialPause.getAsLong()));
}

long getInitialPause() {
return initialPause;
return initialPause.getAsLong();
}

long getMaxPause() {
return maxPauseMillis;
return maxPauseMillis.getAsLong();
}

long getNextPause(SweepIterationResult result) {
return SweepIterationResults.caseOf(result)
.success(this::updateCurrentPauseAndGet)
.unableToAcquireShard_(maxPauseMillis)
.unableToAcquireShard_(maxPauseMillis.getAsLong())
.insufficientConsistency_(getInsufficientConsistencyPauseAndCalculateNext())
.otherError_(maxPauseMillis)
.otherError_(maxPauseMillis.getAsLong())
.disabled_(MIN_BACKOFF);
}

private long updateCurrentPauseAndGet(long numSwept) {
resetInsufficientConsistencyBackoff();
long target = pauseTarget(numSwept);
long newPause = currentPause.updateAndGet(oldPause -> (4 * oldPause + target) / 5);
long newPause = currentPause.get().updateAndGet(oldPause -> (4 * oldPause + target) / 5);
sweepDelayMetricsUpdater.accept(newPause);
return newPause;
}
Expand All @@ -99,10 +102,10 @@ private long getInsufficientConsistencyPauseAndCalculateNext() {

private long pauseTarget(long numSwept) {
if (numSwept <= Math.min(BATCH_CELLS_LOW_THRESHOLD, readBatchThreshold.getAsInt() - 1)) {
return maxPauseMillis;
return maxPauseMillis.getAsLong();
} else if (numSwept >= readBatchThreshold.getAsInt()) {
return MIN_PAUSE_MILLIS;
}
return initialPause;
return initialPause.getAsLong();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ private BackgroundSweepScheduler(int numThreads, SweeperStrategy sweepStrategy)
this.numThreads = numThreads;
this.sweepStrategy = sweepStrategy;
this.delay = new SweepDelay(
runtime.get().pauseMillis(),
() -> runtime.get().pauseMillis(),
millis -> metrics.updateSweepDelayMetric(sweepStrategy, millis),
() -> runtime.get().batchCellThreshold());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class ScalingSweepTaskSchedulerTest {
private final ScalingSweepTaskScheduler scheduler = createScheduler(delay);
private final AtomicLong metrics = new AtomicLong();
private final ScalingSweepTaskScheduler schedulerWithDelay =
createScheduler(new SweepDelay(DELAY, metrics::set, () -> SweepQueueUtils.SWEEP_BATCH_SIZE));
createScheduler(new SweepDelay(() -> DELAY, metrics::set, () -> SweepQueueUtils.SWEEP_BATCH_SIZE));

private boolean firstIteration = true;

Expand Down Expand Up @@ -115,7 +115,7 @@ public void whenOtherErrorRescheduleAfterMaxPause() throws Exception {

@Test
public void whenVeryFewEntriesIncreasePause() throws Exception {
SweepDelay sweepDelay = new SweepDelay(100L, metrics::set, () -> SweepQueueUtils.SWEEP_BATCH_SIZE);
SweepDelay sweepDelay = new SweepDelay(() -> 100L, metrics::set, () -> SweepQueueUtils.SWEEP_BATCH_SIZE);
ScalingSweepTaskScheduler schedulerWithRealDelay = createScheduler(sweepDelay);
when(sweepIteration.call()).thenReturn(SUCCESS_TINY);

Expand All @@ -127,7 +127,7 @@ public void whenVeryFewEntriesIncreasePause() throws Exception {

@Test
public void whenVeryManyEntriesDecreasePause() throws Exception {
SweepDelay sweepDelay = new SweepDelay(100L, metrics::set, () -> SweepQueueUtils.SWEEP_BATCH_SIZE);
SweepDelay sweepDelay = new SweepDelay(() -> 100L, metrics::set, () -> SweepQueueUtils.SWEEP_BATCH_SIZE);
ScalingSweepTaskScheduler schedulerWithRealDelay = createScheduler(sweepDelay);
when(sweepIteration.call()).thenReturn(SUCCESS_HUGE);

Expand All @@ -139,7 +139,7 @@ public void whenVeryManyEntriesDecreasePause() throws Exception {

@Test
public void exceptionalIterationsDoNotAffectPause() throws Exception {
SweepDelay sweepDelay = new SweepDelay(100L, metrics::set, () -> SweepQueueUtils.SWEEP_BATCH_SIZE);
SweepDelay sweepDelay = new SweepDelay(() -> 100L, metrics::set, () -> SweepQueueUtils.SWEEP_BATCH_SIZE);
ScalingSweepTaskScheduler schedulerWithRealDelay = createScheduler(sweepDelay);
when(sweepIteration.call())
.thenReturn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class SweepDelayTest {

private final AtomicLong metrics = new AtomicLong();
private final AtomicInteger sweepBatchSize = new AtomicInteger(SWEEP_BATCH_SIZE);
private SweepDelay delay = new SweepDelay(INITIAL_DELAY, metrics::set, sweepBatchSize::get);
private SweepDelay delay = new SweepDelay(() -> INITIAL_DELAY, metrics::set, sweepBatchSize::get);

@Test
public void iterationWithNormalBatchReturnsInitialPause() {
Expand All @@ -49,15 +49,15 @@ public void iterationWithNormalBatchReturnsInitialPause() {

@Test
public void configurationBelowMinimumIsSetToMinimum() {
SweepDelay negativeDelay = new SweepDelay(-5L, metrics::set, sweepBatchSize::get);
SweepDelay negativeDelay = new SweepDelay(() -> -5L, metrics::set, sweepBatchSize::get);

assertThat(negativeDelay.getNextPause(SUCCESS)).isEqualTo(MIN_PAUSE_MILLIS);
assertThat(metrics).hasValue(MIN_PAUSE_MILLIS);
}

@Test
public void configurationAboveDefaultMaximumIsRespected() {
SweepDelay largeDelay = new SweepDelay(2 * DEFAULT_MAX_PAUSE_MILLIS, metrics::set, sweepBatchSize::get);
SweepDelay largeDelay = new SweepDelay(() -> 2 * DEFAULT_MAX_PAUSE_MILLIS, metrics::set, sweepBatchSize::get);

assertThat(largeDelay.getNextPause(SUCCESS)).isEqualTo(2 * DEFAULT_MAX_PAUSE_MILLIS);
assertThat(metrics).hasValue(2 * DEFAULT_MAX_PAUSE_MILLIS);
Expand Down

0 comments on commit 734ace5

Please sign in to comment.