From aeaa0f48c6c1f40d54f134f67eddd5ec50908ad7 Mon Sep 17 00:00:00 2001 From: Mohammed Daudali Date: Tue, 5 Nov 2024 11:22:21 +0000 Subject: [PATCH] [ASTS] Fix: ensure forward progress even if a single window does not have enough timestamps --- ...DefaultBucketCloseTimestampCalculator.java | 29 +++++++-------- ...ultBucketCloseTimestampCalculatorTest.java | 35 +++++++++++-------- 2 files changed, 33 insertions(+), 31 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultBucketCloseTimestampCalculator.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultBucketCloseTimestampCalculator.java index 8cc57cfbc3..c41f1e5d18 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultBucketCloseTimestampCalculator.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultBucketCloseTimestampCalculator.java @@ -91,9 +91,16 @@ public OptionalLong getBucketCloseTimestamp(long startTimestamp) { } long finalLogicalTimestamp = puncherStore.get(closeWallClockTimeMillis); + long clampedTimestamp = clampTimestampToCoarsePartitionBoundary(finalLogicalTimestamp); - // if this case happens, it's possibly clockdrift or a delayed write. - if (finalLogicalTimestamp <= startTimestamp) { + // if this case happens, it's possibly clockdrift or a delayed write, or we just make too slow progress + // TODO(mdaudali): Re-evaluate if the lower bound of 10_000_000 timestamps per 10 mins makes sense. + // This has the following interesting case: Suppose that the assigner stopped for a day. Then, suppose that the + // within the following 10 minute period from where the assigner currently is looking at, there is _not_ 10 mil + // timestamps. Then, instead of choosing a 10 million sized block, we have a giant block up to the fresh Ts. + // This is capped at 5 billion, and we were okay with this case when there's clock drift, but we should + // evaluate. + if (clampedTimestamp - startTimestamp < MIN_BUCKET_SIZE) { long freshClampedTimestamp = clampTimestampToCoarsePartitionBoundary(freshTimestampSupplier.get()); if (freshClampedTimestamp - startTimestamp < MIN_BUCKET_SIZE) { logNonPuncherClose( @@ -127,16 +134,6 @@ public OptionalLong getBucketCloseTimestamp(long startTimestamp) { } return OptionalLong.of(cappedTimestamp); } else { - long clampedTimestamp = clampTimestampToCoarsePartitionBoundary(finalLogicalTimestamp); - if (clampedTimestamp - startTimestamp < MIN_BUCKET_SIZE) { - log.info( - "The proposed final timestamp {} is not sufficiently far from start timestamp {} (minimum" - + " size: {}) to close the bucket", - SafeArg.of("proposedClampedTimestamp", clampedTimestamp), - SafeArg.of("startTimestamp", startTimestamp), - SafeArg.of("minimumSize", MIN_BUCKET_SIZE)); - return OptionalLong.empty(); - } return OptionalLong.of(clampedTimestamp); } } @@ -162,10 +159,10 @@ private void logNonPuncherClose( .addAll(Arrays.asList(additionalArgs)) .build(); log.info( - "Read a logical timestamp {} from the puncher store that's less than or equal to the start timestamp" - + " {}, despite requesting a time {} after the start timestamp's associated wall clock time {}." - + " This is likely due to some form of clock drift, but should not be happening repeatedly. We" - + " read a fresh timestamp that has been clamped down to the nearest coarse" + "Read a logical timestamp {} from the puncher store that's not sufficiently far from the start" + + " timestamp {}, despite requesting a time {} after the start timestamp's associated wall" + + " clock time {}. This is likely due to some form of clock drift, but should not be happening" + + " repeatedly. We read a fresh timestamp that has been clamped down to the nearest coarse" + " partition boundary {}, " + logMessageSuffix, args); } diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultBucketCloseTimestampCalculatorTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultBucketCloseTimestampCalculatorTest.java index 8cf5d5a9d9..8db886d7a1 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultBucketCloseTimestampCalculatorTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultBucketCloseTimestampCalculatorTest.java @@ -66,14 +66,6 @@ public void throwsIfStartTimestampNotOnCoarsePartitionBoundary() { .hasExactlyArgs(SafeArg.of("startTimestamp", startTimestamp)); } - @Test - public void returnsEmptyIfSufficientTimeHasNotPassedSinceStartTimestamp() { - long startTimestamp = 18 * SweepQueueUtils.TS_COARSE_GRANULARITY; // Arbitrarily chosen. - when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis()); - OptionalLong maybeEndTimestamp = bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp); - assertThat(maybeEndTimestamp).isEmpty(); - } - @Test public void returnsLogicalTimestampSufficientTimeAfterStartTimestampIfTenMinutesHasPassedAndLogicalTimestampAheadOfStart() { @@ -90,11 +82,14 @@ public void returnsEmptyIfSufficientTimeHasNotPassedSinceStartTimestamp() { @ParameterizedTest @ValueSource( - longs = {2300 * SweepQueueUtils.TS_COARSE_GRANULARITY, 2315 * SweepQueueUtils.TS_COARSE_GRANULARITY - }) // less than, and equal to. - // This is to test what happens when the puncher store returns a timestamp less than (or equal to) the start - // timestamp - // In both of these cases, we should not use the punch table result, but instead fallback to the relevant algorithm. + longs = { + 2300 * SweepQueueUtils.TS_COARSE_GRANULARITY, + 2315 * SweepQueueUtils.TS_COARSE_GRANULARITY, + 2315 * SweepQueueUtils.TS_COARSE_GRANULARITY + 1 + }) // less than, equal to, and insufficiently greater than + // This is to test what happens when the puncher store returns a timestamp less than / equal / insufficiently + // greater than the start timestamp + // In all of these cases, we should not use the punch table result, but instead fallback to the relevant algorithm. public void returnsEmptyIfSufficientTimeHasPassedPuncherTimestampBeforeStartAndLatestFreshTimestampNotFarEnoughAhead( long puncherTimestamp) { @@ -110,7 +105,12 @@ public void returnsEmptyIfSufficientTimeHasNotPassedSinceStartTimestamp() { } @ParameterizedTest - @ValueSource(longs = {123 * SweepQueueUtils.TS_COARSE_GRANULARITY, 312 * SweepQueueUtils.TS_COARSE_GRANULARITY}) + @ValueSource( + longs = { + 123 * SweepQueueUtils.TS_COARSE_GRANULARITY, + 312 * SweepQueueUtils.TS_COARSE_GRANULARITY, + 312 * SweepQueueUtils.TS_COARSE_GRANULARITY + 1 + }) public void returnsLatestClampedFreshTimestampIfSufficientTimeHasPassedPuncherTimestampBeforeStartAndCalculatedTimestampFarEnoughAhead( long puncherTimestamp) { @@ -126,7 +126,12 @@ public void returnsEmptyIfSufficientTimeHasNotPassedSinceStartTimestamp() { } @ParameterizedTest - @ValueSource(longs = {98 * SweepQueueUtils.TS_COARSE_GRANULARITY, 100 * SweepQueueUtils.TS_COARSE_GRANULARITY}) + @ValueSource( + longs = { + 98 * SweepQueueUtils.TS_COARSE_GRANULARITY, + 100 * SweepQueueUtils.TS_COARSE_GRANULARITY, + 100 * SweepQueueUtils.TS_COARSE_GRANULARITY + 1 + }) public void returnsClampedAndCappedTimestampIfPuncherTimestampBeforeStartAndLatestFreshTimestampIsTooFarAhead( long puncherTimestamp) { long startTimestamp = 100 * SweepQueueUtils.TS_COARSE_GRANULARITY;