From 008f0c6d2a8e29e4e8b85c9a91a7fe857bc8cc1b Mon Sep 17 00:00:00 2001 From: Mohammed Daudali Date: Tue, 5 Nov 2024 14:31:49 +0000 Subject: [PATCH] [DNM - RC ONLY][ASTS] Changes to background task --- .../asts/BucketBasedTargetedSweeper.java | 1 + .../asts/DefaultShardProgressUpdater.java | 115 ++++++++++++++---- .../asts/DefaultSingleBucketSweepTask.java | 9 ++ .../DefaultSweepAssignedBucketStore.java | 9 ++ .../bucketingthings/SweepBucketsTable.java | 2 + .../asts/DefaultShardProgressUpdaterTest.java | 101 +++++++++++---- ...ctDefaultSweepAssignedBucketStoreTest.java | 10 +- 7 files changed, 195 insertions(+), 52 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/BucketBasedTargetedSweeper.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/BucketBasedTargetedSweeper.java index 0bb0af2c25..14f8a5bee6 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/BucketBasedTargetedSweeper.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/BucketBasedTargetedSweeper.java @@ -149,6 +149,7 @@ private static ShardProgressUpdater createShardProgressUpdater( bucketProgressStore, new SweepQueueProgressUpdater(cleaner), sweepAssignedBucketStore, + sweepAssignedBucketStore, sweepAssignedBucketStore); } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdater.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdater.java index cc078753f3..917f1d7832 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdater.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdater.java @@ -21,6 +21,7 @@ import com.google.common.collect.Iterables; import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketPointerTable; import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketRecordsTable; +import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketsTable; import com.palantir.atlasdb.sweep.asts.progress.BucketProgress; import com.palantir.atlasdb.sweep.asts.progress.BucketProgressStore; import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; @@ -43,23 +44,35 @@ public class DefaultShardProgressUpdater implements ShardProgressUpdater { private final BucketProgressStore bucketProgressStore; private final SweepQueueProgressUpdater sweepQueueProgressUpdater; private final SweepBucketRecordsTable recordsTable; + private final SweepBucketsTable sweepBucketsTable; private final SweepBucketPointerTable sweepBucketPointerTable; public DefaultShardProgressUpdater( BucketProgressStore bucketProgressStore, SweepQueueProgressUpdater sweepQueueProgressUpdater, SweepBucketRecordsTable recordsTable, + SweepBucketsTable sweepBucketsTable, SweepBucketPointerTable sweepBucketPointerTable) { this.bucketProgressStore = bucketProgressStore; this.sweepQueueProgressUpdater = sweepQueueProgressUpdater; this.recordsTable = recordsTable; + this.sweepBucketsTable = sweepBucketsTable; this.sweepBucketPointerTable = sweepBucketPointerTable; } @Override public void updateProgress(ShardAndStrategy shardAndStrategy) { long bucketPointer = getStrictUpperBoundForSweptBuckets(shardAndStrategy); + log.info( + "Bucket pointer", + SafeArg.of("bucketPointer", bucketPointer), + SafeArg.of("shardAndStrategy", shardAndStrategy)); BucketProbeResult bucketProbeResult = findCompletedBuckets(shardAndStrategy, bucketPointer); + log.info( + "Found completed buckets", + SafeArg.of("endExclusive", bucketProbeResult.endExclusive()), + SafeArg.of("shardAndStrategy", shardAndStrategy), + SafeArg.of("knownSweepProgress", bucketProbeResult.knownSweepProgress())); // This order of clearing the metadata is intentional: // (1) if bucket progress is deleted but the pointer is not updated, we might sweep the relevant buckets @@ -72,9 +85,12 @@ public void updateProgress(ShardAndStrategy shardAndStrategy) { for (long bucket = bucketPointer; bucket < bucketProbeResult.endExclusive(); bucket++) { bucketProgressStore.deleteBucketProgress(Bucket.of(shardAndStrategy, bucket)); } + log.info("Deleted bucket progress where necessary", SafeArg.of("shardAndStrategy", shardAndStrategy)); sweepBucketPointerTable.updateStartingBucketForShardAndStrategy( Bucket.of(shardAndStrategy, bucketProbeResult.endExclusive())); + log.info("Updated starting bucket", SafeArg.of("shardAndStrategy", shardAndStrategy)); sweepQueueProgressUpdater.progressTo(shardAndStrategy, bucketProbeResult.knownSweepProgress()); + log.info("recorded new progress", SafeArg.of("shardAndStrategy", shardAndStrategy)); } /** @@ -87,49 +103,98 @@ private BucketProbeResult findCompletedBuckets(ShardAndStrategy shardAndStrategy long currentBucket = searchStart + offset; Optional bucketProgress = bucketProgressStore.getBucketProgress(Bucket.of(shardAndStrategy, currentBucket)); - if (bucketProgress.isPresent()) { - BucketProgress presentBucketProgress = bucketProgress.get(); - TimestampRange requiredRange = getTimestampRangeRecord(currentBucket); - if (presentBucketProgress.timestampProgress() - != requiredRange.endExclusive() - requiredRange.startInclusive() - 1) { - // Bucket still has progress to go, so we can stop here. + Optional record = getTimestampRangeRecord(currentBucket); + Optional writtenSweepableBucket = + sweepBucketsTable.getSweepableBucket(Bucket.of(shardAndStrategy, currentBucket)); + log.info( + "Existing bucket progress", + SafeArg.of("shardAndStrategy", shardAndStrategy), + SafeArg.of("currentBucket", currentBucket), + SafeArg.of("bucketProgress", bucketProgress), + SafeArg.of("record", record)); + if (record.isPresent()) { // bucket has to have been closed + TimestampRange presentRecord = record.get(); + // If there's progress, and it's not at the end, then it's incomplete. + // If there's progress, and it's at the end, it's finished + // If there's no progress and the sweepable bucket is present, then it's not started + // If there's no progress and the sweepable bucket is not present, then it's finished + // (all assuming the record is present, since the record is written at the end) + + if (bucketProgress.isPresent()) { + BucketProgress presentBucketProgress = bucketProgress.get(); + if (presentBucketProgress.timestampProgress() + != presentRecord.endExclusive() - presentRecord.startInclusive() - 1) { + log.info( + "Incomplete bucket", + SafeArg.of("shardAndStrategy", shardAndStrategy), + SafeArg.of("currentBucket", currentBucket), + SafeArg.of( + "knownSweepProgress", + presentRecord.startInclusive() + presentBucketProgress.timestampProgress())); + return BucketProbeResult.builder() + .endExclusive(currentBucket) + .knownSweepProgress( + presentRecord.startInclusive() + presentBucketProgress.timestampProgress()) + .build(); + } else { + if (offset == MAX_BUCKETS_TO_CHECK_PER_ITERATION - 1) { + log.info("max buckets checked"); + return BucketProbeResult.builder() + .endExclusive(currentBucket + 1) + .knownSweepProgress(presentRecord.endExclusive() + 1) + .build(); + } + } + } else if (writtenSweepableBucket.isPresent()) { + log.info("Unstarted bucket"); return BucketProbeResult.builder() .endExclusive(currentBucket) - .knownSweepProgress( - requiredRange.startInclusive() + presentBucketProgress.timestampProgress()) + .knownSweepProgress(presentRecord.startInclusive() - 1L) .build(); } else { - // Bucket fully processed, keep going. if (offset == MAX_BUCKETS_TO_CHECK_PER_ITERATION - 1) { - // We finished the maximum number of buckets to check, and all were completed. + log.info("max buckets checked"); return BucketProbeResult.builder() .endExclusive(currentBucket + 1) - .knownSweepProgress(requiredRange.endExclusive() + 1) + .knownSweepProgress(presentRecord.endExclusive() + 1) .build(); } } } else { - // No progress; we're ahead of the read pointer, so interpret as unstarted. - return BucketProbeResult.builder() - .endExclusive(currentBucket) - .knownSweepProgress( - getTimestampRangeRecord(currentBucket).startInclusive() - 1L) - .build(); + log.info("No record found, therefore this is an open bucket"); + // No record; we're possibly in an open bucket, or not created yet. + + // TODO: Do nicely. + SweepableBucket bucket = writtenSweepableBucket.orElseThrow(() -> new SafeIllegalStateException( + "This is likely bucket 0 or starting the next bucket had a transient failure," + + " otherwise the state machine would have opened a new bucket.")); + + // No progress, then we haven't started yet. + if (bucketProgress.isEmpty()) { + log.info("No progress found, therefore this is an unstarted open bucket"); + return BucketProbeResult.builder() + .endExclusive(currentBucket) + .knownSweepProgress(bucket.timestampRange().startInclusive() - 1L) + .build(); + } else { + // Progress in the open bucket! + BucketProgress presentBucketProgress = bucketProgress.get(); + return BucketProbeResult.builder() + .endExclusive(currentBucket) + .knownSweepProgress(bucket.timestampRange().startInclusive() + + presentBucketProgress.timestampProgress()) + .build(); + } } } throw new SafeIllegalStateException("Didn't expect to get here"); } - private TimestampRange getTimestampRangeRecord(long queriedBucket) { + private Optional getTimestampRangeRecord(long queriedBucket) { try { - return recordsTable.getTimestampRangeRecord(queriedBucket); + return Optional.of(recordsTable.getTimestampRangeRecord(queriedBucket)); } catch (NoSuchElementException exception) { - throw new SafeIllegalStateException( - "Timestamp range record not found. If this has happened for bucket 0, this is possible when" - + " autoscaling sweep is initializing itself. Otherwise, this is potentially indicative of a" - + " bug in auto-scaling sweep. In either case, we will retry.", - exception, - SafeArg.of("queriedBucket", queriedBucket)); + return Optional.empty(); // TODO(mdaudali): Note down the guarantees. } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSingleBucketSweepTask.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSingleBucketSweepTask.java index 4d8d093f08..bd8050bcf6 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSingleBucketSweepTask.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSingleBucketSweepTask.java @@ -84,6 +84,10 @@ public long runOneIteration(SweepableBucket sweepableBucket) { BucketProgress existingBucketProgress = bucketProgressStore.getBucketProgress(sweepableBucket.bucket()).orElse(BucketProgress.INITIAL_PROGRESS); + log.info( + "Existing bucket progress", + SafeArg.of("bucket", sweepableBucket.bucket()), + SafeArg.of("progress", existingBucketProgress)); // This is inclusive. long lastSweptTimestampInBucket = @@ -159,8 +163,13 @@ private long sweepBucket( long lastTsOffset = lastTs - sweepableBucket.timestampRange().startInclusive(); + log.info( + "Updating bucket progress", + SafeArg.of("bucket", sweepableBucket.bucket()), + SafeArg.of("progress", lastTsOffset)); bucketProgressStore.updateBucketProgressToAtLeast( sweepableBucket.bucket(), BucketProgress.createForTimestampProgress(lastTsOffset)); + if (isCompletelySwept(sweepableBucket.timestampRange().endExclusive(), lastTs)) { // we've finished the bucket! markBucketCompleteIfEligible(sweepableBucket); diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultSweepAssignedBucketStore.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultSweepAssignedBucketStore.java index 3b494b5045..69ed9b248e 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultSweepAssignedBucketStore.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultSweepAssignedBucketStore.java @@ -222,6 +222,15 @@ public Set getSweepableBuckets(Set startBuckets) { return readSweepableBucketRows(rows); } + @Override + public Optional getSweepableBucket(Bucket bucket) { + Cell cell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketsCell(bucket); + return readCell( + cell, + (byte[] bytes) -> SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue( + cell, Value.create(bytes, -1), timestampRangePersister)); + } + @Override public void putTimestampRangeForBucket( Bucket bucket, Optional oldTimestampRange, TimestampRange newTimestampRange) { diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java index a86f28b019..bc6da52cd3 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java @@ -29,6 +29,8 @@ public interface SweepBucketsTable { */ Set getSweepableBuckets(Set startBuckets); + Optional getSweepableBucket(Bucket bucket); + void putTimestampRangeForBucket( Bucket bucket, Optional oldTimestampRange, TimestampRange newTimestampRange); diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdaterTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdaterTest.java index 379b67370d..1893a96a31 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdaterTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultShardProgressUpdaterTest.java @@ -16,10 +16,8 @@ package com.palantir.atlasdb.sweep.asts; -import static com.palantir.logsafe.testing.Assertions.assertThatLoggableExceptionThrownBy; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -28,13 +26,12 @@ import com.google.common.collect.Sets; import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketPointerTable; import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketRecordsTable; +import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketsTable; import com.palantir.atlasdb.sweep.asts.progress.BucketProgress; import com.palantir.atlasdb.sweep.asts.progress.BucketProgressStore; import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; import com.palantir.atlasdb.sweep.queue.SweepQueueProgressUpdater; import com.palantir.atlasdb.sweep.queue.SweepQueueUtils; -import com.palantir.logsafe.SafeArg; -import com.palantir.logsafe.exceptions.SafeIllegalStateException; import java.util.List; import java.util.NoSuchElementException; import java.util.Optional; @@ -61,6 +58,9 @@ public class DefaultShardProgressUpdaterTest { @Mock private SweepBucketRecordsTable recordsTable; + @Mock + private SweepBucketsTable sweepBucketsTable; + @Mock private SweepBucketPointerTable sweepBucketPointerTable; @@ -69,39 +69,45 @@ public class DefaultShardProgressUpdaterTest { @BeforeEach public void setUp() { shardProgressUpdater = new DefaultShardProgressUpdater( - bucketProgressStore, sweepQueueProgressUpdater, recordsTable, sweepBucketPointerTable); + bucketProgressStore, + sweepQueueProgressUpdater, + recordsTable, + sweepBucketsTable, + sweepBucketPointerTable); } @ParameterizedTest @MethodSource("buckets") - public void wrapsAndRethrowsExceptionOnAbsenceOfTimestampRangeRecords(Bucket bucket) { + public void doesNotUpdateProgressOnUnstartedOpenBucket(Bucket bucket) { when(sweepBucketPointerTable.getStartingBucketsForShards(ImmutableSet.of(bucket.shardAndStrategy()))) .thenReturn(ImmutableSet.of(bucket)); - NoSuchElementException underlyingException = new NoSuchElementException(); - when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())).thenThrow(underlyingException); - - assertThatLoggableExceptionThrownBy(() -> shardProgressUpdater.updateProgress(bucket.shardAndStrategy())) - .isInstanceOf(SafeIllegalStateException.class) - .hasLogMessage("Timestamp range record not found. If this has happened for bucket 0, this is possible" - + " when autoscaling sweep is initializing itself. Otherwise, this is potentially indicative of" - + " a bug in auto-scaling sweep. In either case, we will retry.") - .hasExactlyArgs(SafeArg.of("queriedBucket", bucket.bucketIdentifier())) - .hasCause(underlyingException); - - verify(sweepBucketPointerTable, never()).updateStartingBucketForShardAndStrategy(bucket); - verify(sweepQueueProgressUpdater, never()).progressTo(eq(bucket.shardAndStrategy()), anyLong()); + when(bucketProgressStore.getBucketProgress(bucket)).thenReturn(Optional.empty()); + when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())).thenThrow(new NoSuchElementException()); + + TimestampRange timestampRange = TimestampRange.openBucket(SweepQueueUtils.minTsForCoarsePartition(3)); + when(sweepBucketsTable.getSweepableBucket(bucket)) + .thenReturn(Optional.of(SweepableBucket.of(bucket, timestampRange))); + + shardProgressUpdater.updateProgress(bucket.shardAndStrategy()); + + verify(sweepBucketPointerTable).updateStartingBucketForShardAndStrategy(bucket); + verify(sweepQueueProgressUpdater) + .progressTo(bucket.shardAndStrategy(), SweepQueueUtils.minTsForCoarsePartition(3) - 1L); verify(bucketProgressStore, never()).deleteBucketProgress(any()); } @ParameterizedTest @MethodSource("buckets") - public void doesNotUpdateProgressOnUnstartedBucket(Bucket bucket) { + public void doesNotUpdateProgressOnUnstartedClosedBucket(Bucket bucket) { when(sweepBucketPointerTable.getStartingBucketsForShards(ImmutableSet.of(bucket.shardAndStrategy()))) .thenReturn(ImmutableSet.of(bucket)); when(bucketProgressStore.getBucketProgress(bucket)).thenReturn(Optional.empty()); - when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())) - .thenReturn(TimestampRange.of( - SweepQueueUtils.minTsForCoarsePartition(3), SweepQueueUtils.minTsForCoarsePartition(8))); + TimestampRange timestampRange = TimestampRange.of( + SweepQueueUtils.minTsForCoarsePartition(3), SweepQueueUtils.minTsForCoarsePartition(8)); + when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())).thenReturn(timestampRange); + + when(sweepBucketsTable.getSweepableBucket(bucket)) + .thenReturn(Optional.of(SweepableBucket.of(bucket, timestampRange))); shardProgressUpdater.updateProgress(bucket.shardAndStrategy()); @@ -111,9 +117,47 @@ public void doesNotUpdateProgressOnUnstartedBucket(Bucket bucket) { verify(bucketProgressStore, never()).deleteBucketProgress(any()); } + @ParameterizedTest + @MethodSource("buckets") + public void throwsIfOpenBucketHasNoBucketEntry(Bucket bucket) { + when(sweepBucketPointerTable.getStartingBucketsForShards(ImmutableSet.of(bucket.shardAndStrategy()))) + .thenReturn(ImmutableSet.of(bucket)); + when(bucketProgressStore.getBucketProgress(bucket)).thenReturn(Optional.empty()); + when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())).thenThrow(new NoSuchElementException()); + + when(sweepBucketsTable.getSweepableBucket(bucket)).thenReturn(Optional.empty()); + + assertThatThrownBy(() -> shardProgressUpdater.updateProgress(bucket.shardAndStrategy())); + } + + @ParameterizedTest + @MethodSource("sweepableBuckets") + public void updatesProgressOnStartedButNotCompletedOpenBucket(SweepableBucket sweepableBucket) { + Bucket bucket = sweepableBucket.bucket(); + when(sweepBucketPointerTable.getStartingBucketsForShards(ImmutableSet.of(bucket.shardAndStrategy()))) + .thenReturn(ImmutableSet.of(bucket)); + when(bucketProgressStore.getBucketProgress(bucket)) + .thenReturn(Optional.of(BucketProgress.createForTimestampProgress(1_234_567L))); + when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())).thenThrow(new NoSuchElementException()); + when(sweepBucketsTable.getSweepableBucket(bucket)) + .thenReturn(Optional.of(SweepableBucket.of( + bucket, + TimestampRange.openBucket( + sweepableBucket.timestampRange().startInclusive())))); + + shardProgressUpdater.updateProgress(bucket.shardAndStrategy()); + + verify(sweepBucketPointerTable).updateStartingBucketForShardAndStrategy(bucket); + verify(sweepQueueProgressUpdater) + .progressTo( + bucket.shardAndStrategy(), + sweepableBucket.timestampRange().startInclusive() + 1_234_567L); + verify(bucketProgressStore, never()).deleteBucketProgress(any()); + } + @ParameterizedTest @MethodSource("sweepableBuckets") - public void updatesProgressOnStartedButNotCompletedBucket(SweepableBucket sweepableBucket) { + public void updatesProgressOnStartedButNotCompletedClosedBucket(SweepableBucket sweepableBucket) { Bucket bucket = sweepableBucket.bucket(); when(sweepBucketPointerTable.getStartingBucketsForShards(ImmutableSet.of(bucket.shardAndStrategy()))) .thenReturn(ImmutableSet.of(bucket)); @@ -121,6 +165,7 @@ public void updatesProgressOnStartedButNotCompletedBucket(SweepableBucket sweepa .thenReturn(Optional.of(BucketProgress.createForTimestampProgress(1_234_567L))); when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())) .thenReturn(sweepableBucket.timestampRange()); + when(sweepBucketsTable.getSweepableBucket(bucket)).thenReturn(Optional.of(sweepableBucket)); shardProgressUpdater.updateProgress(bucket.shardAndStrategy()); @@ -155,7 +200,10 @@ public void progressesPastOneOrMoreCompletedBucketsAndStopsCorrectly( lastCompleteBucketTimestampRange.endExclusive(), lastCompleteBucketTimestampRange.endExclusive() + SweepQueueUtils.TS_COARSE_GRANULARITY); when(recordsTable.getTimestampRangeRecord(finalBucketIdentifier)).thenReturn(finalBucketTimestampRange); - + when(sweepBucketsTable.getSweepableBucket(Bucket.of(firstRawBucket.shardAndStrategy(), finalBucketIdentifier))) + .thenReturn(Optional.of(SweepableBucket.of( + Bucket.of(firstRawBucket.shardAndStrategy(), finalBucketIdentifier), + finalBucketTimestampRange))); shardProgressUpdater.updateProgress(firstRawBucket.shardAndStrategy()); verify(sweepBucketPointerTable) @@ -184,6 +232,7 @@ private void setupBucketAsComplete(SweepableBucket sweepableBucket) { sweepableBucket.timestampRange().endExclusive() - sweepableBucket.timestampRange().startInclusive() - 1L))); + when(sweepBucketsTable.getSweepableBucket(sweepableBucket.bucket())).thenReturn(Optional.of(sweepableBucket)); } // Creates a list of sweepable buckets following the provided bucket, each with a range of TS_COARSE_GRANULARITY diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/AbstractDefaultSweepAssignedBucketStoreTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/AbstractDefaultSweepAssignedBucketStoreTest.java index 96a66abcc8..be03513cbf 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/AbstractDefaultSweepAssignedBucketStoreTest.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/AbstractDefaultSweepAssignedBucketStoreTest.java @@ -256,6 +256,14 @@ public void getSweepableBucketsReturnsBucketsPerShardAndStrategy() { assertThat(store.getSweepableBuckets(new HashSet<>(startBuckets))).containsExactlyInAnyOrderElementsOf(buckets); } + @Test + public void getSweepableBucketReturnsSweepableBucketIfExists() { + Bucket bucket = Bucket.of(ShardAndStrategy.of(12, SweeperStrategy.THOROUGH), 21); + TimestampRange range = TimestampRange.of(1, 4); + store.putTimestampRangeForBucket(bucket, Optional.empty(), range); + assertThat(store.getSweepableBucket(bucket)).contains(SweepableBucket.of(bucket, range)); + } + @Test public void putTimestampRangeForBucketFailsIfOldTimestampRangeDoesNotMatchCurrent() { Bucket bucket = Bucket.of(ShardAndStrategy.of(12, SweeperStrategy.THOROUGH), 512); @@ -289,7 +297,7 @@ public void deleteBucketEntryDeletesBucket() { TimestampRange timestampRange = TimestampRange.of(1, 2); store.putTimestampRangeForBucket(bucket, Optional.empty(), timestampRange); store.deleteBucketEntry(bucket); - assertThat(store.getSweepableBuckets(Set.of(bucket))).isEmpty(); + assertThat(store.getSweepableBucket(bucket)).isEmpty(); } @Test