From 0057a868d3b612370677b069f77d2da0c67a92a6 Mon Sep 17 00:00:00 2001 From: Jeremy Kong Date: Thu, 3 Oct 2024 14:49:25 +0100 Subject: [PATCH] [ASTS] Foreground Task I: The Ramp (#7270) We implement SingleBucketSweepTask, which uses the existing components to sweep a specific bucket. --- .../asts/DefaultSingleBucketSweepTask.java | 163 ++++++++++++++++++ .../sweep/asts/SingleBucketSweepTask.java | 28 +++ .../BucketCompletionListener.java | 27 +++ ...letelyClosedSweepBucketBoundRetriever.java | 25 +++ .../bucketingthings/SweepBucketsTable.java | 2 + .../sweep/asts/progress/BucketProgress.java | 2 + .../queue/SweepBatchWithPartitionInfo.java | 28 ++- .../atlasdb/sweep/queue/SweepQueue.java | 2 +- .../sweep/queue/SweepQueueCleaner.java | 10 +- .../atlasdb/sweep/queue/SweepQueueReader.java | 3 +- 10 files changed, 281 insertions(+), 9 deletions(-) create mode 100644 atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSingleBucketSweepTask.java create mode 100644 atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SingleBucketSweepTask.java create mode 100644 atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/BucketCompletionListener.java create mode 100644 atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/CompletelyClosedSweepBucketBoundRetriever.java diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSingleBucketSweepTask.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSingleBucketSweepTask.java new file mode 100644 index 00000000000..678a4f6ac00 --- /dev/null +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSingleBucketSweepTask.java @@ -0,0 +1,163 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.sweep.asts; + +import com.palantir.atlasdb.sweep.Sweeper; +import com.palantir.atlasdb.sweep.asts.bucketingthings.BucketCompletionListener; +import com.palantir.atlasdb.sweep.asts.bucketingthings.CompletelyClosedSweepBucketBoundRetriever; +import com.palantir.atlasdb.sweep.asts.progress.BucketProgress; +import com.palantir.atlasdb.sweep.asts.progress.BucketProgressStore; +import com.palantir.atlasdb.sweep.metrics.TargetedSweepMetrics; +import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; +import com.palantir.atlasdb.sweep.queue.SweepBatch; +import com.palantir.atlasdb.sweep.queue.SweepBatchWithPartitionInfo; +import com.palantir.atlasdb.sweep.queue.SweepQueueCleaner; +import com.palantir.atlasdb.sweep.queue.SweepQueueDeleter; +import com.palantir.atlasdb.sweep.queue.SweepQueueReader; +import com.palantir.logsafe.SafeArg; +import com.palantir.logsafe.logger.SafeLogger; +import com.palantir.logsafe.logger.SafeLoggerFactory; +import java.util.function.LongSupplier; + +public class DefaultSingleBucketSweepTask implements SingleBucketSweepTask { + private static final SafeLogger log = SafeLoggerFactory.get(DefaultSingleBucketSweepTask.class); + + private final BucketProgressStore bucketProgressStore; + private final SweepQueueReader sweepQueueReader; + private final SweepQueueDeleter sweepQueueDeleter; + private final SweepQueueCleaner sweepQueueCleaner; + private final LongSupplier sweepTimestampSupplier; + private final TargetedSweepMetrics targetedSweepMetrics; + private final BucketCompletionListener 
bucketCompletionListener; + private final CompletelyClosedSweepBucketBoundRetriever completelyClosedSweepBucketBoundRetriever; + + public DefaultSingleBucketSweepTask( + BucketProgressStore bucketProgressStore, + SweepQueueReader sweepQueueReader, + SweepQueueDeleter sweepQueueDeleter, + SweepQueueCleaner sweepQueueCleaner, + LongSupplier sweepTimestampSupplier, + TargetedSweepMetrics targetedSweepMetrics, + BucketCompletionListener bucketCompletionListener, + CompletelyClosedSweepBucketBoundRetriever completelyClosedSweepBucketBoundRetriever) { + this.bucketProgressStore = bucketProgressStore; + this.sweepQueueReader = sweepQueueReader; + this.sweepQueueDeleter = sweepQueueDeleter; + this.sweepQueueCleaner = sweepQueueCleaner; + this.sweepTimestampSupplier = sweepTimestampSupplier; + this.targetedSweepMetrics = targetedSweepMetrics; + this.bucketCompletionListener = bucketCompletionListener; + this.completelyClosedSweepBucketBoundRetriever = completelyClosedSweepBucketBoundRetriever; + } + + @Override + public long runOneIteration(SweepableBucket sweepableBucket) { + long sweepTimestampForIteration = sweepTimestampSupplier.getAsLong(); + long bucketStartTimestamp = sweepableBucket.timestampRange().startInclusive(); + if (sweepTimestampForIteration <= bucketStartTimestamp) { + // This means that the sweep timestamp has not entered this bucket yet, so we do not need to process + // anything. Note that sweep timestamps are exclusive, so <= is correct. + return 0L; + } + + BucketProgress existingBucketProgress = + bucketProgressStore.getBucketProgress(sweepableBucket.bucket()).orElse(BucketProgress.INITIAL_PROGRESS); + + // This is inclusive. 
+ long lastSweptTimestampInBucket = + sweepableBucket.timestampRange().startInclusive() + existingBucketProgress.timestampProgress(); + if (isCompletelySwept(sweepableBucket.timestampRange().endExclusive(), lastSweptTimestampInBucket)) { + // The bucket is fully swept; it might still be returned here if we thought it was a candidate, or if + // the bucket state machine is still doing things. + markBucketCompleteIfEligible(sweepableBucket); + return 0L; + } + + if (sweepTimestampForIteration <= lastSweptTimestampInBucket) { + // The sweep timestamp has made progress in this bucket, but we've swept everything up to it. + return 0L; + } + + // TODO (jkong): Make use of the partial progress within a timestamp. + SweepBatchWithPartitionInfo sweepBatchWithPartitionInfo = sweepQueueReader.getNextBatchToSweep( + sweepableBucket.bucket().shardAndStrategy(), + lastSweptTimestampInBucket, + getEndOfSweepRange(sweepableBucket, sweepTimestampForIteration)); + SweepBatch sweepBatch = sweepBatchWithPartitionInfo.sweepBatch(); + + ShardAndStrategy shardAndStrategy = sweepableBucket.bucket().shardAndStrategy(); + sweepQueueDeleter.sweep(sweepBatch.writes(), Sweeper.of(shardAndStrategy)); + targetedSweepMetrics.registerEntriesReadInBatch(shardAndStrategy, sweepBatch.entriesRead()); + + if (!sweepBatch.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug( + "Put {} ranged tombstones and swept up to timestamp {} for {}.", + SafeArg.of("tombstones", sweepBatch.writes().size()), + SafeArg.of("lastSweptTs", sweepBatch.lastSweptTimestamp()), + SafeArg.of("shardStrategy", shardAndStrategy.toText())); + } + } + + long lastTs = sweepBatch.lastSweptTimestamp(); + sweepQueueCleaner.clean( + shardAndStrategy, + sweepBatchWithPartitionInfo.partitionsForPreviousLastSweptTsWithMinimumBound( + lastSweptTimestampInBucket, + sweepableBucket.timestampRange().startInclusive()), + lastTs, + sweepBatch.dedicatedRows()); + + long lastTsOffset = lastTs - 
sweepableBucket.timestampRange().startInclusive(); + + bucketProgressStore.updateBucketProgressToAtLeast( + sweepableBucket.bucket(), BucketProgress.createForTimestampProgress(lastTsOffset)); + if (isCompletelySwept(sweepableBucket.timestampRange().endExclusive(), lastTs)) { + // we've finished the bucket! + markBucketCompleteIfEligible(sweepableBucket); + } + + targetedSweepMetrics.updateNumberOfTombstones( + shardAndStrategy, sweepBatch.writes().size()); + + // No updating of overall progress; that's a responsibility of the background updating task + return sweepBatch.entriesRead(); + } + + private void markBucketCompleteIfEligible(SweepableBucket sweepableBucket) { + if (sweepableBucket.bucket().bucketIdentifier() + < completelyClosedSweepBucketBoundRetriever.getStrictUpperBoundForCompletelyClosedBuckets()) { + bucketCompletionListener.markBucketCompleteAndRemoveFromScheduling(sweepableBucket.bucket()); + } + } + + private static long getEndOfSweepRange(SweepableBucket sweepableBucket, long sweepTimestampForIteration) { + if (sweepableBucket.timestampRange().endExclusive() == -1) { + return sweepTimestampForIteration; + } + return Math.min( + sweepTimestampForIteration, sweepableBucket.timestampRange().endExclusive()); + } + + private static boolean isCompletelySwept(long rangeEndExclusive, long lastSweptTimestampInBucket) { + if (rangeEndExclusive == -1) { + // The bucket's not complete, in which case it is not completely swept. + return false; + } + return lastSweptTimestampInBucket >= rangeEndExclusive - 1; + } +} diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SingleBucketSweepTask.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SingleBucketSweepTask.java new file mode 100644 index 00000000000..488c1d911ad --- /dev/null +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SingleBucketSweepTask.java @@ -0,0 +1,28 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. 
All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.sweep.asts; + +/** + * Given a single {@link SweepableBucket}, performs an iteration of sweep on the relevant bucket (resuming from + * existing partial progress and updating progress when done, as appropriate). + */ +public interface SingleBucketSweepTask { + /** + * Returns the number of cells read by the sweep task. + */ + long runOneIteration(SweepableBucket sweepableBucket); +} diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/BucketCompletionListener.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/BucketCompletionListener.java new file mode 100644 index 00000000000..80c964f1ec9 --- /dev/null +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/BucketCompletionListener.java @@ -0,0 +1,27 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.sweep.asts.bucketingthings; + +import com.palantir.atlasdb.sweep.asts.Bucket; + +public interface BucketCompletionListener { + /** + * Marks a bucket as complete. This method should ONLY be called once we are certain the bucket no longer needs + * to be swept. + */ + void markBucketCompleteAndRemoveFromScheduling(Bucket bucket); +} diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/CompletelyClosedSweepBucketBoundRetriever.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/CompletelyClosedSweepBucketBoundRetriever.java new file mode 100644 index 00000000000..9149024b1c8 --- /dev/null +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/CompletelyClosedSweepBucketBoundRetriever.java @@ -0,0 +1,25 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.sweep.asts.bucketingthings; + +public interface CompletelyClosedSweepBucketBoundRetriever { + /** + * It is guaranteed that all sweep buckets up to, BUT NOT including, the returned number are closed, and that we will + * not add new entries to the buckets table with numbers below a value returned by this method. 
+ */ + long getStrictUpperBoundForCompletelyClosedBuckets(); +} diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java index 031e8096034..a86f28b0192 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java @@ -31,4 +31,6 @@ public interface SweepBucketsTable { void putTimestampRangeForBucket( Bucket bucket, Optional oldTimestampRange, TimestampRange newTimestampRange); + + void deleteBucketEntry(Bucket bucket); } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/progress/BucketProgress.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/progress/BucketProgress.java index a0047a8842f..fd890259102 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/progress/BucketProgress.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/progress/BucketProgress.java @@ -29,6 +29,8 @@ * Describes partial progress of Sweep within the context of a bucket. */ public interface BucketProgress extends Comparable { + BucketProgress INITIAL_PROGRESS = createForTimestampProgress(-1L); + /** * Within this bucket, timestamps starting from 0 up to {@link #timestampProgress()} inclusive have been fully swept. 
* -1 can be used to indicate that no timestamps are fully swept yet (e.g., if we are just starting this bucket, diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepBatchWithPartitionInfo.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepBatchWithPartitionInfo.java index f4771c4f96a..fa5a7c70760 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepBatchWithPartitionInfo.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepBatchWithPartitionInfo.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import java.util.Set; +import java.util.function.LongPredicate; import org.immutables.value.Value; @Value.Immutable @@ -28,13 +29,17 @@ public interface SweepBatchWithPartitionInfo { Set finePartitions(); default Set partitionsForPreviousLastSweptTs(long previousLastSweptTs) { - Set encounteredPartitions = SweepQueueUtils.firstSweep(previousLastSweptTs) - ? finePartitions() - : Sets.union(finePartitions(), ImmutableSet.of(SweepQueueUtils.tsPartitionFine(previousLastSweptTs))); + return partitionsForPreviousLastSweptTsInternal(previousLastSweptTs, SweepQueueUtils::firstSweep); + } - return Sets.difference( - encounteredPartitions, - ImmutableSet.of(SweepQueueUtils.tsPartitionFine(sweepBatch().lastSweptTimestamp() + 1))); + /** + * Determines the partitions that were completed from a previously swept timestamp until the end of this batch. + * Unlike {@link #partitionsForPreviousLastSweptTs(long)}, this method applies a minimum bound to + * the partition range, which may be useful if we want to consider a sub-range of the sweep queue (which, + * in particular, may itself not contain previousLastSweptTs). 
+ */ + default Set partitionsForPreviousLastSweptTsWithMinimumBound(long previousLastSweptTs, long minimumBound) { + return partitionsForPreviousLastSweptTsInternal(previousLastSweptTs, value -> value < minimumBound); } static SweepBatchWithPartitionInfo of(SweepBatch sweepBatch, Set finePartitions) { @@ -43,4 +48,15 @@ static SweepBatchWithPartitionInfo of(SweepBatch sweepBatch, Set fineParti .finePartitions(finePartitions) .build(); } + + private Set partitionsForPreviousLastSweptTsInternal( + long previousLastSweptTs, LongPredicate criteriaForExcludingPreviousTimestamp) { + Set encounteredPartitions = criteriaForExcludingPreviousTimestamp.test(previousLastSweptTs) + ? finePartitions() + : Sets.union(finePartitions(), ImmutableSet.of(SweepQueueUtils.tsPartitionFine(previousLastSweptTs))); + + return Sets.difference( + encounteredPartitions, + ImmutableSet.of(SweepQueueUtils.tsPartitionFine(sweepBatch().lastSweptTimestamp() + 1))); + } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java index aed9699588c..163999fbb09 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java @@ -169,7 +169,7 @@ public long sweepNextBatch(ShardAndStrategy shardStrategy, long sweepTs) { SafeArg.of("shardStrategy", shardStrategy.toText())); } - cleaner.clean( + cleaner.cleanAndUpdateProgress( shardStrategy, batchWithInfo.partitionsForPreviousLastSweptTs(lastSweptTs), sweepBatch.lastSweptTimestamp(), diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueueCleaner.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueueCleaner.java index 1d0f36cd80e..37dbcacd0d5 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueueCleaner.java +++ 
b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueueCleaner.java @@ -43,11 +43,19 @@ public SweepQueueCleaner(SweepableCells cells, SweepableTimestamps timestamps, S * @param lastTs last swept timestamp for this iteration of sweep. * @param dedicatedRows the dedicated rows that have now been swept that should now be removed. */ + public void cleanAndUpdateProgress( + ShardAndStrategy shardStrategy, Set partitions, long lastTs, DedicatedRows dedicatedRows) { + clean(shardStrategy, partitions, lastTs, dedicatedRows); + progressTo(shardStrategy, lastTs); + } + + /** + * Cleans up all the sweep queue data corresponding to the partitions and dedicated rows indicated. + */ public void clean(ShardAndStrategy shardStrategy, Set partitions, long lastTs, DedicatedRows dedicatedRows) { cleanDedicatedRows(dedicatedRows); cleanSweepableCells(shardStrategy, partitions); cleanSweepableTimestamps(shardStrategy, partitions, lastTs); - progressTo(shardStrategy, lastTs); } private void cleanSweepableCells(ShardAndStrategy shardStrategy, Set partitions) { diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueueReader.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueueReader.java index cd891bd3247..c4de19b4ddf 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueueReader.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueueReader.java @@ -33,7 +33,8 @@ public class SweepQueueReader { this.runtime = runtime; } - SweepBatchWithPartitionInfo getNextBatchToSweep(ShardAndStrategy shardStrategy, long lastSweptTs, long sweepTs) { + public SweepBatchWithPartitionInfo getNextBatchToSweep( + ShardAndStrategy shardStrategy, long lastSweptTs, long sweepTs) { SweepBatchAccumulator accumulator = new SweepBatchAccumulator(sweepTs, runtime.cellsThreshold().getAsInt(), lastSweptTs); long previousProgress = lastSweptTs;