From 26515f997bf61723bf939ba41f53977068b11c03 Mon Sep 17 00:00:00 2001 From: Mohammed Daudali Date: Tue, 12 Nov 2024 14:52:58 +0000 Subject: [PATCH 1/3] [ASTS] Fix: Update getTimestampRangeRecord to return empty when record not present --- .../asts/DefaultShardProgressUpdater.java | 20 +++++++++---------- .../DefaultSweepAssignedBucketStore.java | 6 ++---- .../SweepBucketRecordsTable.java | 7 ++++--- .../asts/DefaultShardProgressUpdaterTest.java | 20 +++++++++---------- ...ctDefaultSweepAssignedBucketStoreTest.java | 15 +++++--------- 5 files changed, 29 insertions(+), 39 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdater.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdater.java index cc078753f3..959248862f 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdater.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdater.java @@ -29,7 +29,6 @@ import com.palantir.logsafe.exceptions.SafeIllegalStateException; import com.palantir.logsafe.logger.SafeLogger; import com.palantir.logsafe.logger.SafeLoggerFactory; -import java.util.NoSuchElementException; import java.util.Optional; import java.util.Set; import org.immutables.value.Value; @@ -120,17 +119,16 @@ private BucketProbeResult findCompletedBuckets(ShardAndStrategy shardAndStrategy throw new SafeIllegalStateException("Didn't expect to get here"); } + // TODO(mdaudali): This method is still incorrect (a record does not exist for an open bucket, not just pre-init + // bucket 0). A follow up PR will address this. private TimestampRange getTimestampRangeRecord(long queriedBucket) { - try { - return recordsTable.getTimestampRangeRecord(queriedBucket); - } catch (NoSuchElementException exception) { - throw new SafeIllegalStateException( - "Timestamp range record not found. 
If this has happened for bucket 0, this is possible when" - + " autoscaling sweep is initializing itself. Otherwise, this is potentially indicative of a" - + " bug in auto-scaling sweep. In either case, we will retry.", - exception, - SafeArg.of("queriedBucket", queriedBucket)); - } + return recordsTable + .getTimestampRangeRecord(queriedBucket) + .orElseThrow(() -> new SafeIllegalStateException( + "Timestamp range record not found. If this has happened for bucket 0, this is possible when" + + " autoscaling sweep is initializing itself. Otherwise, this is potentially indicative of" + + " a bug in auto-scaling sweep. In either case, we will retry.", + SafeArg.of("queriedBucket", queriedBucket))); } private long getStrictUpperBoundForSweptBuckets(ShardAndStrategy shardAndStrategy) { diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultSweepAssignedBucketStore.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultSweepAssignedBucketStore.java index 3b494b5045..b8201daa01 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultSweepAssignedBucketStore.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultSweepAssignedBucketStore.java @@ -37,7 +37,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Optional; import java.util.Set; import java.util.function.Function; @@ -239,10 +238,9 @@ public void deleteBucketEntry(Bucket bucket) { } @Override - public TimestampRange getTimestampRangeRecord(long bucketIdentifier) { + public Optional getTimestampRangeRecord(long bucketIdentifier) { Cell cell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketRecordsCell(bucketIdentifier); - return readCell(cell, timestampRangePersister::tryDeserialize) - .orElseThrow(() -> new NoSuchElementException("No timestamp 
range record found for bucket identifier")); + return readCell(cell, timestampRangePersister::tryDeserialize); } @Override diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketRecordsTable.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketRecordsTable.java index 255eb1018e..1f5a6320f8 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketRecordsTable.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketRecordsTable.java @@ -17,13 +17,14 @@ package com.palantir.atlasdb.sweep.asts.bucketingthings; import com.palantir.atlasdb.sweep.asts.TimestampRange; +import java.util.Optional; public interface SweepBucketRecordsTable { /** - * Returns the {@link TimestampRange} for the given bucket identifier, throwing a - * {@link java.util.NoSuchElementException} if one is not present. + * Returns a {@link TimestampRange} for the given bucket identifier, if one exists. Iff a bucket is closed, then + * the corresponding record will be present. (If the bucket is open, no record will be present.) 
*/ - TimestampRange getTimestampRangeRecord(long bucketIdentifier); + Optional getTimestampRangeRecord(long bucketIdentifier); void putTimestampRangeRecord(long bucketIdentifier, TimestampRange timestampRange); diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdaterTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdaterTest.java index 379b67370d..f82e4dd58f 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdaterTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdaterTest.java @@ -36,7 +36,6 @@ import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.exceptions.SafeIllegalStateException; import java.util.List; -import java.util.NoSuchElementException; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -74,19 +73,17 @@ public void setUp() { @ParameterizedTest @MethodSource("buckets") - public void wrapsAndRethrowsExceptionOnAbsenceOfTimestampRangeRecords(Bucket bucket) { + public void throwsExceptionOnAbsenceOfTimestampRangeRecords(Bucket bucket) { when(sweepBucketPointerTable.getStartingBucketsForShards(ImmutableSet.of(bucket.shardAndStrategy()))) .thenReturn(ImmutableSet.of(bucket)); - NoSuchElementException underlyingException = new NoSuchElementException(); - when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())).thenThrow(underlyingException); + when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())).thenReturn(Optional.empty()); assertThatLoggableExceptionThrownBy(() -> shardProgressUpdater.updateProgress(bucket.shardAndStrategy())) .isInstanceOf(SafeIllegalStateException.class) .hasLogMessage("Timestamp range record not found. If this has happened for bucket 0, this is possible" + " when autoscaling sweep is initializing itself. 
Otherwise, this is potentially indicative of" + " a bug in auto-scaling sweep. In either case, we will retry.") - .hasExactlyArgs(SafeArg.of("queriedBucket", bucket.bucketIdentifier())) - .hasCause(underlyingException); + .hasExactlyArgs(SafeArg.of("queriedBucket", bucket.bucketIdentifier())); verify(sweepBucketPointerTable, never()).updateStartingBucketForShardAndStrategy(bucket); verify(sweepQueueProgressUpdater, never()).progressTo(eq(bucket.shardAndStrategy()), anyLong()); @@ -100,8 +97,8 @@ public void doesNotUpdateProgressOnUnstartedBucket(Bucket bucket) { .thenReturn(ImmutableSet.of(bucket)); when(bucketProgressStore.getBucketProgress(bucket)).thenReturn(Optional.empty()); when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())) - .thenReturn(TimestampRange.of( - SweepQueueUtils.minTsForCoarsePartition(3), SweepQueueUtils.minTsForCoarsePartition(8))); + .thenReturn(Optional.of(TimestampRange.of( + SweepQueueUtils.minTsForCoarsePartition(3), SweepQueueUtils.minTsForCoarsePartition(8)))); shardProgressUpdater.updateProgress(bucket.shardAndStrategy()); @@ -120,7 +117,7 @@ public void updatesProgressOnStartedButNotCompletedBucket(SweepableBucket sweepa when(bucketProgressStore.getBucketProgress(bucket)) .thenReturn(Optional.of(BucketProgress.createForTimestampProgress(1_234_567L))); when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())) - .thenReturn(sweepableBucket.timestampRange()); + .thenReturn(Optional.of(sweepableBucket.timestampRange())); shardProgressUpdater.updateProgress(bucket.shardAndStrategy()); @@ -154,7 +151,8 @@ public void progressesPastOneOrMoreCompletedBucketsAndStopsCorrectly( TimestampRange finalBucketTimestampRange = TimestampRange.of( lastCompleteBucketTimestampRange.endExclusive(), lastCompleteBucketTimestampRange.endExclusive() + SweepQueueUtils.TS_COARSE_GRANULARITY); - when(recordsTable.getTimestampRangeRecord(finalBucketIdentifier)).thenReturn(finalBucketTimestampRange); + 
when(recordsTable.getTimestampRangeRecord(finalBucketIdentifier)) + .thenReturn(Optional.of(finalBucketTimestampRange)); shardProgressUpdater.updateProgress(firstRawBucket.shardAndStrategy()); @@ -204,7 +202,7 @@ private static List getSucceedingBuckets(SweepableBucket bucket private void setupBucketRecord(SweepableBucket sweepableBucket) { when(recordsTable.getTimestampRangeRecord(sweepableBucket.bucket().bucketIdentifier())) - .thenReturn(sweepableBucket.timestampRange()); + .thenReturn(Optional.of(sweepableBucket.timestampRange())); } static Stream buckets() { diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/AbstractDefaultSweepAssignedBucketStoreTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/AbstractDefaultSweepAssignedBucketStoreTest.java index 96a66abcc8..7e1cfa2247 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/AbstractDefaultSweepAssignedBucketStoreTest.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/AbstractDefaultSweepAssignedBucketStoreTest.java @@ -38,7 +38,6 @@ import com.palantir.logsafe.exceptions.SafeIllegalStateException; import java.util.HashSet; import java.util.List; -import java.util.NoSuchElementException; import java.util.Optional; import java.util.Set; import java.util.function.Function; @@ -293,17 +292,15 @@ public void deleteBucketEntryDeletesBucket() { } @Test - public void getTimestampRangeRecordThrowsIfRecordNotPresent() { - assertThatThrownBy(() -> store.getTimestampRangeRecord(1)) - .isInstanceOf(NoSuchElementException.class) - .hasMessage("No timestamp range record found for bucket identifier"); + public void getTimestampRangeRecordReturnsEmptyIfRecordDoesNotExist() { + assertThat(store.getTimestampRangeRecord(1)).isEmpty(); } @Test public void putTimestampRangeRecordPutsRecord() { TimestampRange timestampRange = TimestampRange.of(1, 2); 
store.putTimestampRangeRecord(1, timestampRange); - assertThat(store.getTimestampRangeRecord(1)).isEqualTo(timestampRange); + assertThat(store.getTimestampRangeRecord(1)).hasValue(timestampRange); } @Test @@ -323,11 +320,9 @@ public void deleteTimestampRangeRecordDoesNotThrowIfRecordNotPresent() { public void deleteTimestampRangeRecordDeletesRecord() { TimestampRange timestampRange = TimestampRange.of(1, 2); store.putTimestampRangeRecord(1, timestampRange); - assertThat(store.getTimestampRangeRecord(1)).isEqualTo(timestampRange); + assertThat(store.getTimestampRangeRecord(1)).hasValue(timestampRange); store.deleteTimestampRangeRecord(1); - assertThatThrownBy(() -> store.getTimestampRangeRecord(1)) - .isInstanceOf(NoSuchElementException.class) - .hasMessage("No timestamp range record found for bucket identifier"); + assertThat(store.getTimestampRangeRecord(1)).isEmpty(); } } From b9e71cab40f4327f5d4f77acc7da47dbbdb9c9cb Mon Sep 17 00:00:00 2001 From: Mohammed Daudali Date: Tue, 12 Nov 2024 16:06:57 +0000 Subject: [PATCH 2/3] [ASTS] Add getSweepableBucket method --- .../DefaultSweepAssignedBucketStore.java | 11 +++++++++- .../SweepAssignedBucketStoreKeyPersister.java | 5 ++--- .../bucketingthings/SweepBucketsTable.java | 2 ++ ...epAssignedBucketStoreKeyPersisterTest.java | 16 +++++--------- ...ctDefaultSweepAssignedBucketStoreTest.java | 21 ++++++++++++++++--- 5 files changed, 37 insertions(+), 18 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultSweepAssignedBucketStore.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultSweepAssignedBucketStore.java index b8201daa01..94e4b2ba84 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultSweepAssignedBucketStore.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultSweepAssignedBucketStore.java @@ -221,6 +221,15 @@ public 
Set getSweepableBuckets(Set startBuckets) { return readSweepableBucketRows(rows); } + @Override + public Optional getSweepableBucket(Bucket bucket) { + Cell cell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketsCell(bucket); + return readCell( + cell, + value -> SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue( + cell, value, timestampRangePersister)); + } + @Override public void putTimestampRangeForBucket( Bucket bucket, Optional oldTimestampRange, TimestampRange newTimestampRange) { @@ -288,7 +297,7 @@ private Set readSweepableBucketRows(List SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue( - entry.getKey(), entry.getValue(), timestampRangePersister)) + entry.getKey(), entry.getValue().getContents(), timestampRangePersister)) .collect(Collectors.toSet()); } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersister.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersister.java index 6be4a50478..9897caf7df 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersister.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersister.java @@ -17,7 +17,6 @@ package com.palantir.atlasdb.sweep.asts.bucketingthings; import com.palantir.atlasdb.keyvalue.api.Cell; -import com.palantir.atlasdb.keyvalue.api.Value; import com.palantir.atlasdb.schema.generated.SweepAssignedBucketsTable.SweepAssignedBucketsColumn; import com.palantir.atlasdb.schema.generated.SweepAssignedBucketsTable.SweepAssignedBucketsRow; import com.palantir.atlasdb.sweep.asts.Bucket; @@ -76,11 +75,11 @@ SweepAssignedBucketsRow nextSweepBucketsRow(Bucket bucket) { } SweepableBucket fromSweepBucketCellAndValue( - Cell cell, Value value, ObjectPersister 
timestampRangePersister) { + Cell cell, byte[] value, ObjectPersister timestampRangePersister) { SweepAssignedBucketsRow row = SweepAssignedBucketsRow.BYTES_HYDRATOR.hydrateFromBytes(cell.getRowName()); SweepAssignedBucketsColumn column = SweepAssignedBucketsColumn.BYTES_HYDRATOR.hydrateFromBytes(cell.getColumnName()); - TimestampRange timestampRange = timestampRangePersister.tryDeserialize(value.getContents()); + TimestampRange timestampRange = timestampRangePersister.tryDeserialize(value); int shard = Math.toIntExact(row.getShard()); // throws if invalid shard return SweepableBucket.of( Bucket.of( diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java index a86f28b019..bc6da52cd3 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java @@ -29,6 +29,8 @@ public interface SweepBucketsTable { */ Set getSweepableBuckets(Set startBuckets); + Optional getSweepableBucket(Bucket bucket); + void putTimestampRangeForBucket( Bucket bucket, Optional oldTimestampRange, TimestampRange newTimestampRange); diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersisterTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersisterTest.java index 74ae8d8414..4dc285d72c 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersisterTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersisterTest.java @@ -20,7 +20,6 @@ import com.google.common.io.BaseEncoding; import 
com.palantir.atlasdb.keyvalue.api.Cell; -import com.palantir.atlasdb.keyvalue.api.Value; import com.palantir.atlasdb.schema.generated.SweepAssignedBucketsTable.SweepAssignedBucketsRow; import com.palantir.atlasdb.sweep.asts.Bucket; import com.palantir.atlasdb.sweep.asts.SweepableBucket; @@ -59,14 +58,11 @@ public final class SweepAssignedBucketStoreKeyPersisterTest { Bucket.of(ShardAndStrategy.nonSweepable(), Long.MAX_VALUE), fromBase64("PpljSwhiGMuA/4FHrhR64UeuAg==", "hw==")); - private static final Map GOLDEN_TIMESTAMP_RANGES = Map.of( + private static final Map GOLDEN_TIMESTAMP_RANGES = Map.of( TimestampRange.of(912301923, Long.MAX_VALUE), - Value.create( - BaseEncoding.base64() - .decode("OikKBfqLZW5kRXhjbHVzaXZlJQN/f39/f39/f76Nc3RhcnRJbmNsdXNpdmUkDUwJe4b7"), - -1), + BaseEncoding.base64().decode("OikKBfqLZW5kRXhjbHVzaXZlJQN/f39/f39/f76Nc3RhcnRJbmNsdXNpdmUkDUwJe4b7"), TimestampRange.openBucket(13123), - Value.create(BaseEncoding.base64().decode("OikKBfqLZW5kRXhjbHVzaXZlwY1zdGFydEluY2x1c2l2ZSQDGob7"), -1)); + BaseEncoding.base64().decode("OikKBfqLZW5kRXhjbHVzaXZlwY1zdGFydEluY2x1c2l2ZSQDGob7")); private static final Cell GOLDEN_SWEEP_BUCKET_ASSIGNER_STATE_MACHINE_CELL = Cell.create( BaseEncoding.base64().decode("GTH5RX8BW6N/f/8="), @@ -127,9 +123,7 @@ public void nextSweepBucketsRowGetsNextRowByMajorBucketIdentifier(Bucket bucket) @ParameterizedTest @MethodSource("sweepableBuckets") public void canDeserializeCellsAndValuesBackToSweepableBucket(SweepableBucket sweepableBucket) { - Value value = Value.create( - TIMESTAMP_RANGE_PERSISTER.trySerialize(sweepableBucket.timestampRange()), - -1); // Timestamp does not matter + byte[] value = TIMESTAMP_RANGE_PERSISTER.trySerialize(sweepableBucket.timestampRange()); Cell cell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketsCell(sweepableBucket.bucket()); SweepableBucket deserialisedSweepableBucket = SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue( @@ -141,7 +135,7 @@ public void 
canDeserializeCellsAndValuesBackToSweepableBucket(SweepableBucket sw @ParameterizedTest @MethodSource("goldenSweepBucketCellsAndValues") public void canDeserializeHistoricCellsAndValuesBackToSweepableBucket( - SweepableBucket sweepableBucket, Cell cell, Value value) { + SweepableBucket sweepableBucket, Cell cell, byte[] value) { SweepableBucket deserialisedSweepableBucket = SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue( cell, value, TIMESTAMP_RANGE_PERSISTER); diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/AbstractDefaultSweepAssignedBucketStoreTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/AbstractDefaultSweepAssignedBucketStoreTest.java index 7e1cfa2247..958d1d462a 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/AbstractDefaultSweepAssignedBucketStoreTest.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/AbstractDefaultSweepAssignedBucketStoreTest.java @@ -255,6 +255,20 @@ public void getSweepableBucketsReturnsBucketsPerShardAndStrategy() { assertThat(store.getSweepableBuckets(new HashSet<>(startBuckets))).containsExactlyInAnyOrderElementsOf(buckets); } + @Test + public void getSweepableBucketReturnsSweepableBucketIfExists() { + Bucket bucket = Bucket.of(ShardAndStrategy.of(12, SweeperStrategy.THOROUGH), 21); + TimestampRange range = TimestampRange.of(1, 4); + store.putTimestampRangeForBucket(bucket, Optional.empty(), range); + assertThat(store.getSweepableBucket(bucket)).contains(SweepableBucket.of(bucket, range)); + } + + @Test + public void getSweepableBucketReturnsEmptyIfSweepableBucketDoesNotExist() { + Bucket bucket = Bucket.of(ShardAndStrategy.of(12, SweeperStrategy.THOROUGH), 21); + assertThat(store.getSweepableBucket(bucket)).isEmpty(); + } + @Test public void putTimestampRangeForBucketFailsIfOldTimestampRangeDoesNotMatchCurrent() { Bucket 
bucket = Bucket.of(ShardAndStrategy.of(12, SweeperStrategy.THOROUGH), 512); @@ -272,8 +286,7 @@ public void putTimestampRangeForBucketSucceedsIfOldTimestampRangeMatchesCurrent( TimestampRange newTimestampRange = TimestampRange.of(1, 2); store.putTimestampRangeForBucket(bucket, Optional.empty(), newTimestampRange); - Set sweepableBuckets = store.getSweepableBuckets(Set.of(bucket)); - assertThat(sweepableBuckets).containsExactly(SweepableBucket.of(bucket, newTimestampRange)); + assertThat(store.getSweepableBucket(bucket)).contains(SweepableBucket.of(bucket, newTimestampRange)); } @Test @@ -287,8 +300,10 @@ public void deleteBucketEntryDeletesBucket() { Bucket bucket = Bucket.of(ShardAndStrategy.of(12, SweeperStrategy.THOROUGH), 512); TimestampRange timestampRange = TimestampRange.of(1, 2); store.putTimestampRangeForBucket(bucket, Optional.empty(), timestampRange); + assertThat(store.getSweepableBucket(bucket)).hasValue(SweepableBucket.of(bucket, timestampRange)); + store.deleteBucketEntry(bucket); - assertThat(store.getSweepableBuckets(Set.of(bucket))).isEmpty(); + assertThat(store.getSweepableBucket(bucket)).isEmpty(); } @Test From 9b2e1988a63a15af8db236180a5241b56cd8a7e0 Mon Sep 17 00:00:00 2001 From: Mohammed Daudali Date: Tue, 12 Nov 2024 18:12:02 +0000 Subject: [PATCH 3/3] [ASTS] Background Shard Progress Updater now correctly calculates last swept progress --- .../asts/BucketBasedTargetedSweeper.java | 1 + .../asts/DefaultShardProgressUpdater.java | 148 +++++++++++----- .../asts/DefaultShardProgressUpdaterTest.java | 164 +++++++++++++++--- 3 files changed, 241 insertions(+), 72 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/BucketBasedTargetedSweeper.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/BucketBasedTargetedSweeper.java index 0bb0af2c25..14f8a5bee6 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/BucketBasedTargetedSweeper.java +++ 
b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/BucketBasedTargetedSweeper.java @@ -149,6 +149,7 @@ private static ShardProgressUpdater createShardProgressUpdater( bucketProgressStore, new SweepQueueProgressUpdater(cleaner), sweepAssignedBucketStore, + sweepAssignedBucketStore, sweepAssignedBucketStore); } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdater.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdater.java index 959248862f..1f5ad9cdcd 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdater.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdater.java @@ -21,12 +21,14 @@ import com.google.common.collect.Iterables; import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketPointerTable; import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketRecordsTable; +import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketsTable; import com.palantir.atlasdb.sweep.asts.progress.BucketProgress; import com.palantir.atlasdb.sweep.asts.progress.BucketProgressStore; import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; import com.palantir.atlasdb.sweep.queue.SweepQueueProgressUpdater; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.exceptions.SafeIllegalStateException; +import com.palantir.logsafe.exceptions.SafeRuntimeException; import com.palantir.logsafe.logger.SafeLogger; import com.palantir.logsafe.logger.SafeLoggerFactory; import java.util.Optional; @@ -42,16 +44,19 @@ public class DefaultShardProgressUpdater implements ShardProgressUpdater { private final BucketProgressStore bucketProgressStore; private final SweepQueueProgressUpdater sweepQueueProgressUpdater; private final SweepBucketRecordsTable recordsTable; + private final SweepBucketsTable sweepBucketsTable; private final SweepBucketPointerTable 
sweepBucketPointerTable; public DefaultShardProgressUpdater( BucketProgressStore bucketProgressStore, SweepQueueProgressUpdater sweepQueueProgressUpdater, SweepBucketRecordsTable recordsTable, + SweepBucketsTable sweepBucketsTable, SweepBucketPointerTable sweepBucketPointerTable) { this.bucketProgressStore = bucketProgressStore; this.sweepQueueProgressUpdater = sweepQueueProgressUpdater; this.recordsTable = recordsTable; + this.sweepBucketsTable = sweepBucketsTable; this.sweepBucketPointerTable = sweepBucketPointerTable; } @@ -61,16 +66,9 @@ public void updateProgress(ShardAndStrategy shardAndStrategy) { BucketProbeResult bucketProbeResult = findCompletedBuckets(shardAndStrategy, bucketPointer); // This order of clearing the metadata is intentional: - // (1) if bucket progress is deleted but the pointer is not updated, we might sweep the relevant buckets - // again, but that is acceptable because sweepable cells and timestamps were already cleared, and - // these tables are not accessed via row range scans, so the number of tombstones we read will be - // reasonably bounded. - // (2) if the pointer is updated but progress is not, we will update progress to the right value on the - // next iteration (notice that we only use the pointer, and not the existing progress, to track where - // we are in the timeline). - for (long bucket = bucketPointer; bucket < bucketProbeResult.endExclusive(); bucket++) { - bucketProgressStore.deleteBucketProgress(Bucket.of(shardAndStrategy, bucket)); - } + // if the pointer is updated but progress is not, we will update progress to the right value on the + // next iteration (notice that we only use the pointer, and not the existing progress, to track where + // we are in the timeline). 
sweepBucketPointerTable.updateStartingBucketForShardAndStrategy( Bucket.of(shardAndStrategy, bucketProbeResult.endExclusive())); sweepQueueProgressUpdater.progressTo(shardAndStrategy, bucketProbeResult.knownSweepProgress()); @@ -82,53 +80,115 @@ public void updateProgress(ShardAndStrategy shardAndStrategy) { * if this is not the case, behaviour is undefined. */ private BucketProbeResult findCompletedBuckets(ShardAndStrategy shardAndStrategy, long searchStart) { + long lastEndExclusiveForCompleteBucket = -1; for (long offset = 0; offset < MAX_BUCKETS_TO_CHECK_PER_ITERATION; offset++) { long currentBucket = searchStart + offset; Optional bucketProgress = bucketProgressStore.getBucketProgress(Bucket.of(shardAndStrategy, currentBucket)); - if (bucketProgress.isPresent()) { - BucketProgress presentBucketProgress = bucketProgress.get(); - TimestampRange requiredRange = getTimestampRangeRecord(currentBucket); - if (presentBucketProgress.timestampProgress() - != requiredRange.endExclusive() - requiredRange.startInclusive() - 1) { - // Bucket still has progress to go, so we can stop here. + Optional record = recordsTable.getTimestampRangeRecord(currentBucket); + Optional writtenSweepableBucket = + sweepBucketsTable.getSweepableBucket(Bucket.of(shardAndStrategy, currentBucket)); + + if (record.isPresent()) { // bucket has to have been closed + Optional definitiveProbeResult = + getProbeResultForPotentiallyPartiallyCompleteClosedBucket( + record.orElseThrow(), bucketProgress, writtenSweepableBucket, currentBucket); + if (definitiveProbeResult.isPresent()) { + return definitiveProbeResult.get(); + } else { + lastEndExclusiveForCompleteBucket = record.get().endExclusive(); + } + } else { + // No record; we're possibly in an open bucket, or not created yet. + + if (writtenSweepableBucket.isEmpty()) { + // there's no open bucket. 
This should be rare - it'll happen if the bucket assigner failed to open + // the next bucket after closing the last, or if the bucket assigner is behind and hit the cap + // creating closed buckets. + + if (lastEndExclusiveForCompleteBucket == -1) { + throw new SafeRuntimeException( + "Failed to update shard progress as there are no buckets to" + + " check sweep's progress. This should be rare and transient. If this error" + + " occurs for more than 15 minutes and there are no new buckets being created," + + " it is likely that there is something preventing the bucket assigner" + + " from progressing.", + SafeArg.of("shardAndStrategy", shardAndStrategy), + SafeArg.of("currentBucket", currentBucket)); + } + return BucketProbeResult.builder() .endExclusive(currentBucket) - .knownSweepProgress( - requiredRange.startInclusive() + presentBucketProgress.timestampProgress()) + .knownSweepProgress(lastEndExclusiveForCompleteBucket - 1) .build(); - } else { - // Bucket fully processed, keep going. - if (offset == MAX_BUCKETS_TO_CHECK_PER_ITERATION - 1) { - // We finished the maximum number of buckets to check, and all were completed. - return BucketProbeResult.builder() - .endExclusive(currentBucket + 1) - .knownSweepProgress(requiredRange.endExclusive() + 1) - .build(); - } } - } else { - // No progress; we're ahead of the read pointer, so interpret as unstarted. 
- return BucketProbeResult.builder() + SweepableBucket bucket = writtenSweepableBucket.orElseThrow(); + return getProbeResultForOpenBucket(bucketProgress, bucket, currentBucket); + } + } + + if (lastEndExclusiveForCompleteBucket == -1) { + throw new SafeIllegalStateException("Didn't expect to get here"); + } + return BucketProbeResult.builder() + .endExclusive(searchStart + MAX_BUCKETS_TO_CHECK_PER_ITERATION) + .knownSweepProgress(lastEndExclusiveForCompleteBucket - 1) + .build(); + } + + private Optional getProbeResultForPotentiallyPartiallyCompleteClosedBucket( + TimestampRange presentRecord, + Optional bucketProgress, + Optional writtenSweepableBucket, + long currentBucket) { + // If there's progress, and it's not at the end, then it's incomplete. + // If there's progress, and it's at the end, it's finished + // If there's no progress and the sweepable bucket is present, then it's not started + // If there's no progress and the sweepable bucket is not present, then it's finished + // (all assuming the record is present, since the record is written at the end) + if (bucketProgress.isPresent()) { + BucketProgress presentBucketProgress = bucketProgress.get(); + if (presentBucketProgress.timestampProgress() + != presentRecord.endExclusive() - presentRecord.startInclusive() - 1) { + // Progress, but not at the end + return Optional.of(BucketProbeResult.builder() .endExclusive(currentBucket) - .knownSweepProgress( - getTimestampRangeRecord(currentBucket).startInclusive() - 1L) - .build(); + .knownSweepProgress(presentRecord.startInclusive() + presentBucketProgress.timestampProgress()) + .build()); + } else { + // progress and it's at the end (perhaps we caught the foreground task between updating + // progress, and updating the record) + return Optional.empty(); + } + } else if (writtenSweepableBucket.isPresent()) { + // no progress, record present _and_ sweepable bucket entry present implies we're unstarted + return Optional.of(BucketProbeResult.builder() + 
.endExclusive(currentBucket) + .knownSweepProgress(presentRecord.startInclusive() - 1L) + .build()); + } else { + // no progress and the sweepable bucket is not present, so it's finished. + return Optional.empty(); } - throw new SafeIllegalStateException("Didn't expect to get here"); } - // TODO(mdaudali): This method is still incorrect (a record does not exist for an open bucket, not just pre-init - // bucket 0). A follow up PR will address this. - private TimestampRange getTimestampRangeRecord(long queriedBucket) { - return recordsTable - .getTimestampRangeRecord(queriedBucket) - .orElseThrow(() -> new SafeIllegalStateException( - "Timestamp range record not found. If this has happened for bucket 0, this is possible when" - + " autoscaling sweep is initializing itself. Otherwise, this is potentially indicative of" - + " a bug in auto-scaling sweep. In either case, we will retry.", - SafeArg.of("queriedBucket", queriedBucket))); + private BucketProbeResult getProbeResultForOpenBucket( + Optional bucketProgress, SweepableBucket bucket, long currentBucket) { + // No progress, then we haven't started yet. + if (bucketProgress.isEmpty()) { + return BucketProbeResult.builder() + .endExclusive(currentBucket) + .knownSweepProgress(bucket.timestampRange().startInclusive() - 1L) + .build(); + } else { + // Progress in the open bucket! 
+ BucketProgress presentBucketProgress = bucketProgress.get(); + return BucketProbeResult.builder() + .endExclusive(currentBucket) + .knownSweepProgress( + bucket.timestampRange().startInclusive() + presentBucketProgress.timestampProgress()) + .build(); + } } private long getStrictUpperBoundForSweptBuckets(ShardAndStrategy shardAndStrategy) { diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdaterTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdaterTest.java index f82e4dd58f..e5941c9a17 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdaterTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdaterTest.java @@ -16,25 +16,23 @@ package com.palantir.atlasdb.sweep.asts; -import static com.palantir.logsafe.testing.Assertions.assertThatLoggableExceptionThrownBy; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketPointerTable; import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketRecordsTable; +import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketsTable; import com.palantir.atlasdb.sweep.asts.progress.BucketProgress; import com.palantir.atlasdb.sweep.asts.progress.BucketProgressStore; import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; import com.palantir.atlasdb.sweep.queue.SweepQueueProgressUpdater; import com.palantir.atlasdb.sweep.queue.SweepQueueUtils; 
-import com.palantir.logsafe.SafeArg; -import com.palantir.logsafe.exceptions.SafeIllegalStateException; import java.util.List; import java.util.Optional; import java.util.Set; @@ -42,6 +40,7 @@ import java.util.stream.LongStream; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -60,6 +59,9 @@ public class DefaultShardProgressUpdaterTest { @Mock private SweepBucketRecordsTable recordsTable; + @Mock + private SweepBucketsTable sweepBucketsTable; + @Mock private SweepBucketPointerTable sweepBucketPointerTable; @@ -68,37 +70,45 @@ public class DefaultShardProgressUpdaterTest { @BeforeEach public void setUp() { shardProgressUpdater = new DefaultShardProgressUpdater( - bucketProgressStore, sweepQueueProgressUpdater, recordsTable, sweepBucketPointerTable); + bucketProgressStore, + sweepQueueProgressUpdater, + recordsTable, + sweepBucketsTable, + sweepBucketPointerTable); } @ParameterizedTest @MethodSource("buckets") - public void throwsExceptionOnAbsenceOfTimestampRangeRecords(Bucket bucket) { + public void doesNotUpdateProgressOnUnstartedOpenBucket(Bucket bucket) { when(sweepBucketPointerTable.getStartingBucketsForShards(ImmutableSet.of(bucket.shardAndStrategy()))) .thenReturn(ImmutableSet.of(bucket)); + when(bucketProgressStore.getBucketProgress(bucket)).thenReturn(Optional.empty()); when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())).thenReturn(Optional.empty()); - assertThatLoggableExceptionThrownBy(() -> shardProgressUpdater.updateProgress(bucket.shardAndStrategy())) - .isInstanceOf(SafeIllegalStateException.class) - .hasLogMessage("Timestamp range record not found. If this has happened for bucket 0, this is possible" - + " when autoscaling sweep is initializing itself. 
Otherwise, this is potentially indicative of" - + " a bug in auto-scaling sweep. In either case, we will retry.") - .hasExactlyArgs(SafeArg.of("queriedBucket", bucket.bucketIdentifier())); + TimestampRange timestampRange = TimestampRange.openBucket(SweepQueueUtils.minTsForCoarsePartition(3)); + when(sweepBucketsTable.getSweepableBucket(bucket)) + .thenReturn(Optional.of(SweepableBucket.of(bucket, timestampRange))); + + shardProgressUpdater.updateProgress(bucket.shardAndStrategy()); - verify(sweepBucketPointerTable, never()).updateStartingBucketForShardAndStrategy(bucket); - verify(sweepQueueProgressUpdater, never()).progressTo(eq(bucket.shardAndStrategy()), anyLong()); + verify(sweepBucketPointerTable).updateStartingBucketForShardAndStrategy(bucket); + verify(sweepQueueProgressUpdater) + .progressTo(bucket.shardAndStrategy(), SweepQueueUtils.minTsForCoarsePartition(3) - 1L); verify(bucketProgressStore, never()).deleteBucketProgress(any()); } @ParameterizedTest @MethodSource("buckets") - public void doesNotUpdateProgressOnUnstartedBucket(Bucket bucket) { + public void doesNotUpdateProgressOnUnstartedClosedBucket(Bucket bucket) { when(sweepBucketPointerTable.getStartingBucketsForShards(ImmutableSet.of(bucket.shardAndStrategy()))) .thenReturn(ImmutableSet.of(bucket)); when(bucketProgressStore.getBucketProgress(bucket)).thenReturn(Optional.empty()); - when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())) - .thenReturn(Optional.of(TimestampRange.of( - SweepQueueUtils.minTsForCoarsePartition(3), SweepQueueUtils.minTsForCoarsePartition(8)))); + TimestampRange timestampRange = TimestampRange.of( + SweepQueueUtils.minTsForCoarsePartition(3), SweepQueueUtils.minTsForCoarsePartition(8)); + when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())).thenReturn(Optional.of(timestampRange)); + + when(sweepBucketsTable.getSweepableBucket(bucket)) + .thenReturn(Optional.of(SweepableBucket.of(bucket, timestampRange))); 
shardProgressUpdater.updateProgress(bucket.shardAndStrategy()); @@ -108,9 +118,46 @@ public void doesNotUpdateProgressOnUnstartedBucket(Bucket bucket) { verify(bucketProgressStore, never()).deleteBucketProgress(any()); } + @ParameterizedTest + @MethodSource("buckets") + public void throwsIfOpenBucketHasNoBucketEntryAndNoBucketsToReadPrior(Bucket bucket) { + when(sweepBucketPointerTable.getStartingBucketsForShards(ImmutableSet.of(bucket.shardAndStrategy()))) + .thenReturn(ImmutableSet.of(bucket)); + when(bucketProgressStore.getBucketProgress(bucket)).thenReturn(Optional.empty()); + when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())).thenReturn(Optional.empty()); + when(sweepBucketsTable.getSweepableBucket(bucket)).thenReturn(Optional.empty()); + + assertThatThrownBy(() -> shardProgressUpdater.updateProgress(bucket.shardAndStrategy())); + } + + @ParameterizedTest + @MethodSource("sweepableBuckets") + public void updatesProgressOnStartedButNotCompletedOpenBucket(SweepableBucket sweepableBucket) { + Bucket bucket = sweepableBucket.bucket(); + when(sweepBucketPointerTable.getStartingBucketsForShards(ImmutableSet.of(bucket.shardAndStrategy()))) + .thenReturn(ImmutableSet.of(bucket)); + when(bucketProgressStore.getBucketProgress(bucket)) + .thenReturn(Optional.of(BucketProgress.createForTimestampProgress(1_234_567L))); + when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())).thenReturn(Optional.empty()); + when(sweepBucketsTable.getSweepableBucket(bucket)) + .thenReturn(Optional.of(SweepableBucket.of( + bucket, + TimestampRange.openBucket( + sweepableBucket.timestampRange().startInclusive())))); + + shardProgressUpdater.updateProgress(bucket.shardAndStrategy()); + + verify(sweepBucketPointerTable).updateStartingBucketForShardAndStrategy(bucket); + verify(sweepQueueProgressUpdater) + .progressTo( + bucket.shardAndStrategy(), + sweepableBucket.timestampRange().startInclusive() + 1_234_567L); + verify(bucketProgressStore, 
never()).deleteBucketProgress(any()); + } + @ParameterizedTest @MethodSource("sweepableBuckets") - public void updatesProgressOnStartedButNotCompletedBucket(SweepableBucket sweepableBucket) { + public void updatesProgressOnStartedButNotCompletedClosedBucket(SweepableBucket sweepableBucket) { Bucket bucket = sweepableBucket.bucket(); when(sweepBucketPointerTable.getStartingBucketsForShards(ImmutableSet.of(bucket.shardAndStrategy()))) .thenReturn(ImmutableSet.of(bucket)); @@ -118,6 +165,7 @@ public void updatesProgressOnStartedButNotCompletedBucket(SweepableBucket sweepa .thenReturn(Optional.of(BucketProgress.createForTimestampProgress(1_234_567L))); when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())) .thenReturn(Optional.of(sweepableBucket.timestampRange())); + when(sweepBucketsTable.getSweepableBucket(bucket)).thenReturn(Optional.of(sweepableBucket)); shardProgressUpdater.updateProgress(bucket.shardAndStrategy()); @@ -131,7 +179,7 @@ public void updatesProgressOnStartedButNotCompletedBucket(SweepableBucket sweepa @ParameterizedTest @MethodSource("bucketProbeParameters") - public void progressesPastOneOrMoreCompletedBucketsAndStopsCorrectly( + public void progressesPastOneOrMoreCompletedBucketsAndStopsCorrectlyForPresentLastBucket( SweepableBucket firstBucket, long numAdditionalCompletedBuckets, Optional progressOnFinalBucket) { @@ -153,7 +201,10 @@ public void progressesPastOneOrMoreCompletedBucketsAndStopsCorrectly( lastCompleteBucketTimestampRange.endExclusive() + SweepQueueUtils.TS_COARSE_GRANULARITY); when(recordsTable.getTimestampRangeRecord(finalBucketIdentifier)) .thenReturn(Optional.of(finalBucketTimestampRange)); - + when(sweepBucketsTable.getSweepableBucket(Bucket.of(firstRawBucket.shardAndStrategy(), finalBucketIdentifier))) + .thenReturn(Optional.of(SweepableBucket.of( + Bucket.of(firstRawBucket.shardAndStrategy(), finalBucketIdentifier), + finalBucketTimestampRange))); 
shardProgressUpdater.updateProgress(firstRawBucket.shardAndStrategy()); verify(sweepBucketPointerTable) @@ -166,13 +217,69 @@ public void progressesPastOneOrMoreCompletedBucketsAndStopsCorrectly( + progressOnFinalBucket .map(BucketProgress::timestampProgress) .orElse(-1L)); + } + + @Test + public void doesNotLoadPastMaxBucketsForIteration() { + Bucket firstRawBucket = Bucket.of(ShardAndStrategy.conservative(0), 0L); + SweepableBucket firstBucket = + SweepableBucket.of(firstRawBucket, TimestampRange.of(0L, SweepQueueUtils.minTsForCoarsePartition(8L))); + when(sweepBucketPointerTable.getStartingBucketsForShards(ImmutableSet.of(firstRawBucket.shardAndStrategy()))) + .thenReturn(ImmutableSet.of(firstRawBucket)); + setupBucketAsComplete(firstBucket); + + List succeedingBuckets = + getSucceedingBuckets(firstBucket, DefaultShardProgressUpdater.MAX_BUCKETS_TO_CHECK_PER_ITERATION - 2); + + // We have MAX_BUCKETS + 1 - the last succeeding bucket should not be used for calculating the progress + // so we don't want to set any mocks based off it. If we do load it, we'll fail the test from the mock failing. 
+ succeedingBuckets.subList(0, succeedingBuckets.size() - 1).forEach(this::setupBucketAsComplete); + + shardProgressUpdater.updateProgress(firstRawBucket.shardAndStrategy()); + + SweepableBucket lastLoadedBucket = succeedingBuckets.get(succeedingBuckets.size() - 2); + verify(sweepBucketPointerTable) + .updateStartingBucketForShardAndStrategy(Bucket.of( + firstRawBucket.shardAndStrategy(), + lastLoadedBucket.bucket().bucketIdentifier() + 1)); + verify(sweepQueueProgressUpdater) + .progressTo( + firstRawBucket.shardAndStrategy(), + lastLoadedBucket.timestampRange().endExclusive() - 1); + } + + @ParameterizedTest + @MethodSource("bucketProbeParameters") + public void progressesPastOneOrMoreCompletedBucketsAndStopsCorrectlyEvenWhenFinalBucketDoesNotExist( + SweepableBucket firstBucket, + long numAdditionalCompletedBuckets, + Optional _progressOnFinalBucket) { + Bucket firstRawBucket = firstBucket.bucket(); + when(sweepBucketPointerTable.getStartingBucketsForShards(ImmutableSet.of(firstRawBucket.shardAndStrategy()))) + .thenReturn(ImmutableSet.of(firstRawBucket)); - for (long bucketIdentifier = firstBucket.bucket().bucketIdentifier(); - bucketIdentifier < finalBucketIdentifier; - bucketIdentifier++) { - verify(bucketProgressStore) - .deleteBucketProgress(Bucket.of(firstRawBucket.shardAndStrategy(), bucketIdentifier)); - } + List succeedingBuckets = getSucceedingBuckets(firstBucket, numAdditionalCompletedBuckets); + List allSuccessfulBuckets = ImmutableList.builder() + .add(firstBucket) + .addAll(succeedingBuckets.subList(0, succeedingBuckets.size() - 1)) + .build(); + allSuccessfulBuckets.forEach(this::setupBucketAsComplete); + + long finalBucketIdentifier = firstRawBucket.bucketIdentifier() + numAdditionalCompletedBuckets; + when(bucketProgressStore.getBucketProgress(Bucket.of(firstRawBucket.shardAndStrategy(), finalBucketIdentifier))) + .thenReturn(Optional.empty()); + TimestampRange lastCompleteBucketTimestampRange = + 
allSuccessfulBuckets.get(allSuccessfulBuckets.size() - 1).timestampRange(); + when(recordsTable.getTimestampRangeRecord(finalBucketIdentifier)).thenReturn(Optional.empty()); + when(sweepBucketsTable.getSweepableBucket(Bucket.of(firstRawBucket.shardAndStrategy(), finalBucketIdentifier))) + .thenReturn(Optional.empty()); + shardProgressUpdater.updateProgress(firstRawBucket.shardAndStrategy()); + + verify(sweepBucketPointerTable) + .updateStartingBucketForShardAndStrategy(Bucket.of( + firstRawBucket.shardAndStrategy(), finalBucketIdentifier)); // all buckets before are complete + verify(sweepQueueProgressUpdater) + .progressTo(firstRawBucket.shardAndStrategy(), lastCompleteBucketTimestampRange.endExclusive() - 1); } private void setupBucketAsComplete(SweepableBucket sweepableBucket) { @@ -182,6 +289,7 @@ private void setupBucketAsComplete(SweepableBucket sweepableBucket) { sweepableBucket.timestampRange().endExclusive() - sweepableBucket.timestampRange().startInclusive() - 1L))); + when(sweepBucketsTable.getSweepableBucket(sweepableBucket.bucket())).thenReturn(Optional.of(sweepableBucket)); } // Creates a list of sweepable buckets following the provided bucket, each with a range of TS_COARSE_GRANULARITY