Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[ASTS] clamp to boundaries (#7303)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdaudali authored Oct 7, 2024
1 parent f3dd18f commit 6bd37f7
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.CompileTimeConstant;
import com.palantir.atlasdb.cleaner.PuncherStore;
import com.palantir.atlasdb.sweep.queue.SweepQueueUtils;
import com.palantir.lock.client.TimeLockClient;
import com.palantir.logsafe.Arg;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
Expand All @@ -40,10 +42,10 @@ final class DefaultBucketCloseTimestampCalculator {
static final Duration TIME_GAP_BETWEEN_BUCKET_START_AND_END = Duration.ofMinutes(10);

@VisibleForTesting
static final long MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE = 5_000_000_000L;
static final long MIN_BUCKET_SIZE = SweepQueueUtils.TS_COARSE_GRANULARITY;

@VisibleForTesting
static final long MIN_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE = 50_000L;
static final long MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE = 500 * MIN_BUCKET_SIZE; // 5 billion

private final PuncherStore puncherStore;
private final Supplier<Long> freshTimestampSupplier;
Expand Down Expand Up @@ -77,6 +79,10 @@ public static DefaultBucketCloseTimestampCalculator create(
// We’re explicitly not doing this algorithm now given the added complexity, but this may be implemented if the
// fixed parameters are too coarse.
public OptionalLong getBucketCloseTimestamp(long startTimestamp) {
Preconditions.checkArgument(
startTimestamp == clampTimestampToCoarsePartitionBoundary(startTimestamp),
"startTimestamp must be on a coarse partition boundary",
SafeArg.of("startTimestamp", startTimestamp));
long openWallClockTimeMillis = puncherStore.getMillisForTimestamp(startTimestamp);
long closeWallClockTimeMillis = openWallClockTimeMillis + TIME_GAP_BETWEEN_BUCKET_START_AND_END.toMillis();

Expand All @@ -88,61 +94,79 @@ public OptionalLong getBucketCloseTimestamp(long startTimestamp) {

// if this case happens, it's possibly clockdrift or a delayed write.
if (finalLogicalTimestamp <= startTimestamp) {
long freshTimestamp = freshTimestampSupplier.get();
if (freshTimestamp - startTimestamp < MIN_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE) {
long freshClampedTimestamp = clampTimestampToCoarsePartitionBoundary(freshTimestampSupplier.get());
if (freshClampedTimestamp - startTimestamp < MIN_BUCKET_SIZE) {
logNonPuncherClose(
"but this is not sufficiently far from the start timestamp to close the bucket.",
finalLogicalTimestamp,
startTimestamp,
openWallClockTimeMillis,
freshTimestamp);
freshClampedTimestamp);
return OptionalLong.empty();
}

long cappedTimestamp = Math.min(startTimestamp + MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE, freshTimestamp);
if (cappedTimestamp != freshTimestamp) {
// start timestamp is aligned on a coarse partition
// MAX_BUCKET_SIZE is also a multiple of a coarse partition
// so excluding overflow, start + MAX will also be aligned with a coarse partition.
long cappedTimestamp =
Math.min(startTimestamp + MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE, freshClampedTimestamp);
if (cappedTimestamp != freshClampedTimestamp) {
logNonPuncherClose(
"but this is too far from the start timestamp. Proposing a capped timestamp {} instead.",
finalLogicalTimestamp,
startTimestamp,
openWallClockTimeMillis,
freshTimestamp,
freshClampedTimestamp,
SafeArg.of("cappedTimestamp", cappedTimestamp));
} else {
logNonPuncherClose(
"and this is sufficiently far from the start timestamp to close the bucket.",
finalLogicalTimestamp,
startTimestamp,
openWallClockTimeMillis,
freshTimestamp);
freshClampedTimestamp);
}
return OptionalLong.of(cappedTimestamp);
} else {
return OptionalLong.of(finalLogicalTimestamp);
long clampedTimestamp = clampTimestampToCoarsePartitionBoundary(finalLogicalTimestamp);
if (clampedTimestamp - startTimestamp < MIN_BUCKET_SIZE) {
log.info(
"The proposed final timestamp {} is not sufficiently far from start timestamp {} (minimum"
+ " size: {}) to close the bucket",
SafeArg.of("proposedClampedTimestamp", clampedTimestamp),
SafeArg.of("startTimestamp", startTimestamp),
SafeArg.of("minimumSize", MIN_BUCKET_SIZE));
return OptionalLong.empty();
}
return OptionalLong.of(clampedTimestamp);
}
}

// Clamps the given timestamp DOWN to the start of its coarse sweep partition, i.e. the largest
// coarse-partition boundary <= timestamp (so a timestamp already on a boundary maps to itself,
// which is what the precondition check in getBucketCloseTimestamp relies on).
private long clampTimestampToCoarsePartitionBoundary(long timestamp) {
return SweepQueueUtils.minTsForCoarsePartition(SweepQueueUtils.tsPartitionCoarse(timestamp));
}

private void logNonPuncherClose(
@CompileTimeConstant String logMessageSuffix,
long finalTimestampFromPunchTable,
long startTimestamp,
long openWallClockTimeMillis,
long freshTimestamp,
long freshClampedTimestamp,
Arg<?>... additionalArgs) {
List<Arg<?>> args = ImmutableList.<Arg<?>>builder()
.add(
SafeArg.of("finalTimestampFromPunchTable", finalTimestampFromPunchTable),
SafeArg.of("startTimestamp", startTimestamp),
SafeArg.of("timeGap", TIME_GAP_BETWEEN_BUCKET_START_AND_END),
SafeArg.of("openWallClockTimeMillis", openWallClockTimeMillis),
SafeArg.of("freshTimestamp", freshTimestamp))
SafeArg.of("freshClampedTimestamp", freshClampedTimestamp))
.addAll(Arrays.asList(additionalArgs))
.build();
log.info(
"Read a logical timestamp {} from the puncher store that's less than or equal to the start timestamp"
+ " {}, despite requesting a time {} after the start timestamp's associated wall clock time {}."
+ " This is likely due to some form of clock drift, but should not be happening repeatedly. We"
+ " read a fresh timestamp {}, " + logMessageSuffix,
+ " read a fresh timestamp that has been clamped down to the nearest coarse"
+ " partition boundary {}, " + logMessageSuffix,
args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
package com.palantir.atlasdb.sweep.asts.bucketingthings;

import static com.palantir.atlasdb.sweep.asts.bucketingthings.DefaultBucketCloseTimestampCalculator.MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE;
import static com.palantir.atlasdb.sweep.asts.bucketingthings.DefaultBucketCloseTimestampCalculator.MIN_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE;
import static com.palantir.atlasdb.sweep.asts.bucketingthings.DefaultBucketCloseTimestampCalculator.MIN_BUCKET_SIZE;
import static com.palantir.atlasdb.sweep.asts.bucketingthings.DefaultBucketCloseTimestampCalculator.TIME_GAP_BETWEEN_BUCKET_START_AND_END;
import static com.palantir.logsafe.testing.Assertions.assertThatLoggableExceptionThrownBy;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.Mockito.when;

import com.palantir.atlasdb.cleaner.PuncherStore;
import com.palantir.atlasdb.sweep.queue.SweepQueueUtils;
import com.palantir.logsafe.SafeArg;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -54,9 +57,18 @@ public void setup() {
new DefaultBucketCloseTimestampCalculator(puncherStore, freshTimestamp::get, clock);
}

@Test
public void throwsIfStartTimestampNotOnCoarsePartitionBoundary() {
// 18 is not a multiple of SweepQueueUtils.TS_COARSE_GRANULARITY, so the calculator's
// precondition (startTimestamp must lie exactly on a coarse partition boundary) must fail
// with a loggable exception carrying the offending timestamp as a safe arg.
long startTimestamp = 18; // Arbitrarily chosen.
assertThatLoggableExceptionThrownBy(
() -> bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp))
.hasLogMessage("startTimestamp must be on a coarse partition boundary")
.hasExactlyArgs(SafeArg.of("startTimestamp", startTimestamp));
}

@Test
public void returnsEmptyIfSufficientTimeHasNotPassedSinceStartTimestamp() {
int startTimestamp = 18; // Arbitrarily chosen.
long startTimestamp = 18 * SweepQueueUtils.TS_COARSE_GRANULARITY; // Arbitrarily chosen.
when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis());
OptionalLong maybeEndTimestamp = bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp);
assertThat(maybeEndTimestamp).isEmpty();
Expand All @@ -65,63 +77,67 @@ public void returnsEmptyIfSufficientTimeHasNotPassedSinceStartTimestamp() {
@Test
public void
returnsLogicalTimestampSufficientTimeAfterStartTimestampIfTenMinutesHasPassedAndLogicalTimestampAheadOfStart() {
int startTimestamp = 123;
long startTimestamp = 123 * SweepQueueUtils.TS_COARSE_GRANULARITY;
when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis());
clock.advance(TIME_GAP_BETWEEN_BUCKET_START_AND_END);
when(puncherStore.get(clock.millis())).thenReturn(153L);

long punchStoreTimestamp = 2 * SweepQueueUtils.TS_COARSE_GRANULARITY + startTimestamp;
when(puncherStore.get(clock.millis())).thenReturn(punchStoreTimestamp);

OptionalLong maybeEndTimestamp = bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp);
assertThat(maybeEndTimestamp).hasValue(153L);
assertThat(maybeEndTimestamp).hasValue(punchStoreTimestamp);
}

@ParameterizedTest
@ValueSource(longs = {2300, 2315}) // less than, and equal to.
@ValueSource(
longs = {2300 * SweepQueueUtils.TS_COARSE_GRANULARITY, 2315 * SweepQueueUtils.TS_COARSE_GRANULARITY
}) // less than, and equal to.
// This is to test what happens when the puncher store returns a timestamp less than (or equal to) the start
// timestamp
// In both of these cases, we should not use the punch table result, but instead fallback to the relevant algorithm.
public void
returnsEmptyIfSufficientTimeHasPassedPuncherTimestampBeforeStartAndLatestFreshTimestampNotFarEnoughAhead(
long puncherTimestamp) {
int startTimestamp = 2315;
long startTimestamp = 2315 * SweepQueueUtils.TS_COARSE_GRANULARITY;
when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis());
clock.advance(TIME_GAP_BETWEEN_BUCKET_START_AND_END);
when(puncherStore.get(clock.millis())).thenReturn(puncherTimestamp);

freshTimestamp.set(MIN_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE - 1 + startTimestamp);
freshTimestamp.set(MIN_BUCKET_SIZE - 1 + startTimestamp);

OptionalLong maybeEndTimestamp = bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp);
assertThat(maybeEndTimestamp).isEmpty();
}

