Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[DNM - RC ONLY][ASTS] Changes to background task
Browse files Browse the repository at this point in the history
  • Loading branch information
mdaudali committed Nov 5, 2024
1 parent 4ebbacb commit 9d992be
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ private static ShardProgressUpdater createShardProgressUpdater(
bucketProgressStore,
new SweepQueueProgressUpdater(cleaner),
sweepAssignedBucketStore,
sweepAssignedBucketStore,
sweepAssignedBucketStore);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Iterables;
import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketPointerTable;
import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketRecordsTable;
import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketsTable;
import com.palantir.atlasdb.sweep.asts.progress.BucketProgress;
import com.palantir.atlasdb.sweep.asts.progress.BucketProgressStore;
import com.palantir.atlasdb.sweep.queue.ShardAndStrategy;
Expand All @@ -43,23 +44,35 @@ public class DefaultShardProgressUpdater implements ShardProgressUpdater {
private final BucketProgressStore bucketProgressStore;
private final SweepQueueProgressUpdater sweepQueueProgressUpdater;
private final SweepBucketRecordsTable recordsTable;
private final SweepBucketsTable sweepBucketsTable;
private final SweepBucketPointerTable sweepBucketPointerTable;

public DefaultShardProgressUpdater(
BucketProgressStore bucketProgressStore,
SweepQueueProgressUpdater sweepQueueProgressUpdater,
SweepBucketRecordsTable recordsTable,
SweepBucketsTable sweepBucketsTable,
SweepBucketPointerTable sweepBucketPointerTable) {
this.bucketProgressStore = bucketProgressStore;
this.sweepQueueProgressUpdater = sweepQueueProgressUpdater;
this.recordsTable = recordsTable;
this.sweepBucketsTable = sweepBucketsTable;
this.sweepBucketPointerTable = sweepBucketPointerTable;
}

@Override
public void updateProgress(ShardAndStrategy shardAndStrategy) {
long bucketPointer = getStrictUpperBoundForSweptBuckets(shardAndStrategy);
log.info(
"Bucket pointer",
SafeArg.of("bucketPointer", bucketPointer),
SafeArg.of("shardAndStrategy", shardAndStrategy));
BucketProbeResult bucketProbeResult = findCompletedBuckets(shardAndStrategy, bucketPointer);
log.info(
"Found completed buckets",
SafeArg.of("endExclusive", bucketProbeResult.endExclusive()),
SafeArg.of("shardAndStrategy", shardAndStrategy),
SafeArg.of("knownSweepProgress", bucketProbeResult.knownSweepProgress()));

// This order of clearing the metadata is intentional:
// (1) if bucket progress is deleted but the pointer is not updated, we might sweep the relevant buckets
Expand All @@ -72,9 +85,12 @@ public void updateProgress(ShardAndStrategy shardAndStrategy) {
for (long bucket = bucketPointer; bucket < bucketProbeResult.endExclusive(); bucket++) {
bucketProgressStore.deleteBucketProgress(Bucket.of(shardAndStrategy, bucket));
}
log.info("Deleted bucket progress where necessary", SafeArg.of("shardAndStrategy", shardAndStrategy));
sweepBucketPointerTable.updateStartingBucketForShardAndStrategy(
Bucket.of(shardAndStrategy, bucketProbeResult.endExclusive()));
log.info("Updated starting bucket", SafeArg.of("shardAndStrategy", shardAndStrategy));
sweepQueueProgressUpdater.progressTo(shardAndStrategy, bucketProbeResult.knownSweepProgress());
log.info("recorded new progress", SafeArg.of("shardAndStrategy", shardAndStrategy));
}

/**
Expand All @@ -87,49 +103,98 @@ private BucketProbeResult findCompletedBuckets(ShardAndStrategy shardAndStrategy
long currentBucket = searchStart + offset;
Optional<BucketProgress> bucketProgress =
bucketProgressStore.getBucketProgress(Bucket.of(shardAndStrategy, currentBucket));
if (bucketProgress.isPresent()) {
BucketProgress presentBucketProgress = bucketProgress.get();
TimestampRange requiredRange = getTimestampRangeRecord(currentBucket);
if (presentBucketProgress.timestampProgress()
!= requiredRange.endExclusive() - requiredRange.startInclusive() - 1) {
// Bucket still has progress to go, so we can stop here.
Optional<TimestampRange> record = getTimestampRangeRecord(currentBucket);
Optional<SweepableBucket> writtenSweepableBucket =
sweepBucketsTable.getSweepableBucket(Bucket.of(shardAndStrategy, currentBucket));
log.info(
"Existing bucket progress",
SafeArg.of("shardAndStrategy", shardAndStrategy),
SafeArg.of("currentBucket", currentBucket),
SafeArg.of("bucketProgress", bucketProgress),
SafeArg.of("record", record));
if (record.isPresent()) { // bucket has to have been closed
TimestampRange presentRecord = record.get();
// If there's progress, and it's not at the end, then it's incomplete.
// If there's progress, and it's at the end, it's finished
// If there's no progress and the sweepable bucket is present, then it's not started
// If there's no progress and the sweepable bucket is not present, then it's finished
// (all assuming the record is present, since the record is written at the end)

if (bucketProgress.isPresent()) {
BucketProgress presentBucketProgress = bucketProgress.get();
if (presentBucketProgress.timestampProgress()
!= presentRecord.endExclusive() - presentRecord.startInclusive() - 1) {
log.info(
"Incomplete bucket",
SafeArg.of("shardAndStrategy", shardAndStrategy),
SafeArg.of("currentBucket", currentBucket),
SafeArg.of(
"knownSweepProgress",
presentRecord.startInclusive() + presentBucketProgress.timestampProgress()));
return BucketProbeResult.builder()
.endExclusive(currentBucket)
.knownSweepProgress(
presentRecord.startInclusive() + presentBucketProgress.timestampProgress())
.build();
} else {
if (offset == MAX_BUCKETS_TO_CHECK_PER_ITERATION - 1) {
log.info("max buckets checked");
return BucketProbeResult.builder()
.endExclusive(currentBucket + 1)
.knownSweepProgress(presentRecord.endExclusive() + 1)
.build();
}
}
} else if (writtenSweepableBucket.isPresent()) {
log.info("Unstarted bucket");
return BucketProbeResult.builder()
.endExclusive(currentBucket)
.knownSweepProgress(
requiredRange.startInclusive() + presentBucketProgress.timestampProgress())
.knownSweepProgress(presentRecord.startInclusive() - 1L)
.build();
} else {
// Bucket fully processed, keep going.
if (offset == MAX_BUCKETS_TO_CHECK_PER_ITERATION - 1) {
// We finished the maximum number of buckets to check, and all were completed.
log.info("max buckets checked");
return BucketProbeResult.builder()
.endExclusive(currentBucket + 1)
.knownSweepProgress(requiredRange.endExclusive() + 1)
.knownSweepProgress(presentRecord.endExclusive() + 1)
.build();
}
}
} else {
// No progress; we're ahead of the read pointer, so interpret as unstarted.
return BucketProbeResult.builder()
.endExclusive(currentBucket)
.knownSweepProgress(
getTimestampRangeRecord(currentBucket).startInclusive() - 1L)
.build();
log.info("No record found, therefore this is an open bucket");
// No record; we're possibly in an open bucket, or not created yet.

// TODO: Do nicely.
SweepableBucket bucket = writtenSweepableBucket.orElseThrow(() -> new SafeIllegalStateException(
"This is likely bucket 0 or starting the next bucket had a transient failure,"
+ " otherwise the state machine would have opened a new bucket."));

// No progress, then we haven't started yet.
if (bucketProgress.isEmpty()) {
log.info("No progress found, therefore this is an unstarted open bucket");
return BucketProbeResult.builder()
.endExclusive(currentBucket)
.knownSweepProgress(bucket.timestampRange().startInclusive() - 1L)
.build();
} else {
// Progress in the open bucket!
BucketProgress presentBucketProgress = bucketProgress.get();
return BucketProbeResult.builder()
.endExclusive(currentBucket)
.knownSweepProgress(bucket.timestampRange().startInclusive()
+ presentBucketProgress.timestampProgress())
.build();
}
}
}
throw new SafeIllegalStateException("Didn't expect to get here");
}

private TimestampRange getTimestampRangeRecord(long queriedBucket) {
private Optional<TimestampRange> getTimestampRangeRecord(long queriedBucket) {
try {
return recordsTable.getTimestampRangeRecord(queriedBucket);
return Optional.of(recordsTable.getTimestampRangeRecord(queriedBucket));
} catch (NoSuchElementException exception) {
throw new SafeIllegalStateException(
"Timestamp range record not found. If this has happened for bucket 0, this is possible when"
+ " autoscaling sweep is initializing itself. Otherwise, this is potentially indicative of a"
+ " bug in auto-scaling sweep. In either case, we will retry.",
exception,
SafeArg.of("queriedBucket", queriedBucket));
return Optional.empty(); // TODO(mdaudali): Note down the guarantees.
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ public long runOneIteration(SweepableBucket sweepableBucket) {

BucketProgress existingBucketProgress =
bucketProgressStore.getBucketProgress(sweepableBucket.bucket()).orElse(BucketProgress.INITIAL_PROGRESS);
log.info(
"Existing bucket progress",
SafeArg.of("bucket", sweepableBucket.bucket()),
SafeArg.of("progress", existingBucketProgress));

// This is inclusive.
long lastSweptTimestampInBucket =
Expand Down Expand Up @@ -159,8 +163,13 @@ private long sweepBucket(

long lastTsOffset = lastTs - sweepableBucket.timestampRange().startInclusive();

log.info(
"Updating bucket progress",
SafeArg.of("bucket", sweepableBucket.bucket()),
SafeArg.of("progress", lastTsOffset));
bucketProgressStore.updateBucketProgressToAtLeast(
sweepableBucket.bucket(), BucketProgress.createForTimestampProgress(lastTsOffset));

if (isCompletelySwept(sweepableBucket.timestampRange().endExclusive(), lastTs)) {
// we've finished the bucket!
markBucketCompleteIfEligible(sweepableBucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,15 @@ public Set<SweepableBucket> getSweepableBuckets(Set<Bucket> startBuckets) {
return readSweepableBucketRows(rows);
}

@Override
public Optional<SweepableBucket> getSweepableBucket(Bucket bucket) {
Cell cell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketsCell(bucket);
return readCell(
cell,
(byte[] bytes) -> SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue(
cell, Value.create(bytes, -1), timestampRangePersister));
}

@Override
public void putTimestampRangeForBucket(
Bucket bucket, Optional<TimestampRange> oldTimestampRange, TimestampRange newTimestampRange) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public interface SweepBucketsTable {
*/
Set<SweepableBucket> getSweepableBuckets(Set<Bucket> startBuckets);

Optional<SweepableBucket> getSweepableBucket(Bucket bucket);

void putTimestampRangeForBucket(
Bucket bucket, Optional<TimestampRange> oldTimestampRange, TimestampRange newTimestampRange);

Expand Down
Loading

0 comments on commit 9d992be

Please sign in to comment.