Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[ASTS] Foreground Task I: The Ramp (#7270)
Browse files Browse the repository at this point in the history
We implement SingleBucketSweepTask, which uses the existing components to sweep a specific bucket.
  • Loading branch information
jeremyk-91 authored Oct 3, 2024
1 parent 91ed1f8 commit 0057a86
Show file tree
Hide file tree
Showing 10 changed files with 281 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.sweep.asts;

import com.palantir.atlasdb.sweep.Sweeper;
import com.palantir.atlasdb.sweep.asts.bucketingthings.BucketCompletionListener;
import com.palantir.atlasdb.sweep.asts.bucketingthings.CompletelyClosedSweepBucketBoundRetriever;
import com.palantir.atlasdb.sweep.asts.progress.BucketProgress;
import com.palantir.atlasdb.sweep.asts.progress.BucketProgressStore;
import com.palantir.atlasdb.sweep.metrics.TargetedSweepMetrics;
import com.palantir.atlasdb.sweep.queue.ShardAndStrategy;
import com.palantir.atlasdb.sweep.queue.SweepBatch;
import com.palantir.atlasdb.sweep.queue.SweepBatchWithPartitionInfo;
import com.palantir.atlasdb.sweep.queue.SweepQueueCleaner;
import com.palantir.atlasdb.sweep.queue.SweepQueueDeleter;
import com.palantir.atlasdb.sweep.queue.SweepQueueReader;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.util.function.LongSupplier;

/**
 * {@link SingleBucketSweepTask} that sweeps at most one batch from a single bucket per call.
 *
 * <p>Each iteration reads the next batch from the sweep queue (bounded by the sweep timestamp and, for buckets
 * with a known end, by the end of the bucket), deletes what that batch says is sweepable, cleans the consumed
 * sweep queue data, and then records how far into the bucket it got. Overall (shard-level) progress is
 * deliberately not updated here.
 */
public class DefaultSingleBucketSweepTask implements SingleBucketSweepTask {
    private static final SafeLogger log = SafeLoggerFactory.get(DefaultSingleBucketSweepTask.class);

    /** Value of {@code endExclusive} used by a bucket that is not complete yet, i.e. has no end. */
    private static final long NO_END_YET = -1;

    private final BucketProgressStore bucketProgressStore;
    private final SweepQueueReader sweepQueueReader;
    private final SweepQueueDeleter sweepQueueDeleter;
    private final SweepQueueCleaner sweepQueueCleaner;
    private final LongSupplier sweepTimestampSupplier;
    private final TargetedSweepMetrics targetedSweepMetrics;
    private final BucketCompletionListener bucketCompletionListener;
    private final CompletelyClosedSweepBucketBoundRetriever completelyClosedSweepBucketBoundRetriever;

    public DefaultSingleBucketSweepTask(
            BucketProgressStore bucketProgressStore,
            SweepQueueReader sweepQueueReader,
            SweepQueueDeleter sweepQueueDeleter,
            SweepQueueCleaner sweepQueueCleaner,
            LongSupplier sweepTimestampSupplier,
            TargetedSweepMetrics targetedSweepMetrics,
            BucketCompletionListener bucketCompletionListener,
            CompletelyClosedSweepBucketBoundRetriever completelyClosedSweepBucketBoundRetriever) {
        this.bucketProgressStore = bucketProgressStore;
        this.sweepQueueReader = sweepQueueReader;
        this.sweepQueueDeleter = sweepQueueDeleter;
        this.sweepQueueCleaner = sweepQueueCleaner;
        this.sweepTimestampSupplier = sweepTimestampSupplier;
        this.targetedSweepMetrics = targetedSweepMetrics;
        this.bucketCompletionListener = bucketCompletionListener;
        this.completelyClosedSweepBucketBoundRetriever = completelyClosedSweepBucketBoundRetriever;
    }

    /**
     * Sweeps one batch of the given bucket, resuming from the progress stored for it.
     *
     * @param sweepableBucket the bucket to work on, together with its timestamp range
     * @return the number of entries read in this iteration, or 0 if there was nothing to do (the sweep timestamp
     *         has not reached the bucket, the bucket is already fully swept, or everything below the sweep
     *         timestamp has already been swept)
     */
    @Override
    public long runOneIteration(SweepableBucket sweepableBucket) {
        long sweepTs = sweepTimestampSupplier.getAsLong();
        long bucketStart = sweepableBucket.timestampRange().startInclusive();
        if (bucketStart >= sweepTs) {
            // Sweep timestamps are exclusive, so a sweep timestamp equal to the start of the bucket still means
            // that nothing in this bucket may be swept yet.
            return 0L;
        }

        BucketProgress storedProgress = bucketProgressStore
                .getBucketProgress(sweepableBucket.bucket())
                .orElse(BucketProgress.INITIAL_PROGRESS);

        // Inclusive: everything in the bucket up to and including this timestamp has been swept.
        long previousLastSweptTs = bucketStart + storedProgress.timestampProgress();
        long bucketEndExclusive = sweepableBucket.timestampRange().endExclusive();

        if (isCompletelySwept(bucketEndExclusive, previousLastSweptTs)) {
            // Nothing left to sweep. We can still be handed such a bucket if it was believed to be a candidate,
            // or while the bucket state machine is still working on it.
            markBucketCompleteIfEligible(sweepableBucket);
            return 0L;
        }

        if (previousLastSweptTs >= sweepTs) {
            // The sweep timestamp is inside this bucket, but we have already caught up with it.
            return 0L;
        }

        ShardAndStrategy shardAndStrategy = sweepableBucket.bucket().shardAndStrategy();

        // TODO (jkong): Make use of the partial progress within a timestamp.
        SweepBatchWithPartitionInfo batchWithPartitions = sweepQueueReader.getNextBatchToSweep(
                shardAndStrategy, previousLastSweptTs, getEndOfSweepRange(bucketEndExclusive, sweepTs));
        SweepBatch sweepBatch = batchWithPartitions.sweepBatch();

        sweepQueueDeleter.sweep(sweepBatch.writes(), Sweeper.of(shardAndStrategy));
        targetedSweepMetrics.registerEntriesReadInBatch(shardAndStrategy, sweepBatch.entriesRead());

        if (!sweepBatch.isEmpty() && log.isDebugEnabled()) {
            log.debug(
                    "Put {} ranged tombstones and swept up to timestamp {} for {}.",
                    SafeArg.of("tombstones", sweepBatch.writes().size()),
                    SafeArg.of("lastSweptTs", sweepBatch.lastSweptTimestamp()),
                    SafeArg.of("shardStrategy", shardAndStrategy.toText()));
        }

        long newLastSweptTs = sweepBatch.lastSweptTimestamp();
        sweepQueueCleaner.clean(
                shardAndStrategy,
                batchWithPartitions.partitionsForPreviousLastSweptTsWithMinimumBound(previousLastSweptTs, bucketStart),
                newLastSweptTs,
                sweepBatch.dedicatedRows());

        // Progress is stored as an offset from the start of the bucket.
        bucketProgressStore.updateBucketProgressToAtLeast(
                sweepableBucket.bucket(), BucketProgress.createForTimestampProgress(newLastSweptTs - bucketStart));
        if (isCompletelySwept(bucketEndExclusive, newLastSweptTs)) {
            // This batch took us to the end of the bucket.
            markBucketCompleteIfEligible(sweepableBucket);
        }

        targetedSweepMetrics.updateNumberOfTombstones(
                shardAndStrategy, sweepBatch.writes().size());

        // Overall progress is not touched here; the background updating task owns that.
        return sweepBatch.entriesRead();
    }

    /**
     * Notifies the completion listener about the bucket, but only if the bucket's identifier lies strictly below
     * the bound reported for completely closed buckets.
     */
    private void markBucketCompleteIfEligible(SweepableBucket sweepableBucket) {
        long bucketIdentifier = sweepableBucket.bucket().bucketIdentifier();
        long closedBucketsBound =
                completelyClosedSweepBucketBoundRetriever.getStrictUpperBoundForCompletelyClosedBuckets();
        if (bucketIdentifier >= closedBucketsBound) {
            return;
        }
        bucketCompletionListener.markBucketCompleteAndRemoveFromScheduling(sweepableBucket.bucket());
    }

    /**
     * Exclusive upper limit for this iteration: the sweep timestamp, capped at the end of the bucket when the
     * bucket has one.
     */
    private static long getEndOfSweepRange(long bucketEndExclusive, long sweepTimestampForIteration) {
        return bucketEndExclusive == NO_END_YET
                ? sweepTimestampForIteration
                : Math.min(sweepTimestampForIteration, bucketEndExclusive);
    }

    /**
     * A bucket without an end is never completely swept; otherwise it is completely swept once the last swept
     * timestamp (inclusive) has reached the last timestamp of the bucket.
     */
    private static boolean isCompletelySwept(long rangeEndExclusive, long lastSweptTimestampInBucket) {
        return rangeEndExclusive != NO_END_YET && lastSweptTimestampInBucket >= rangeEndExclusive - 1;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.sweep.asts;

/**
* Given a single {@link SweepableBucket}, performs an iteration of sweep on the relevant bucket (resuming from
* existing partial progress and updating progress when done, as appropriate).
*/
public interface SingleBucketSweepTask {
/**
* Runs one iteration of sweep on the given bucket.
*
* @param sweepableBucket the bucket to sweep in this iteration
* @return the number of cells read by the sweep task; {@code DefaultSingleBucketSweepTask} returns 0 when it
* finds nothing to do for the bucket
*/
long runOneIteration(SweepableBucket sweepableBucket);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.sweep.asts.bucketingthings;

import com.palantir.atlasdb.sweep.asts.Bucket;

/**
* Callback used to report that a bucket has been completely swept. {@code DefaultSingleBucketSweepTask} invokes
* it once a bucket is fully swept and the bucket's identifier is below the bound returned by
* {@code CompletelyClosedSweepBucketBoundRetriever}.
*/
public interface BucketCompletionListener {
/**
* Marks a bucket as complete. This method should ONLY be called once we are certain the bucket no longer needs
* to be swept.
*
* @param bucket the bucket that is complete and should no longer be scheduled
*/
void markBucketCompleteAndRemoveFromScheduling(Bucket bucket);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.sweep.asts.bucketingthings;

/**
* Source of a strict (exclusive) upper bound on the numbers of sweep buckets that are known to be closed.
* {@code DefaultSingleBucketSweepTask} compares a bucket's identifier against this bound before reporting the
* bucket as complete.
*/
public interface CompletelyClosedSweepBucketBoundRetriever {
/**
* It is guaranteed that all sweep buckets up to, BUT NOT including this number, are closed, and that we will
* not add new entries to the buckets table with numbers before a value returned by this method.
*
* @return the exclusive upper bound: buckets numbered strictly below it are closed
*/
long getStrictUpperBoundForCompletelyClosedBuckets();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ public interface SweepBucketsTable {

void putTimestampRangeForBucket(
Bucket bucket, Optional<TimestampRange> oldTimestampRange, TimestampRange newTimestampRange);

void deleteBucketEntry(Bucket bucket);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
* Describes partial progress of Sweep within the context of a bucket.
*/
public interface BucketProgress extends Comparable<BucketProgress> {
BucketProgress INITIAL_PROGRESS = createForTimestampProgress(-1L);

/**
* Within this bucket, timestamps starting from 0 up to {@link #timestampProgress()} inclusive have been fully swept.
* -1 can be used to indicate that no timestamps are fully swept yet (e.g., if we are just starting this bucket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.util.Set;
import java.util.function.LongPredicate;
import org.immutables.value.Value;

@Value.Immutable
Expand All @@ -28,13 +29,17 @@ public interface SweepBatchWithPartitionInfo {
Set<Long> finePartitions();

default Set<Long> partitionsForPreviousLastSweptTs(long previousLastSweptTs) {
Set<Long> encounteredPartitions = SweepQueueUtils.firstSweep(previousLastSweptTs)
? finePartitions()
: Sets.union(finePartitions(), ImmutableSet.of(SweepQueueUtils.tsPartitionFine(previousLastSweptTs)));
return partitionsForPreviousLastSweptTsInternal(previousLastSweptTs, SweepQueueUtils::firstSweep);
}

return Sets.difference(
encounteredPartitions,
ImmutableSet.of(SweepQueueUtils.tsPartitionFine(sweepBatch().lastSweptTimestamp() + 1)));
/**
 * Computes the partitions completed between a previously swept timestamp and the end of this batch, like
 * {@link #partitionsForPreviousLastSweptTs(long)}, but relative to a lower bound on the range being considered.
 * The partition of {@code previousLastSweptTs} is only taken into account when that timestamp is not below
 * {@code minimumBound}; this matters when looking at a sub-range of the sweep queue that may not itself
 * contain {@code previousLastSweptTs}.
 *
 * @param previousLastSweptTs the last swept timestamp before this batch was read
 * @param minimumBound the smallest timestamp for which the previous timestamp's partition is still considered
 * @return the set of completed fine partitions
 */
default Set<Long> partitionsForPreviousLastSweptTsWithMinimumBound(long previousLastSweptTs, long minimumBound) {
    LongPredicate isBelowMinimumBound = timestamp -> timestamp < minimumBound;
    return partitionsForPreviousLastSweptTsInternal(previousLastSweptTs, isBelowMinimumBound);
}

static SweepBatchWithPartitionInfo of(SweepBatch sweepBatch, Set<Long> finePartitions) {
Expand All @@ -43,4 +48,15 @@ static SweepBatchWithPartitionInfo of(SweepBatch sweepBatch, Set<Long> fineParti
.finePartitions(finePartitions)
.build();
}

/**
 * Shared implementation behind the {@code partitionsForPreviousLastSweptTs*} methods.
 *
 * <p>Starts from this batch's fine partitions, adds the fine partition of {@code previousLastSweptTs} unless the
 * supplied predicate accepts that timestamp, and finally drops the fine partition that contains the timestamp
 * right after the batch's last swept timestamp (presumably because that partition may not be finished yet).
 *
 * @param previousLastSweptTs the last swept timestamp before this batch was read
 * @param criteriaForExcludingPreviousTimestamp returns true if the partition of {@code previousLastSweptTs}
 *        must not be added
 * @return a view of the resulting set of fine partitions
 */
private Set<Long> partitionsForPreviousLastSweptTsInternal(
        long previousLastSweptTs, LongPredicate criteriaForExcludingPreviousTimestamp) {
    boolean skipPreviousPartition = criteriaForExcludingPreviousTimestamp.test(previousLastSweptTs);

    Set<Long> candidatePartitions;
    if (skipPreviousPartition) {
        candidatePartitions = finePartitions();
    } else {
        long previousPartition = SweepQueueUtils.tsPartitionFine(previousLastSweptTs);
        candidatePartitions = Sets.union(finePartitions(), ImmutableSet.of(previousPartition));
    }

    long partitionOfNextTimestamp = SweepQueueUtils.tsPartitionFine(sweepBatch().lastSweptTimestamp() + 1);
    return Sets.difference(candidatePartitions, ImmutableSet.of(partitionOfNextTimestamp));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public long sweepNextBatch(ShardAndStrategy shardStrategy, long sweepTs) {
SafeArg.of("shardStrategy", shardStrategy.toText()));
}

cleaner.clean(
cleaner.cleanAndUpdateProgress(
shardStrategy,
batchWithInfo.partitionsForPreviousLastSweptTs(lastSweptTs),
sweepBatch.lastSweptTimestamp(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,19 @@ public SweepQueueCleaner(SweepableCells cells, SweepableTimestamps timestamps, S
* @param lastTs last swept timestamp for this iteration of sweep.
* @param dedicatedRows the dedicated rows that have now been swept that should now be removed.
*/
public void cleanAndUpdateProgress(
ShardAndStrategy shardStrategy, Set<Long> partitions, long lastTs, DedicatedRows dedicatedRows) {
// Clean first, and only afterwards move progress forward to lastTs.
clean(shardStrategy, partitions, lastTs, dedicatedRows);
// NOTE(review): progressTo is not visible here; presumably it persists lastTs as the progress for
// shardStrategy - confirm.
progressTo(shardStrategy, lastTs);
}

/**
* Cleans up all the sweep queue data corresponding to the partitions and dedicated rows indicated.
*
* @param shardStrategy the shard and strategy whose sweep queue data is cleaned
* @param partitions the partitions passed on to the sweepable cells and sweepable timestamps clean-up
* @param lastTs last swept timestamp for this iteration of sweep
* @param dedicatedRows the dedicated rows to clean up
*/
public void clean(ShardAndStrategy shardStrategy, Set<Long> partitions, long lastTs, DedicatedRows dedicatedRows) {
// Clean-up runs in this order: dedicated rows, then sweepable cells, then sweepable timestamps.
cleanDedicatedRows(dedicatedRows);
cleanSweepableCells(shardStrategy, partitions);
cleanSweepableTimestamps(shardStrategy, partitions, lastTs);
// NOTE(review): cleanAndUpdateProgress calls progressTo itself right after clean, and the javadoc above only
// promises clean-up. This call looks like it should not be part of clean (it may be a removed diff line shown
// without its marker) - confirm against the actual file.
progressTo(shardStrategy, lastTs);
}

private void cleanSweepableCells(ShardAndStrategy shardStrategy, Set<Long> partitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public class SweepQueueReader {
this.runtime = runtime;
}

SweepBatchWithPartitionInfo getNextBatchToSweep(ShardAndStrategy shardStrategy, long lastSweptTs, long sweepTs) {
public SweepBatchWithPartitionInfo getNextBatchToSweep(
ShardAndStrategy shardStrategy, long lastSweptTs, long sweepTs) {
SweepBatchAccumulator accumulator =
new SweepBatchAccumulator(sweepTs, runtime.cellsThreshold().getAsInt(), lastSweptTs);
long previousProgress = lastSweptTs;
Expand Down

0 comments on commit 0057a86

Please sign in to comment.