This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[ASTS] Make max bucket size (in coarse partitions) for non-puncher close configurable #7436

Open · wants to merge 1 commit into base: mdaudali/11-05-_asts_fix_ensure_forward_progress_even_if_a_single_window_does_not_have_enough_timestamps
@@ -102,6 +102,8 @@ public static BackgroundSweeper create(
sweepAssignedBucketStore,
strategies,
numShards,
runtimeConfig.map(
AutoScalingTargetedSweepRuntimeConfig::maxCoarsePartitionsPerBucketForNonPuncherClose),
puncherStore,
timelockService,
bucketAssignerEventHandler);
@@ -239,12 +241,13 @@ private static DefaultBucketAssigner createBucketAssigner(
DefaultSweepAssignedBucketStore sweepAssignedBucketStore,
List<SweeperStrategy> strategies,
int numShards,
Refreshable<Long> maxCoarsePartitionsPerBucketForNonPuncherClose,
PuncherStore puncherStore,
TimelockService timelockService,
BucketAssignerEventHandler bucketAssignerEventHandler) {
BucketWriter bucketWriter = DefaultBucketWriter.create(sweepAssignedBucketStore, strategies, numShards);
DefaultBucketCloseTimestampCalculator calculator =
DefaultBucketCloseTimestampCalculator.create(puncherStore, timelockService);
DefaultBucketCloseTimestampCalculator calculator = DefaultBucketCloseTimestampCalculator.create(
puncherStore, timelockService, maxCoarsePartitionsPerBucketForNonPuncherClose);
return DefaultBucketAssigner.create(
sweepAssignedBucketStore,
bucketWriter,
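The new argument is threaded through as a Refreshable<Long> derived from the runtime config via Refreshable.map, so the cap picks up config reloads without restarting the sweeper. Below is a minimal sketch of that pattern, assuming only the com.palantir.refreshable API shown in the imports; the RuntimeConfig record is a hypothetical stand-in for AutoScalingTargetedSweepRuntimeConfig.

import com.palantir.refreshable.Refreshable;
import com.palantir.refreshable.SettableRefreshable;

public final class RefreshableMapDemo {
    // Hypothetical stand-in for AutoScalingTargetedSweepRuntimeConfig.
    record RuntimeConfig(long maxCoarsePartitionsPerBucketForNonPuncherClose) {}

    public static void main(String[] args) {
        SettableRefreshable<RuntimeConfig> runtimeConfig = Refreshable.create(new RuntimeConfig(10L));

        // map produces a live view: consumers hold the Refreshable and call get() when needed.
        Refreshable<Long> maxCoarsePartitions =
                runtimeConfig.map(RuntimeConfig::maxCoarsePartitionsPerBucketForNonPuncherClose);
        System.out.println(maxCoarsePartitions.get()); // 10

        // Simulate a live config reload; the derived view updates without any re-wiring.
        runtimeConfig.update(new RuntimeConfig(20L));
        System.out.println(maxCoarsePartitions.get()); // 20
    }
}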
@@ -27,6 +27,7 @@
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.refreshable.Refreshable;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
@@ -44,25 +45,33 @@ public final class DefaultBucketCloseTimestampCalculator {
@VisibleForTesting
static final long MIN_BUCKET_SIZE = SweepQueueUtils.TS_COARSE_GRANULARITY;

@VisibleForTesting
static final long MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE = 500 * MIN_BUCKET_SIZE; // 5 billion

private final PuncherStore puncherStore;
private final Supplier<Long> freshTimestampSupplier;
private final Refreshable<Long> maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose;
private final Clock clock;

@VisibleForTesting
DefaultBucketCloseTimestampCalculator(
PuncherStore puncherStore, Supplier<Long> freshTimestampSupplier, Clock clock) {
PuncherStore puncherStore,
Supplier<Long> freshTimestampSupplier,
Refreshable<Long> maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose,
Clock clock) {
this.puncherStore = puncherStore;
this.freshTimestampSupplier = freshTimestampSupplier;
this.maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose =
maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose;
this.clock = clock;
}

public static DefaultBucketCloseTimestampCalculator create(
PuncherStore puncherStore, TimelockService timelockService) {
PuncherStore puncherStore,
TimelockService timelockService,
Refreshable<Long> maxNumberOfCoarsePartitionsPerBucket) {
return new DefaultBucketCloseTimestampCalculator(
puncherStore, timelockService::getFreshTimestamp, Clock.systemUTC());
puncherStore,
timelockService::getFreshTimestamp,
maxNumberOfCoarsePartitionsPerBucket,
Clock.systemUTC());
}

// A second possible algorithm, rather than the fixed bounds above, is to (assuming the start timestamp is X):
@@ -98,8 +107,8 @@ public OptionalLong getBucketCloseTimestamp(long startTimestamp) {
// This has the following interesting case: Suppose that the assigner stopped for a day. Then, suppose that
// within the following 10 minute period from where the assigner is currently looking, there are _not_ 10 mil
// timestamps. Then, instead of choosing a 10 million sized block, we have a giant block up to the fresh Ts.
// This is capped at 5 billion, and we were okay with this case when there's clock drift, but we should
// evaluate.
// This is capped at the configurable amount, and we were okay with this case when there's clock drift, but we
// should evaluate.
if (clampedTimestamp - startTimestamp < MIN_BUCKET_SIZE) {
long freshClampedTimestamp = clampTimestampToCoarsePartitionBoundary(freshTimestampSupplier.get());
if (freshClampedTimestamp - startTimestamp < MIN_BUCKET_SIZE) {
@@ -112,10 +121,16 @@ public OptionalLong getBucketCloseTimestamp(long startTimestamp) {
return OptionalLong.empty();
}
// start timestamp is aligned on a coarse partition
// MAX_BUCKET_SIZE is also a multiple of a coarse partition
// currentMaxBucketSize is also a multiple of a coarse partition
// so excluding overflow, start + MAX will also be aligned with a coarse partition.
long cappedTimestamp =
Math.min(startTimestamp + MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE, freshClampedTimestamp);
long currentMaxCoarsePartitions = maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose.get();
Preconditions.checkState(
currentMaxCoarsePartitions > 0,
"max coarse partitions must be positive",
SafeArg.of("maxCoarsePartitions", currentMaxCoarsePartitions));
Contributor:
Curious as to why the check is done here as opposed to in the config object (readability-wise the config object seems better / fails faster, though I get that this is maybe defensive against other rogue suppliers that might be passed in).

Contributor Author:
See the concern I wrote - happy to do both (I just noticed that we don't really do it often in AtlasDB config, hence the comment); I did the latter for the reason you state.

long currentMaxBucketSize = currentMaxCoarsePartitions * SweepQueueUtils.TS_COARSE_GRANULARITY;
long cappedTimestamp = Math.min(startTimestamp + currentMaxBucketSize, freshClampedTimestamp);
if (cappedTimestamp != freshClampedTimestamp) {
logNonPuncherClose(
"but this is too far from the start timestamp. Proposing a capped timestamp {} instead.",
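A quick sanity check on the sizing: the removed constant's comment (500 * MIN_BUCKET_SIZE being 5 billion) pins SweepQueueUtils.TS_COARSE_GRANULARITY at 10,000,000 timestamps per coarse partition, so the new default of 10 coarse partitions caps a non-puncher-close bucket at 100,000,000 timestamps. A small sketch of the arithmetic, with the granularity inlined as an assumption:

public final class BucketSizeMath {
    // Assumed value of SweepQueueUtils.TS_COARSE_GRANULARITY, derived from the
    // "500 * MIN_BUCKET_SIZE ... 5 billion" comment on the removed constant.
    private static final long TS_COARSE_GRANULARITY = 10_000_000L;

    public static void main(String[] args) {
        System.out.println(500L * TS_COARSE_GRANULARITY); // 5000000000, the old hard-coded cap
        System.out.println(10L * TS_COARSE_GRANULARITY); // 100000000, the new default cap

        // Because the cap is a whole number of coarse partitions, a coarse-aligned
        // start timestamp plus the cap stays coarse-aligned (barring overflow).
        long startTimestamp = 100L * TS_COARSE_GRANULARITY;
        long cappedTimestamp = startTimestamp + 10L * TS_COARSE_GRANULARITY;
        System.out.println(cappedTimestamp % TS_COARSE_GRANULARITY == 0); // true
    }
}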
@@ -54,6 +54,13 @@ default Duration minShardProgressUpdaterInterval() {
return Duration.ofMinutes(1);
}

// Modify the config below if you expect your service to churn through far more than 100_000_000 timestamps
// in a 10-minute window.
@Value.Default
default long maxCoarsePartitionsPerBucketForNonPuncherClose() {
Contributor Author:
I did this rather than maxBucketSizeForNonPuncherClose so that it's (mostly) correct by construction (excluding the fact that you can pass non-positive values in - see concerns).

Contributor:
Yeah, agree that we shouldn't allow configuration that breaks the coarse-partition-based assumptions - otherwise the Background Task is broken.

return 10; // 100_000_000L timestamps.
}

static AutoScalingTargetedSweepRuntimeConfig defaultConfig() {
return ImmutableAutoScalingTargetedSweepRuntimeConfig.builder().build();
}
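The review threads above debate validating the value in the config object rather than at the call site. Since the config is an Immutables value type, one way to fail fast is a @Value.Check method. The sketch below illustrates that suggestion on a hypothetical ExampleRuntimeConfig; it is not code from this PR, and assumes only the org.immutables and com.palantir.logsafe APIs shown.

import com.palantir.logsafe.Preconditions;
import org.immutables.value.Value;

@Value.Immutable
public interface ExampleRuntimeConfig {
    // Mirrors maxCoarsePartitionsPerBucketForNonPuncherClose from the diff above.
    @Value.Default
    default long maxCoarsePartitionsPerBucketForNonPuncherClose() {
        return 10;
    }

    // Immutables runs @Value.Check methods after construction, so a non-positive value
    // is rejected when the config is deserialized rather than when it is first used.
    @Value.Check
    default void checkMaxCoarsePartitionsPositive() {
        Preconditions.checkState(
                maxCoarsePartitionsPerBucketForNonPuncherClose() > 0,
                "max coarse partitions must be positive");
    }
}

Overriding the default then goes through the generated builder, e.g. ImmutableExampleRuntimeConfig.builder().maxCoarsePartitionsPerBucketForNonPuncherClose(20L).build() for a 200,000,000-timestamp cap.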
@@ -16,7 +16,6 @@

package com.palantir.atlasdb.sweep.asts.bucketingthings;

import static com.palantir.atlasdb.sweep.asts.bucketingthings.DefaultBucketCloseTimestampCalculator.MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE;
import static com.palantir.atlasdb.sweep.asts.bucketingthings.DefaultBucketCloseTimestampCalculator.MIN_BUCKET_SIZE;
import static com.palantir.atlasdb.sweep.asts.bucketingthings.DefaultBucketCloseTimestampCalculator.TIME_GAP_BETWEEN_BUCKET_START_AND_END;
import static com.palantir.logsafe.testing.Assertions.assertThatLoggableExceptionThrownBy;
@@ -26,24 +25,33 @@
import com.palantir.atlasdb.cleaner.PuncherStore;
import com.palantir.atlasdb.sweep.queue.SweepQueueUtils;
import com.palantir.logsafe.SafeArg;
import com.palantir.refreshable.Refreshable;
import com.palantir.refreshable.SettableRefreshable;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collection;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.collections.impl.factory.Sets;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public final class DefaultBucketCloseTimestampCalculatorTest {
private final AtomicLong freshTimestamp = new AtomicLong(0);
private final SettableRefreshable<Long> maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose =
Refreshable.create(500L);
private final FakeClock clock = new FakeClock();

@Mock
@@ -53,8 +61,8 @@ public final class DefaultBucketCloseTimestampCalculatorTest {

@BeforeEach
public void setup() {
bucketCloseTimestampCalculator =
new DefaultBucketCloseTimestampCalculator(puncherStore, freshTimestamp::get, clock);
bucketCloseTimestampCalculator = new DefaultBucketCloseTimestampCalculator(
puncherStore, freshTimestamp::get, maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose, clock);
}

@Test
@@ -128,24 +136,66 @@ public void throwsIfStartTimestampNotOnCoarsePartitionBoundary() {
}

@ParameterizedTest
@ValueSource(
longs = {
98 * SweepQueueUtils.TS_COARSE_GRANULARITY,
100 * SweepQueueUtils.TS_COARSE_GRANULARITY,
100 * SweepQueueUtils.TS_COARSE_GRANULARITY + 1,
101 * SweepQueueUtils.TS_COARSE_GRANULARITY - 1
})
@MethodSource("puncherTimestampAndMaxCoarsePartitions")
public void returnsClampedAndCappedTimestampIfPuncherTimestampInsufficientlyFarAndLatestFreshTimestampIsTooFarAhead(
long puncherTimestamp) {
long puncherTimestamp, long maxCoarsePartitions) {
long startTimestamp = 100 * SweepQueueUtils.TS_COARSE_GRANULARITY;
when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis());
clock.advance(TIME_GAP_BETWEEN_BUCKET_START_AND_END);
when(puncherStore.get(clock.millis())).thenReturn(puncherTimestamp);
maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose.update(maxCoarsePartitions);
long maxBucketSize = maxCoarsePartitions * SweepQueueUtils.TS_COARSE_GRANULARITY;
freshTimestamp.set(2 * maxBucketSize + startTimestamp);

OptionalLong maybeEndTimestamp = bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp);
assertThat(maybeEndTimestamp).hasValue(maxBucketSize + startTimestamp);
}

@ParameterizedTest
@ValueSource(longs = {0, -1})
public void throwsIfMaxBucketSizeIsNonPositiveAndPuncherTimestampInsufficientlyFar(long maxCoarsePartitions) {
long startTimestamp = 0;
when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis());
clock.advance(TIME_GAP_BETWEEN_BUCKET_START_AND_END);
when(puncherStore.get(clock.millis())).thenReturn(startTimestamp);

maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose.update(maxCoarsePartitions);
freshTimestamp.set(SweepQueueUtils.TS_COARSE_GRANULARITY + startTimestamp);

freshTimestamp.set(2 * MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE + startTimestamp);
assertThatLoggableExceptionThrownBy(
() -> bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp))
.hasLogMessage("max coarse partitions must be positive")
.hasExactlyArgs(SafeArg.of("maxCoarsePartitions", maxCoarsePartitions));
}

@Test
public void loadsLatestMaxCoarsePartitions() {
long startTimestamp = 0;
when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis());
clock.advance(TIME_GAP_BETWEEN_BUCKET_START_AND_END);
when(puncherStore.get(clock.millis())).thenReturn(startTimestamp);

maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose.update(1L);
freshTimestamp.set(SweepQueueUtils.TS_COARSE_GRANULARITY * 10 + startTimestamp);

OptionalLong maybeEndTimestamp = bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp);
assertThat(maybeEndTimestamp).hasValue(MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE + startTimestamp);
assertThat(maybeEndTimestamp).hasValue(SweepQueueUtils.TS_COARSE_GRANULARITY + startTimestamp);

maxNumberOfCoarsePartitionsPerBucketForNonPuncherClose.update(6L);
OptionalLong maybeEndTimestamp2 = bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp);
assertThat(maybeEndTimestamp2).hasValue(6 * SweepQueueUtils.TS_COARSE_GRANULARITY + startTimestamp);
}

private static Collection<Arguments> puncherTimestampAndMaxCoarsePartitions() {
Set<Long> puncherTimestamps = Set.of(
98 * SweepQueueUtils.TS_COARSE_GRANULARITY,
100 * SweepQueueUtils.TS_COARSE_GRANULARITY,
100 * SweepQueueUtils.TS_COARSE_GRANULARITY + 1,
101 * SweepQueueUtils.TS_COARSE_GRANULARITY - 1);
Set<Long> maxCoarsePartitions = Set.of(1L, 2L, 500L);
return Sets.cartesianProduct(puncherTimestamps, maxCoarsePartitions)
.collect(pair -> Arguments.of(pair.getOne(), pair.getTwo()))
.toList();
}

// TODO(mdaudali): Extract this into its own class if we end up needing this elsewhere.
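The FakeClock used throughout these tests is not shown in the diff (and, per the TODO above, is a candidate for extraction). A minimal version consistent with the calls the tests make, clock.millis() and clock.advance(Duration), could look like the sketch below; the class body here is an assumption, not the PR's actual helper.

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Deterministic Clock for tests: time only moves when advance() is called.
final class FakeClock extends Clock {
    private Instant now = Instant.EPOCH;

    void advance(Duration duration) {
        now = now.plus(duration);
    }

    @Override
    public Instant instant() {
        return now; // Clock.millis() derives from instant(), so tests can use millis() directly
    }

    @Override
    public ZoneId getZone() {
        return ZoneOffset.UTC;
    }

    @Override
    public Clock withZone(ZoneId zone) {
        throw new UnsupportedOperationException("not needed in these tests");
    }
}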