Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[ASTS] Fix: ensure forward progress even if a single window does not …
Browse files Browse the repository at this point in the history
…have enough timestamps
  • Loading branch information
mdaudali committed Nov 12, 2024
1 parent 39c2be9 commit dbeb7ab
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,16 @@ public OptionalLong getBucketCloseTimestamp(long startTimestamp) {
}

long finalLogicalTimestamp = puncherStore.get(closeWallClockTimeMillis);
long clampedTimestamp = clampTimestampToCoarsePartitionBoundary(finalLogicalTimestamp);

// if this case happens, it's possibly clockdrift or a delayed write.
if (finalLogicalTimestamp <= startTimestamp) {
// if this case happens, it's possibly clockdrift or a delayed write, or we just make too slow progress
// TODO(mdaudali): Re-evaluate if the lower bound of 10_000_000 timestamps per 10 mins makes sense.
// This has the following interesting case: Suppose that the assigner stopped for a day. Then, suppose that the
// within the following 10 minute period from where the assigner currently is looking at, there is _not_ 10 mil
// timestamps. Then, instead of choosing a 10 million sized block, we have a giant block up to the fresh Ts.
// This is capped at 5 billion, and we were okay with this case when there's clock drift, but we should
// evaluate.
if (clampedTimestamp - startTimestamp < MIN_BUCKET_SIZE) {
long freshClampedTimestamp = clampTimestampToCoarsePartitionBoundary(freshTimestampSupplier.get());
if (freshClampedTimestamp - startTimestamp < MIN_BUCKET_SIZE) {
logNonPuncherClose(
Expand Down Expand Up @@ -127,16 +134,6 @@ public OptionalLong getBucketCloseTimestamp(long startTimestamp) {
}
return OptionalLong.of(cappedTimestamp);
} else {
long clampedTimestamp = clampTimestampToCoarsePartitionBoundary(finalLogicalTimestamp);
if (clampedTimestamp - startTimestamp < MIN_BUCKET_SIZE) {
log.info(
"The proposed final timestamp {} is not sufficiently far from start timestamp {} (minimum"
+ " size: {}) to close the bucket",
SafeArg.of("proposedClampedTimestamp", clampedTimestamp),
SafeArg.of("startTimestamp", startTimestamp),
SafeArg.of("minimumSize", MIN_BUCKET_SIZE));
return OptionalLong.empty();
}
return OptionalLong.of(clampedTimestamp);
}
}
Expand All @@ -162,10 +159,10 @@ private void logNonPuncherClose(
.addAll(Arrays.asList(additionalArgs))
.build();
log.info(
"Read a logical timestamp {} from the puncher store that's less than or equal to the start timestamp"
+ " {}, despite requesting a time {} after the start timestamp's associated wall clock time {}."
+ " This is likely due to some form of clock drift, but should not be happening repeatedly. We"
+ " read a fresh timestamp that has been clamped down to the nearest coarse"
"Read a logical timestamp {} from the puncher store that's not sufficiently far from the start"
+ " timestamp {}, despite requesting a time {} after the start timestamp's associated wall"
+ " clock time {}. This is likely due to some form of clock drift, but should not be happening"
+ " repeatedly. We read a fresh timestamp that has been clamped down to the nearest coarse"
+ " partition boundary {}, " + logMessageSuffix,
args);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,6 @@ public void throwsIfStartTimestampNotOnCoarsePartitionBoundary() {
.hasExactlyArgs(SafeArg.of("startTimestamp", startTimestamp));
}

@Test
public void returnsEmptyIfSufficientTimeHasNotPassedSinceStartTimestamp() {
long startTimestamp = 18 * SweepQueueUtils.TS_COARSE_GRANULARITY; // Arbitrarily chosen.
when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis());
OptionalLong maybeEndTimestamp = bucketCloseTimestampCalculator.getBucketCloseTimestamp(startTimestamp);
assertThat(maybeEndTimestamp).isEmpty();
}

@Test
public void
returnsLogicalTimestampSufficientTimeAfterStartTimestampIfTenMinutesHasPassedAndLogicalTimestampAheadOfStart() {
Expand All @@ -90,13 +82,16 @@ public void returnsEmptyIfSufficientTimeHasNotPassedSinceStartTimestamp() {

@ParameterizedTest
@ValueSource(
longs = {2300 * SweepQueueUtils.TS_COARSE_GRANULARITY, 2315 * SweepQueueUtils.TS_COARSE_GRANULARITY
}) // less than, and equal to.
// This is to test what happens when the puncher store returns a timestamp less than (or equal to) the start
// timestamp
// In both of these cases, we should not use the punch table result, but instead fallback to the relevant algorithm.
longs = {
2300 * SweepQueueUtils.TS_COARSE_GRANULARITY,
2315 * SweepQueueUtils.TS_COARSE_GRANULARITY,
2315 * SweepQueueUtils.TS_COARSE_GRANULARITY + 1
}) // less than, equal to, and insufficiently greater than
// This is to test what happens when the puncher store returns a timestamp less than / equal / insufficiently
// greater than the start timestamp
// In all of these cases, we should not use the punch table result, but instead fallback to the relevant algorithm.
public void
returnsEmptyIfSufficientTimeHasPassedPuncherTimestampBeforeStartAndLatestFreshTimestampNotFarEnoughAhead(
returnsEmptyIfSufficientTimeHasPassedPuncherTimestampInsufficientlyFarAndLatestFreshTimestampNotFarEnoughAhead(
long puncherTimestamp) {
long startTimestamp = 2315 * SweepQueueUtils.TS_COARSE_GRANULARITY;
when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis());
Expand All @@ -110,9 +105,14 @@ public void returnsEmptyIfSufficientTimeHasNotPassedSinceStartTimestamp() {
}

@ParameterizedTest
@ValueSource(longs = {123 * SweepQueueUtils.TS_COARSE_GRANULARITY, 312 * SweepQueueUtils.TS_COARSE_GRANULARITY})
@ValueSource(
longs = {
123 * SweepQueueUtils.TS_COARSE_GRANULARITY,
312 * SweepQueueUtils.TS_COARSE_GRANULARITY,
312 * SweepQueueUtils.TS_COARSE_GRANULARITY + 1
})
public void
returnsLatestClampedFreshTimestampIfSufficientTimeHasPassedPuncherTimestampBeforeStartAndCalculatedTimestampFarEnoughAhead(
returnsLatestClampedFreshTimestampIfSufficientTimeHasPassedPuncherTimestampInsufficientlyFarAndCalculatedTimestampFarEnoughAhead(
long puncherTimestamp) {
long startTimestamp = 312 * SweepQueueUtils.TS_COARSE_GRANULARITY;
when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis());
Expand All @@ -126,8 +126,13 @@ public void returnsEmptyIfSufficientTimeHasNotPassedSinceStartTimestamp() {
}

@ParameterizedTest
@ValueSource(longs = {98 * SweepQueueUtils.TS_COARSE_GRANULARITY, 100 * SweepQueueUtils.TS_COARSE_GRANULARITY})
public void returnsClampedAndCappedTimestampIfPuncherTimestampBeforeStartAndLatestFreshTimestampIsTooFarAhead(
@ValueSource(
longs = {
98 * SweepQueueUtils.TS_COARSE_GRANULARITY,
100 * SweepQueueUtils.TS_COARSE_GRANULARITY,
100 * SweepQueueUtils.TS_COARSE_GRANULARITY + 1
})
public void returnsClampedAndCappedTimestampIfPuncherTimestampInsufficientlyFarAndLatestFreshTimestampIsTooFarAhead(
long puncherTimestamp) {
long startTimestamp = 100 * SweepQueueUtils.TS_COARSE_GRANULARITY;
when(puncherStore.getMillisForTimestamp(startTimestamp)).thenReturn(clock.millis());
Expand Down

0 comments on commit dbeb7ab

Please sign in to comment.