From 6e7808aae1cb2b86843544265a777393f5651544 Mon Sep 17 00:00:00 2001 From: Mohammed Daudali Date: Tue, 12 Nov 2024 11:23:16 +0000 Subject: [PATCH] [ASTS] Make max bucket size (in coarse partitions) for non-puncher close configurable --- .../asts/BucketBasedTargetedSweeper.java | 8 +- ...DefaultBucketCloseTimestampCalculator.java | 36 ++++++--- ...AutoScalingTargetedSweepRuntimeConfig.java | 7 ++ ...ultBucketCloseTimestampCalculatorTest.java | 73 +++++++++++++++---- 4 files changed, 98 insertions(+), 26 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/BucketBasedTargetedSweeper.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/BucketBasedTargetedSweeper.java index 0bb0af2c25..18e60d3d01 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/BucketBasedTargetedSweeper.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/BucketBasedTargetedSweeper.java @@ -42,6 +42,7 @@ import java.time.Duration; import java.util.List; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -102,6 +103,8 @@ public static BackgroundSweeper create( sweepAssignedBucketStore, strategies, numShards, + runtimeConfig.map( + AutoScalingTargetedSweepRuntimeConfig::maxCoarsePartitionsPerBucketForNonPuncherClose), puncherStore, timelockService, bucketAssignerEventHandler); @@ -239,12 +242,13 @@ private static DefaultBucketAssigner createBucketAssigner( DefaultSweepAssignedBucketStore sweepAssignedBucketStore, List strategies, int numShards, + Supplier maxCoarsePartitionsPerBucketForNonPuncherClose, PuncherStore puncherStore, TimelockService timelockService, BucketAssignerEventHandler bucketAssignerEventHandler) { BucketWriter bucketWriter = DefaultBucketWriter.create(sweepAssignedBucketStore, strategies, numShards); - DefaultBucketCloseTimestampCalculator calculator = - DefaultBucketCloseTimestampCalculator.create(puncherStore, timelockService); + DefaultBucketCloseTimestampCalculator calculator = DefaultBucketCloseTimestampCalculator.create( + puncherStore, timelockService, maxCoarsePartitionsPerBucketForNonPuncherClose); return DefaultBucketAssigner.create( sweepAssignedBucketStore, bucketWriter, diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultBucketCloseTimestampCalculator.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultBucketCloseTimestampCalculator.java index c41f1e5d18..57c4ee95d1 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultBucketCloseTimestampCalculator.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultBucketCloseTimestampCalculator.java @@ -44,25 +44,33 @@ public final class DefaultBucketCloseTimestampCalculator { @VisibleForTesting static final long MIN_BUCKET_SIZE = SweepQueueUtils.TS_COARSE_GRANULARITY; - @VisibleForTesting - static final long MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE = 500 * MIN_BUCKET_SIZE; // 5 billion - private final PuncherStore puncherStore; private final Supplier freshTimestampSupplier; + private final Supplier maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose; private final Clock clock; @VisibleForTesting DefaultBucketCloseTimestampCalculator( - PuncherStore puncherStore, Supplier freshTimestampSupplier, Clock clock) { + PuncherStore puncherStore, + Supplier freshTimestampSupplier, + Supplier maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose, + Clock clock) { this.puncherStore = puncherStore; this.freshTimestampSupplier = freshTimestampSupplier; + this.maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose = + maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose; this.clock = clock; } public static DefaultBucketCloseTimestampCalculator create( - PuncherStore puncherStore, TimelockService timelockService) { + PuncherStore puncherStore, + TimelockService timelockService, + Supplier maxNumberOfCoarsePartitionsPerBucket) { return new DefaultBucketCloseTimestampCalculator( - puncherStore, timelockService::getFreshTimestamp, Clock.systemUTC()); + puncherStore, + timelockService::getFreshTimestamp, + maxNumberOfCoarsePartitionsPerBucket, + Clock.systemUTC()); } // A second possible algorithm, rather than the fixed bounds above, is to (assuming the start timestamp is X): @@ -98,8 +106,8 @@ public OptionalLong getBucketCloseTimestamp(long startTimestamp) { // This has the following interesting case: Suppose that the assigner stopped for a day. Then, suppose that the // within the following 10 minute period from where the assigner currently is looking at, there is _not_ 10 mil // timestamps. Then, instead of choosing a 10 million sized block, we have a giant block up to the fresh Ts. - // This is capped at 5 billion, and we were okay with this case when there's clock drift, but we should - // evaluate. + // This is capped at the configurable amount, and we were okay with this case when there's clock drift, but we + // should evaluate. if (clampedTimestamp - startTimestamp < MIN_BUCKET_SIZE) { long freshClampedTimestamp = clampTimestampToCoarsePartitionBoundary(freshTimestampSupplier.get()); if (freshClampedTimestamp - startTimestamp < MIN_BUCKET_SIZE) { @@ -112,10 +120,16 @@ public OptionalLong getBucketCloseTimestamp(long startTimestamp) { return OptionalLong.empty(); } // start timestamp is aligned on a coarse partition - // MAX_BUCKET_SIZE is also a multiple of a coarse partition + // currentMaxBucketSize is also a multiple of a coarse partition // so excluding overflow, start + MAX will also be aligned with a coarse partition. - long cappedTimestamp = - Math.min(startTimestamp + MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE, freshClampedTimestamp); + long currentMaxCoarsePartitions = maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose.get(); + Preconditions.checkState( + currentMaxCoarsePartitions > 0, + "max coarse partitions must be positive", + SafeArg.of("maxCoarsePartitions", currentMaxCoarsePartitions)); + + long currentMaxBucketSize = currentMaxCoarsePartitions * SweepQueueUtils.TS_COARSE_GRANULARITY; + long cappedTimestamp = Math.min(startTimestamp + currentMaxBucketSize, freshClampedTimestamp); if (cappedTimestamp != freshClampedTimestamp) { logNonPuncherClose( "but this is too far from the start timestamp. Proposing a capped timestamp {} instead.", diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/config/AutoScalingTargetedSweepRuntimeConfig.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/config/AutoScalingTargetedSweepRuntimeConfig.java index 3b1dfed30a..4fc2324778 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/config/AutoScalingTargetedSweepRuntimeConfig.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/config/AutoScalingTargetedSweepRuntimeConfig.java @@ -54,6 +54,13 @@ default Duration minShardProgressUpdaterInterval() { return Duration.ofMinutes(1); } + // Modify the below config if you expect your service to churn through far more than 100_000_000 timestamps in a 10 + // minute window. + @Value.Default + default long maxCoarsePartitionsPerBucketForNonPuncherClose() { + return 10; // 100_000_000L timestamps. + } + static AutoScalingTargetedSweepRuntimeConfig defaultConfig() { return ImmutableAutoScalingTargetedSweepRuntimeConfig.builder().build(); } diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultBucketCloseTimestampCalculatorTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultBucketCloseTimestampCalculatorTest.java index 8bfd39705d..84ac5273a5 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultBucketCloseTimestampCalculatorTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultBucketCloseTimestampCalculatorTest.java @@ -16,7 +16,6 @@ package com.palantir.atlasdb.sweep.asts.bucketingthings; -import static com.palantir.atlasdb.sweep.asts.bucketingthings.DefaultBucketCloseTimestampCalculator.MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE; import static com.palantir.atlasdb.sweep.asts.bucketingthings.DefaultBucketCloseTimestampCalculator.MIN_BUCKET_SIZE; import static com.palantir.atlasdb.sweep.asts.bucketingthings.DefaultBucketCloseTimestampCalculator.TIME_GAP_BETWEEN_BUCKET_START_AND_END; import static com.palantir.logsafe.testing.Assertions.assertThatLoggableExceptionThrownBy; @@ -30,13 +29,18 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneId; +import java.util.Collection; import java.util.OptionalLong; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import org.eclipse.collections.impl.factory.Sets; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -44,6 +48,7 @@ @ExtendWith(MockitoExtension.class) public final class DefaultBucketCloseTimestampCalculatorTest { private final AtomicLong freshTimestamp = new AtomicLong(0); + private final AtomicLong maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose = new AtomicLong(500); private final FakeClock clock = new FakeClock(); @Mock @@ -53,8 +58,8 @@ public final class DefaultBucketCloseTimestampCalculatorTest { @BeforeEach public void setup() { - bucketCloseTimestampCalculator = - new DefaultBucketCloseTimestampCalculator(puncherStore, freshTimestamp::get, clock); + bucketCloseTimestampCalculator = new DefaultBucketCloseTimestampCalculator( + puncherStore, freshTimestamp::get, maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose::get, clock); } @Test @@ -128,24 +133,66 @@ public void throwsIfStartTimestampNotOnCoarsePartitionBoundary() { } @ParameterizedTest - @ValueSource( - longs = { - 98 * SweepQueueUtils.TS_COARSE_GRANULARITY, - 100 * SweepQueueUtils.TS_COARSE_GRANULARITY, - 100 * SweepQueueUtils.TS_COARSE_GRANULARITY + 1, - 101 * SweepQueueUtils.TS_COARSE_GRANULARITY - 1 - }) + @MethodSource("puncherTimestampAndMaxCoarsePartitions") public void returnsClampedAndCappedTimestampIfPuncherTimestampInsufficientlyFarAndLatestFreshTimestampIsTooFarAhead( - long puncherTimestamp) { + long puncherTimestamp, long maxCoarsePartitions) { long startTimestamp = 100 * SweepQueueUtils.TS_COARSE_GRANULARITY; when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis()); clock.advance(TIME_GAP_BETWEEN_BUCKET_START_AND_END); when(puncherStore.get(clock.millis())).thenReturn(puncherTimestamp); + maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose.set(maxCoarsePartitions); + long maxBucketSize = maxCoarsePartitions * SweepQueueUtils.TS_COARSE_GRANULARITY; + freshTimestamp.set(2 * maxBucketSize + startTimestamp); + + OptionalLong maybeEndTimestamp = bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp); + assertThat(maybeEndTimestamp).hasValue(maxBucketSize + startTimestamp); + } + + @ParameterizedTest + @ValueSource(longs = {0, -1}) + public void throwsIfMaxBucketSizeIsNonPositiveAndPuncherTimestampInsufficientlyFar(long maxCoarsePartitions) { + long startTimestamp = 0; + when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis()); + clock.advance(TIME_GAP_BETWEEN_BUCKET_START_AND_END); + when(puncherStore.get(clock.millis())).thenReturn(startTimestamp); + + maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose.set(maxCoarsePartitions); + freshTimestamp.set(SweepQueueUtils.TS_COARSE_GRANULARITY + startTimestamp); - freshTimestamp.set(2 * MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE + startTimestamp); + assertThatLoggableExceptionThrownBy( + () -> bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp)) + .hasLogMessage("max coarse partitions must be positive") + .hasExactlyArgs(SafeArg.of("maxCoarsePartitions", maxCoarsePartitions)); + } + + @Test + public void loadsLatestMaxCoarsePartitions() { + long startTimestamp = 0; + when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis()); + clock.advance(TIME_GAP_BETWEEN_BUCKET_START_AND_END); + when(puncherStore.get(clock.millis())).thenReturn(startTimestamp); + + maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose.set(1); + freshTimestamp.set(SweepQueueUtils.TS_COARSE_GRANULARITY * 10 + startTimestamp); OptionalLong maybeEndTimestamp = bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp); - assertThat(maybeEndTimestamp).hasValue(MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE + startTimestamp); + assertThat(maybeEndTimestamp).hasValue(SweepQueueUtils.TS_COARSE_GRANULARITY + startTimestamp); + + maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose.set(6); + OptionalLong maybeEndTimestamp2 = bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp); + assertThat(maybeEndTimestamp2).hasValue(6 * SweepQueueUtils.TS_COARSE_GRANULARITY + startTimestamp); + } + + private static Collection puncherTimestampAndMaxCoarsePartitions() { + Set puncherTimestamps = Set.of( + 98 * SweepQueueUtils.TS_COARSE_GRANULARITY, + 100 * SweepQueueUtils.TS_COARSE_GRANULARITY, + 100 * SweepQueueUtils.TS_COARSE_GRANULARITY + 1, + 101 * SweepQueueUtils.TS_COARSE_GRANULARITY - 1); + Set maxCoarsePartitions = Set.of(1L, 2L, 500L); + return Sets.cartesianProduct(puncherTimestamps, maxCoarsePartitions) + .collect(pair -> Arguments.of(pair.getOne(), pair.getTwo())) + .toList(); } // TODO(mdaudali): Extract this into its own class if we end up needing this elsewhere.