
[ASTS] Make max bucket size (in coarse partitions) for non-puncher close configurable
mdaudali committed Nov 12, 2024
1 parent 750d5b8 commit 9bd2045
Showing 4 changed files with 97 additions and 20 deletions.
@@ -42,6 +42,7 @@
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@@ -102,6 +103,8 @@ public static BackgroundSweeper create(
sweepAssignedBucketStore,
strategies,
numShards,
+runtimeConfig.map(
+AutoScalingTargetedSweepRuntimeConfig::maxCoarsePartitionsPerBucketForNonPuncherClose),
puncherStore,
timelockService,
bucketAssignerEventHandler);
@@ -239,12 +242,13 @@ private static DefaultBucketAssigner createBucketAssigner(
DefaultSweepAssignedBucketStore sweepAssignedBucketStore,
List<SweeperStrategy> strategies,
int numShards,
+Supplier<Long> maxCoarsePartitionsPerBucketForNonPuncherClose,
PuncherStore puncherStore,
TimelockService timelockService,
BucketAssignerEventHandler bucketAssignerEventHandler) {
BucketWriter bucketWriter = DefaultBucketWriter.create(sweepAssignedBucketStore, strategies, numShards);
-DefaultBucketCloseTimestampCalculator calculator =
-DefaultBucketCloseTimestampCalculator.create(puncherStore, timelockService);
+DefaultBucketCloseTimestampCalculator calculator = DefaultBucketCloseTimestampCalculator.create(
+puncherStore, timelockService, maxCoarsePartitionsPerBucketForNonPuncherClose);
return DefaultBucketAssigner.create(
sweepAssignedBucketStore,
bucketWriter,
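Note: the new parameter flows in as a Supplier<Long> rather than a fixed long, so the cap is re-read on every bucket-close computation and a runtime-config refresh takes effect without restarting the sweeper. A minimal sketch of that design choice, using an AtomicLong-backed supplier as a hypothetical stand-in for the refreshable runtime config (the tests below drive the supplier the same way):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public final class LiveReloadSketch {
    public static void main(String[] args) {
        // Hypothetical stand-in for the live-reloadable runtime config; the real wiring maps
        // AutoScalingTargetedSweepRuntimeConfig::maxCoarsePartitionsPerBucketForNonPuncherClose
        // over the runtime config instead.
        AtomicLong backingConfigValue = new AtomicLong(10);
        Supplier<Long> maxCoarsePartitionsPerBucket = backingConfigValue::get;

        System.out.println(maxCoarsePartitionsPerBucket.get()); // 10

        // A config reload only updates the backing value; the next bucket-close
        // computation observes the new cap without a restart.
        backingConfigValue.set(500);
        System.out.println(maxCoarsePartitionsPerBucket.get()); // 500
    }
}
```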
DefaultBucketCloseTimestampCalculator.java
@@ -44,25 +44,33 @@ public final class DefaultBucketCloseTimestampCalculator
@VisibleForTesting
static final long MIN_BUCKET_SIZE = SweepQueueUtils.TS_COARSE_GRANULARITY;

-@VisibleForTesting
-static final long MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE = 500 * MIN_BUCKET_SIZE; // 5 billion
-
private final PuncherStore puncherStore;
private final Supplier<Long> freshTimestampSupplier;
+private final Supplier<Long> maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose;
private final Clock clock;

@VisibleForTesting
DefaultBucketCloseTimestampCalculator(
-PuncherStore puncherStore, Supplier<Long> freshTimestampSupplier, Clock clock) {
+PuncherStore puncherStore,
+Supplier<Long> freshTimestampSupplier,
+Supplier<Long> maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose,
+Clock clock) {
this.puncherStore = puncherStore;
this.freshTimestampSupplier = freshTimestampSupplier;
+this.maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose =
+maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose;
this.clock = clock;
}

public static DefaultBucketCloseTimestampCalculator create(
-PuncherStore puncherStore, TimelockService timelockService) {
+PuncherStore puncherStore,
+TimelockService timelockService,
+Supplier<Long> maxNumberOfCoarsePartitionsPerBucket) {
return new DefaultBucketCloseTimestampCalculator(
-puncherStore, timelockService::getFreshTimestamp, Clock.systemUTC());
+puncherStore,
+timelockService::getFreshTimestamp,
+maxNumberOfCoarsePartitionsPerBucket,
+Clock.systemUTC());
}

// A second possible algorithm, rather than the fixed bounds above, is to (assuming the start timestamp is X):
Expand Down Expand Up @@ -98,8 +106,8 @@ public OptionalLong getBucketCloseTimestamp(long startTimestamp) {
// This has the following interesting case: Suppose that the assigner stopped for a day. Then, suppose that
// within the following 10 minute period from where the assigner is currently looking, there are _not_ 10 mil
// timestamps.
-// This is capped at 5 billion, and we were okay with this case when there's clock drift, but we should
-// evaluate.
+// This is capped at the configurable amount, and we were okay with this case when there's clock drift, but we
+// should evaluate.
if (clampedTimestamp - startTimestamp < MIN_BUCKET_SIZE) {
long freshClampedTimestamp = clampTimestampToCoarsePartitionBoundary(freshTimestampSupplier.get());
if (freshClampedTimestamp - startTimestamp < MIN_BUCKET_SIZE) {
@@ -112,10 +120,16 @@ public OptionalLong getBucketCloseTimestamp(long startTimestamp) {
return OptionalLong.empty();
}
// start timestamp is aligned on a coarse partition
-// MAX_BUCKET_SIZE is also a multiple of a coarse partition
+// currentMaxBucketSize is also a multiple of a coarse partition
// so excluding overflow, start + MAX will also be aligned with a coarse partition.
-long cappedTimestamp =
-Math.min(startTimestamp + MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE, freshClampedTimestamp);
+long currentMaxCoarsePartitions = maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose.get();
+Preconditions.checkState(
+currentMaxCoarsePartitions > 0,
+"max coarse partitions must be positive",
+SafeArg.of("maxCoarsePartitions", currentMaxCoarsePartitions));
+
+long currentMaxBucketSize = currentMaxCoarsePartitions * SweepQueueUtils.TS_COARSE_GRANULARITY;
+long cappedTimestamp = Math.min(startTimestamp + currentMaxBucketSize, freshClampedTimestamp);
if (cappedTimestamp != freshClampedTimestamp) {
logNonPuncherClose(
"but this is too far from the start timestamp. Proposing a capped timestamp {} instead.",
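Taken together, the close timestamp for the non-puncher path is the fresh timestamp clamped down to a coarse partition boundary, capped at maxCoarsePartitions * TS_COARSE_GRANULARITY past the start. A self-contained sketch of that arithmetic, assuming TS_COARSE_GRANULARITY = 10_000_000 (inferred from the removed "500 * MIN_BUCKET_SIZE // 5 billion" constant and the "10 partitions == 100_000_000 timestamps" default below), without the puncher, logging, or precondition plumbing:

```java
public final class BucketCapSketch {
    private static final long TS_COARSE_GRANULARITY = 10_000_000L; // assumed value

    static long cappedCloseTimestamp(long startTimestamp, long freshTimestamp, long maxCoarsePartitions) {
        // Clamp the fresh timestamp down to the nearest coarse partition boundary.
        long freshClamped = (freshTimestamp / TS_COARSE_GRANULARITY) * TS_COARSE_GRANULARITY;
        // startTimestamp is aligned and the cap is a whole number of coarse partitions,
        // so (barring overflow) the capped result stays aligned too.
        long maxBucketSize = maxCoarsePartitions * TS_COARSE_GRANULARITY;
        return Math.min(startTimestamp + maxBucketSize, freshClamped);
    }

    public static void main(String[] args) {
        // Assigner far behind: the fresh timestamp is 3 billion ahead, but with the
        // default of 10 coarse partitions the bucket closes only 100M past the start.
        System.out.println(cappedCloseTimestamp(0, 3_000_000_000L, 10)); // 100000000
        // Fresh timestamp nearby: the clamped fresh timestamp wins.
        System.out.println(cappedCloseTimestamp(0, 35_000_000L, 10)); // 30000000
    }
}
```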
AutoScalingTargetedSweepRuntimeConfig.java
@@ -54,6 +54,13 @@ default Duration minShardProgressUpdaterInterval() {
return Duration.ofMinutes(1);
}

+// Modify the below config if you expect your service to churn through far more than 100_000_000 timestamps in a 10
+// minute window.
+@Value.Default
+default long maxCoarsePartitionsPerBucketForNonPuncherClose() {
+return 10; // 100_000_000L timestamps.
+}

static AutoScalingTargetedSweepRuntimeConfig defaultConfig() {
return ImmutableAutoScalingTargetedSweepRuntimeConfig.builder().build();
}
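For a service expected to exceed the default throughput, the override should be a one-liner through the same Immutables-generated builder that defaultConfig() uses above (a sketch, assuming the standard generated setter name):

```java
// Hypothetical override: 50 coarse partitions == 500_000_000 timestamps per
// non-puncher-close bucket.
AutoScalingTargetedSweepRuntimeConfig config = ImmutableAutoScalingTargetedSweepRuntimeConfig.builder()
        .maxCoarsePartitionsPerBucketForNonPuncherClose(50)
        .build();
```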
DefaultBucketCloseTimestampCalculatorTest.java
@@ -16,7 +16,6 @@

package com.palantir.atlasdb.sweep.asts.bucketingthings;

-import static com.palantir.atlasdb.sweep.asts.bucketingthings.DefaultBucketCloseTimestampCalculator.MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE;
import static com.palantir.atlasdb.sweep.asts.bucketingthings.DefaultBucketCloseTimestampCalculator.MIN_BUCKET_SIZE;
import static com.palantir.atlasdb.sweep.asts.bucketingthings.DefaultBucketCloseTimestampCalculator.TIME_GAP_BETWEEN_BUCKET_START_AND_END;
import static com.palantir.logsafe.testing.Assertions.assertThatLoggableExceptionThrownBy;
@@ -30,20 +29,26 @@
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
+import java.util.Collection;
import java.util.OptionalLong;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import org.eclipse.collections.impl.factory.Sets;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public final class DefaultBucketCloseTimestampCalculatorTest {
private final AtomicLong freshTimestamp = new AtomicLong(0);
+private final AtomicLong maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose = new AtomicLong(500);
private final FakeClock clock = new FakeClock();

@Mock
@@ -53,8 +58,8 @@ public final class DefaultBucketCloseTimestampCalculatorTest

@BeforeEach
public void setup() {
-bucketCloseTimestampCalculator =
-new DefaultBucketCloseTimestampCalculator(puncherStore, freshTimestamp::get, clock);
+bucketCloseTimestampCalculator = new DefaultBucketCloseTimestampCalculator(
+puncherStore, freshTimestamp::get, maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose::get, clock);
}

@Test
@@ -126,18 +131,65 @@ public void throwsIfStartTimestampNotOnCoarsePartitionBoundary() {
}

@ParameterizedTest
-@ValueSource(longs = {98 * SweepQueueUtils.TS_COARSE_GRANULARITY, 100 * SweepQueueUtils.TS_COARSE_GRANULARITY})
+@MethodSource("puncherTimestampAndMaxCoarsePartitions")
public void returnsClampedAndCappedTimestampIfPuncherTimestampInsufficientlyFarAndLatestFreshTimestampIsTooFarAhead(
-long puncherTimestamp) {
+long puncherTimestamp, long maxCoarsePartitions) {
long startTimestamp = 100 * SweepQueueUtils.TS_COARSE_GRANULARITY;
when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis());
clock.advance(TIME_GAP_BETWEEN_BUCKET_START_AND_END);
when(puncherStore.get(clock.millis())).thenReturn(puncherTimestamp);
+maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose.set(maxCoarsePartitions);
+long maxBucketSize = maxCoarsePartitions * SweepQueueUtils.TS_COARSE_GRANULARITY;
+freshTimestamp.set(2 * maxBucketSize + startTimestamp);
-freshTimestamp.set(2 * MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE + startTimestamp);

OptionalLong maybeEndTimestamp = bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp);
-assertThat(maybeEndTimestamp).hasValue(MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE + startTimestamp);
+assertThat(maybeEndTimestamp).hasValue(maxBucketSize + startTimestamp);
}

+@ParameterizedTest
+@ValueSource(longs = {0, -1})
+public void throwsIfMaxBucketSizeIsNonPositiveAndPuncherTimestampInsufficientlyFar(long maxCoarsePartitions) {
+long startTimestamp = 0;
+when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis());
+clock.advance(TIME_GAP_BETWEEN_BUCKET_START_AND_END);
+when(puncherStore.get(clock.millis())).thenReturn(startTimestamp);
+
+maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose.set(maxCoarsePartitions);
+freshTimestamp.set(SweepQueueUtils.TS_COARSE_GRANULARITY + startTimestamp);
+
+assertThatLoggableExceptionThrownBy(
+() -> bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp))
+.hasLogMessage("max coarse partitions must be positive")
+.hasExactlyArgs(SafeArg.of("maxCoarsePartitions", maxCoarsePartitions));
+}
+
+@Test
+public void loadsLatestMaxCoarsePartitions() {
+long startTimestamp = 0;
+when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis());
+clock.advance(TIME_GAP_BETWEEN_BUCKET_START_AND_END);
+when(puncherStore.get(clock.millis())).thenReturn(startTimestamp);
+
+maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose.set(1);
+freshTimestamp.set(SweepQueueUtils.TS_COARSE_GRANULARITY * 10 + startTimestamp);
+
+OptionalLong maybeEndTimestamp = bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp);
+assertThat(maybeEndTimestamp).hasValue(SweepQueueUtils.TS_COARSE_GRANULARITY + startTimestamp);
+
+maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose.set(6);
+OptionalLong maybeEndTimestamp2 = bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp);
+assertThat(maybeEndTimestamp2).hasValue(6 * SweepQueueUtils.TS_COARSE_GRANULARITY + startTimestamp);
+}

+private static Collection<Arguments> puncherTimestampAndMaxCoarsePartitions() {
+Set<Long> puncherTimestamps = Set.of(
+98 * SweepQueueUtils.TS_COARSE_GRANULARITY,
+100 * SweepQueueUtils.TS_COARSE_GRANULARITY,
+100 * SweepQueueUtils.TS_COARSE_GRANULARITY + 1);
+Set<Long> maxCoarsePartitions = Set.of(1L, 2L, 500L);
+return Sets.cartesianProduct(puncherTimestamps, maxCoarsePartitions)
+.collect(pair -> Arguments.of(pair.getOne(), pair.getTwo()))
+.toList();
+}

// TODO(mdaudali): Extract this into its own class if we end up needing this elsewhere.
