Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[ASTS] Background Shard Progress Updater now correctly calculates last swept progress #7441

Open
wants to merge 3 commits into
base: mdaudali/11-12-_asts_add_getsweepablebucket_method
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ private static ShardProgressUpdater createShardProgressUpdater(
bucketProgressStore,
new SweepQueueProgressUpdater(cleaner),
sweepAssignedBucketStore,
sweepAssignedBucketStore,
sweepAssignedBucketStore);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
import com.google.common.collect.Iterables;
import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketPointerTable;
import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketRecordsTable;
import com.palantir.atlasdb.sweep.asts.bucketingthings.SweepBucketsTable;
import com.palantir.atlasdb.sweep.asts.progress.BucketProgress;
import com.palantir.atlasdb.sweep.asts.progress.BucketProgressStore;
import com.palantir.atlasdb.sweep.queue.ShardAndStrategy;
import com.palantir.atlasdb.sweep.queue.SweepQueueProgressUpdater;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import org.immutables.value.Value;
Expand All @@ -43,16 +44,19 @@ public class DefaultShardProgressUpdater implements ShardProgressUpdater {
private final BucketProgressStore bucketProgressStore;
private final SweepQueueProgressUpdater sweepQueueProgressUpdater;
private final SweepBucketRecordsTable recordsTable;
private final SweepBucketsTable sweepBucketsTable;
private final SweepBucketPointerTable sweepBucketPointerTable;

public DefaultShardProgressUpdater(
BucketProgressStore bucketProgressStore,
SweepQueueProgressUpdater sweepQueueProgressUpdater,
SweepBucketRecordsTable recordsTable,
SweepBucketsTable sweepBucketsTable,
SweepBucketPointerTable sweepBucketPointerTable) {
this.bucketProgressStore = bucketProgressStore;
this.sweepQueueProgressUpdater = sweepQueueProgressUpdater;
this.recordsTable = recordsTable;
this.sweepBucketsTable = sweepBucketsTable;
this.sweepBucketPointerTable = sweepBucketPointerTable;
}

Expand All @@ -62,16 +66,9 @@ public void updateProgress(ShardAndStrategy shardAndStrategy) {
BucketProbeResult bucketProbeResult = findCompletedBuckets(shardAndStrategy, bucketPointer);

// This order of clearing the metadata is intentional:
// (1) if bucket progress is deleted but the pointer is not updated, we might sweep the relevant buckets
// again, but that is acceptable because sweepable cells and timestamps were already cleared, and
// these tables are not accessed via row range scans, so the number of tombstones we read will be
// reasonably bounded.
// (2) if the pointer is updated but progress is not, we will update progress to the right value on the
// next iteration (notice that we only use the pointer, and not the existing progress, to track where
// we are in the timeline).
for (long bucket = bucketPointer; bucket < bucketProbeResult.endExclusive(); bucket++) {
bucketProgressStore.deleteBucketProgress(Bucket.of(shardAndStrategy, bucket));
}
// if the pointer is updated but progress is not, we will update progress to the right value on the
// next iteration (notice that we only use the pointer, and not the existing progress, to track where
// we are in the timeline).
sweepBucketPointerTable.updateStartingBucketForShardAndStrategy(
Bucket.of(shardAndStrategy, bucketProbeResult.endExclusive()));
sweepQueueProgressUpdater.progressTo(shardAndStrategy, bucketProbeResult.knownSweepProgress());
Expand All @@ -83,53 +80,114 @@ public void updateProgress(ShardAndStrategy shardAndStrategy) {
* if this is not the case, behaviour is undefined.
*/
private BucketProbeResult findCompletedBuckets(ShardAndStrategy shardAndStrategy, long searchStart) {
long lastEndExclusiveForCompleteBucket = -1;
for (long offset = 0; offset < MAX_BUCKETS_TO_CHECK_PER_ITERATION; offset++) {
long currentBucket = searchStart + offset;
Optional<BucketProgress> bucketProgress =
bucketProgressStore.getBucketProgress(Bucket.of(shardAndStrategy, currentBucket));
if (bucketProgress.isPresent()) {
BucketProgress presentBucketProgress = bucketProgress.get();
TimestampRange requiredRange = getTimestampRangeRecord(currentBucket);
if (presentBucketProgress.timestampProgress()
!= requiredRange.endExclusive() - requiredRange.startInclusive() - 1) {
// Bucket still has progress to go, so we can stop here.
Optional<TimestampRange> record = recordsTable.getTimestampRangeRecord(currentBucket);
Optional<SweepableBucket> writtenSweepableBucket =
sweepBucketsTable.getSweepableBucket(Bucket.of(shardAndStrategy, currentBucket));

if (record.isPresent()) { // bucket has to have been closed
Optional<BucketProbeResult> definitiveProbeResult =
getProbeResultForPotentiallyPartiallyCompleteClosedBucket(
record.orElseThrow(), bucketProgress, writtenSweepableBucket, currentBucket);
if (definitiveProbeResult.isPresent()) {
return definitiveProbeResult.get();
} else {
lastEndExclusiveForCompleteBucket = record.get().endExclusive();
}
} else {
// No record; we're possibly in an open bucket, or not created yet.

if (writtenSweepableBucket.isEmpty()) {
// there's no open bucket. This should be rare - it'll happen if the bucket assigner failed to open
// the next bucket after closing the last, of it the bucket assigner is behind and hit the cap
// creating closed buckets.

if (lastEndExclusiveForCompleteBucket == -1) {
throw new SafeRuntimeException(
"Failed to update shard progress as there are no buckets to"
+ " check sweep's progress. This should be rare and transient. If this error"
+ " occurs for more than 15 minutes and there are no new buckets being created,"
+ " it is likely that there is something preventing the bucket assigner"
+ " from progressing.",
SafeArg.of("shardAndStrategy", shardAndStrategy),
SafeArg.of("currentBucket", currentBucket));
}

return BucketProbeResult.builder()
.endExclusive(currentBucket)
.knownSweepProgress(
requiredRange.startInclusive() + presentBucketProgress.timestampProgress())
.knownSweepProgress(lastEndExclusiveForCompleteBucket - 1)
.build();
} else {
// Bucket fully processed, keep going.
if (offset == MAX_BUCKETS_TO_CHECK_PER_ITERATION - 1) {
// We finished the maximum number of buckets to check, and all were completed.
return BucketProbeResult.builder()
.endExclusive(currentBucket + 1)
.knownSweepProgress(requiredRange.endExclusive() + 1)
.build();
}
}
} else {
// No progress; we're ahead of the read pointer, so interpret as unstarted.
return BucketProbeResult.builder()
SweepableBucket bucket = writtenSweepableBucket.orElseThrow();
return getProbeResultForOpenBucket(bucketProgress, bucket, currentBucket);
}
}

if (lastEndExclusiveForCompleteBucket == -1) {
throw new SafeIllegalStateException("Didn't expect to get here");
}
return BucketProbeResult.builder()
.endExclusive(searchStart + MAX_BUCKETS_TO_CHECK_PER_ITERATION)
.knownSweepProgress(lastEndExclusiveForCompleteBucket - 1)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See concern - I'm pretty sure the old + 1 was wrong?

.build();
}

private Optional<BucketProbeResult> getProbeResultForPotentiallyPartiallyCompleteClosedBucket(
TimestampRange presentRecord,
Optional<BucketProgress> bucketProgress,
Optional<SweepableBucket> writtenSweepableBucket,
long currentBucket) {
// If there's progress, and it's not at the end, then it's incomplete.
// If there's progress, and it's at the end, it's finished
// If there's no progress and the sweepable bucket is present, then it's not started
// If there's no progress and the sweepable bucket is not present, then it's finished
// (all assuming the record is present, since the record is written at the end)
if (bucketProgress.isPresent()) {
BucketProgress presentBucketProgress = bucketProgress.get();
if (presentBucketProgress.timestampProgress()
!= presentRecord.endExclusive() - presentRecord.startInclusive() - 1) {
// Progress, but not at the end
return Optional.of(BucketProbeResult.builder()
.endExclusive(currentBucket)
.knownSweepProgress(
getTimestampRangeRecord(currentBucket).startInclusive() - 1L)
.build();
.knownSweepProgress(presentRecord.startInclusive() + presentBucketProgress.timestampProgress())
.build());
} else {
// progress and it's at the end (perhaps we caught the foreground task between updating
/// progress, and updating the record)
return Optional.empty();
}
} else if (writtenSweepableBucket.isPresent()) {
// no progress, record present _and_ sweepable bucket entry present implies we're unstarted
return Optional.of(BucketProbeResult.builder()
.endExclusive(currentBucket)
.knownSweepProgress(presentRecord.startInclusive() - 1L)
.build());
} else {
// no progress and the sweepable bucket is not present, so it's finished.
return Optional.empty();
}
throw new SafeIllegalStateException("Didn't expect to get here");
}

private TimestampRange getTimestampRangeRecord(long queriedBucket) {
try {
return recordsTable.getTimestampRangeRecord(queriedBucket);
} catch (NoSuchElementException exception) {
throw new SafeIllegalStateException(
"Timestamp range record not found. If this has happened for bucket 0, this is possible when"
+ " autoscaling sweep is initializing itself. Otherwise, this is potentially indicative of a"
+ " bug in auto-scaling sweep. In either case, we will retry.",
exception,
SafeArg.of("queriedBucket", queriedBucket));
private BucketProbeResult getProbeResultForOpenBucket(
Optional<BucketProgress> bucketProgress, SweepableBucket bucket, long currentBucket) {
// No progress, then we haven't started yet.
if (bucketProgress.isEmpty()) {
return BucketProbeResult.builder()
.endExclusive(currentBucket)
.knownSweepProgress(bucket.timestampRange().startInclusive() - 1L)
.build();
} else {
// Progress in the open bucket!
BucketProgress presentBucketProgress = bucketProgress.get();
return BucketProbeResult.builder()
.endExclusive(currentBucket)
.knownSweepProgress(
bucket.timestampRange().startInclusive() + presentBucketProgress.timestampProgress())
.build();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
Expand Down Expand Up @@ -222,6 +221,15 @@ public Set<SweepableBucket> getSweepableBuckets(Set<Bucket> startBuckets) {
return readSweepableBucketRows(rows);
}

@Override
public Optional<SweepableBucket> getSweepableBucket(Bucket bucket) {
Cell cell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketsCell(bucket);
return readCell(
cell,
value -> SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue(
cell, value, timestampRangePersister));
}

@Override
public void putTimestampRangeForBucket(
Bucket bucket, Optional<TimestampRange> oldTimestampRange, TimestampRange newTimestampRange) {
Expand All @@ -239,10 +247,9 @@ public void deleteBucketEntry(Bucket bucket) {
}

@Override
public TimestampRange getTimestampRangeRecord(long bucketIdentifier) {
public Optional<TimestampRange> getTimestampRangeRecord(long bucketIdentifier) {
Cell cell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketRecordsCell(bucketIdentifier);
return readCell(cell, timestampRangePersister::tryDeserialize)
.orElseThrow(() -> new NoSuchElementException("No timestamp range record found for bucket identifier"));
return readCell(cell, timestampRangePersister::tryDeserialize);
}

@Override
Expand Down Expand Up @@ -290,7 +297,7 @@ private Set<SweepableBucket> readSweepableBucketRows(List<SweepAssignedBucketsRo
Long.MAX_VALUE);
return reads.entrySet().stream()
.map(entry -> SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue(
entry.getKey(), entry.getValue(), timestampRangePersister))
entry.getKey(), entry.getValue().getContents(), timestampRangePersister))
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.palantir.atlasdb.sweep.asts.bucketingthings;

import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.Value;
import com.palantir.atlasdb.schema.generated.SweepAssignedBucketsTable.SweepAssignedBucketsColumn;
import com.palantir.atlasdb.schema.generated.SweepAssignedBucketsTable.SweepAssignedBucketsRow;
import com.palantir.atlasdb.sweep.asts.Bucket;
Expand Down Expand Up @@ -76,11 +75,11 @@ SweepAssignedBucketsRow nextSweepBucketsRow(Bucket bucket) {
}

SweepableBucket fromSweepBucketCellAndValue(
Cell cell, Value value, ObjectPersister<TimestampRange> timestampRangePersister) {
Cell cell, byte[] value, ObjectPersister<TimestampRange> timestampRangePersister) {
SweepAssignedBucketsRow row = SweepAssignedBucketsRow.BYTES_HYDRATOR.hydrateFromBytes(cell.getRowName());
SweepAssignedBucketsColumn column =
SweepAssignedBucketsColumn.BYTES_HYDRATOR.hydrateFromBytes(cell.getColumnName());
TimestampRange timestampRange = timestampRangePersister.tryDeserialize(value.getContents());
TimestampRange timestampRange = timestampRangePersister.tryDeserialize(value);
int shard = Math.toIntExact(row.getShard()); // throws if invalid shard
return SweepableBucket.of(
Bucket.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
package com.palantir.atlasdb.sweep.asts.bucketingthings;

import com.palantir.atlasdb.sweep.asts.TimestampRange;
import java.util.Optional;

public interface SweepBucketRecordsTable {
/**
* Returns the {@link TimestampRange} for the given bucket identifier, throwing a
* {@link java.util.NoSuchElementException} if one is not present.
* Returns a {@link TimestampRange} for the given bucket identifier, if one exists. Iff a bucket is closed, then
* the corresponding record will be present. (If the bucket is open, no record will be present.)
*/
TimestampRange getTimestampRangeRecord(long bucketIdentifier);
Optional<TimestampRange> getTimestampRangeRecord(long bucketIdentifier);

void putTimestampRangeRecord(long bucketIdentifier, TimestampRange timestampRange);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public interface SweepBucketsTable {
*/
Set<SweepableBucket> getSweepableBuckets(Set<Bucket> startBuckets);

Optional<SweepableBucket> getSweepableBucket(Bucket bucket);

void putTimestampRangeForBucket(
Bucket bucket, Optional<TimestampRange> oldTimestampRange, TimestampRange newTimestampRange);

Expand Down
Loading