@ParameterizedTest
@ValueSource(longs = {123, 35124})
@ValueSource(longs = {123 * SweepQueueUtils.TS_COARSE_GRANULARITY, 312 * SweepQueueUtils.TS_COARSE_GRANULARITY})
public void
returnsLatestFreshTimestampIfSufficientTimeHasPassedPuncherTimestampBeforeStartAndCalculatedTimestampFarEnoughAhead(
returnsLatestClampedFreshTimestampIfSufficientTimeHasPassedPuncherTimestampBeforeStartAndCalculatedTimestampFarEnoughAhead(
long puncherTimestamp) {
int startTimestamp = 35124;
long startTimestamp = 312 * SweepQueueUtils.TS_COARSE_GRANULARITY;
when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis());
clock.advance(TIME_GAP_BETWEEN_BUCKET_START_AND_END);
when(puncherStore.get(clock.millis())).thenReturn(puncherTimestamp);

freshTimestamp.set(MIN_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE + 1 + startTimestamp);
freshTimestamp.set(11 * SweepQueueUtils.TS_COARSE_GRANULARITY + startTimestamp);

OptionalLong maybeEndTimestamp = bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp);
assertThat(maybeEndTimestamp).hasValue(freshTimestamp.get());
}

@ParameterizedTest
@ValueSource(longs = {98, 100})
public void returnsCappedTimestampIfPuncherTimestampBeforeStartAndLatestFreshTimestampIsTooFarAhead(
@ValueSource(longs = {98 * SweepQueueUtils.TS_COARSE_GRANULARITY, 100 * SweepQueueUtils.TS_COARSE_GRANULARITY})
public void returnsClampedAndCappedTimestampIfPuncherTimestampBeforeStartAndLatestFreshTimestampIsTooFarAhead(
long puncherTimestamp) {
int startTimestamp = 100;
long startTimestamp = 100 * SweepQueueUtils.TS_COARSE_GRANULARITY;
when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis());
clock.advance(TIME_GAP_BETWEEN_BUCKET_START_AND_END);
when(puncherStore.get(clock.millis())).thenReturn(puncherTimestamp);

freshTimestamp.set(2 * MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE);
freshTimestamp.set(2 * MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE + startTimestamp);

OptionalLong maybeEndTimestamp = bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp);
assertThat(maybeEndTimestamp).hasValue(startTimestamp + MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE);
assertThat(maybeEndTimestamp).hasValue(MAX_BUCKET_SIZE_FOR_NON_PUNCHER_CLOSE + startTimestamp);
}

// TODO(mdaudali): Extract this into its own class if we end up needing this elsewhere.
Expand Down

0 comments on commit 6bd37f7

Please sign in to comment.