Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[ASTS] improvement: Initial buckets for sweeping should include strat…
Browse files Browse the repository at this point in the history
…egy in heuristic (#7423)
  • Loading branch information
mdaudali authored Nov 6, 2024
1 parent 6a9bbd2 commit 80002f0
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.palantir.atlasdb.sweep.asts.locks.Lockable.LockedItem;
import com.palantir.atlasdb.sweep.asts.locks.LockableFactory;
import com.palantir.atlasdb.sweep.metrics.TargetedSweepProgressMetrics;
import com.palantir.atlasdb.sweep.queue.ShardAndStrategy;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
Expand All @@ -47,7 +48,7 @@ public final class DefaultSweepStateCoordinator implements SweepStateCoordinator

private volatile Set<Lockable<SweepableBucket>> seenBuckets = ConcurrentHashMap.newKeySet();
private volatile BucketsLists bucketsLists = ImmutableBucketsLists.builder()
.firstBucketsOfEachShard(List.of())
.firstBucketsOfEachShardAndStrategy(List.of())
.remainingBuckets(List.of())
.build();

Expand All @@ -64,7 +65,7 @@ public final class DefaultSweepStateCoordinator implements SweepStateCoordinator
candidateSweepableBucketRetriever.subscribeToChanges(this::updateBuckets);
progressMetrics.estimatedPendingNumberOfBucketsToBeSwept(() -> {
BucketsLists currentBucketsLists = bucketsLists;
return currentBucketsLists.firstBucketsOfEachShard().size()
return currentBucketsLists.firstBucketsOfEachShardAndStrategy().size()
+ currentBucketsLists.remainingBuckets().size()
- seenBuckets.size();
});
Expand Down Expand Up @@ -92,7 +93,7 @@ public SweepOutcome tryRunTaskWithBucket(Consumer<SweepableBucket> task) {
// candidates from the _latest_ seen bucket set.
BucketsLists currentBucketsLists = bucketsLists;
if (seenBuckets.size()
>= (currentBucketsLists.firstBucketsOfEachShard().size()
>= (currentBucketsLists.firstBucketsOfEachShardAndStrategy().size()
+ currentBucketsLists.remainingBuckets().size())) {
candidateSweepableBucketRetriever.requestUpdate();
return SweepOutcome.NOTHING_TO_SWEEP;
Expand All @@ -109,7 +110,7 @@ public SweepOutcome tryRunTaskWithBucket(Consumer<SweepableBucket> task) {
}

private Optional<LockedItem<SweepableBucket>> chooseBucket(BucketsLists currentBucketsLists) {
return getFirstUnlockedBucket(currentBucketsLists.firstBucketsOfEachShard().stream())
return getFirstUnlockedBucket(currentBucketsLists.firstBucketsOfEachShardAndStrategy().stream())
.or(() -> randomUnlockedBucket(currentBucketsLists));
}

Expand All @@ -134,34 +135,34 @@ private Optional<LockedItem<SweepableBucket>> getFirstUnlockedBucket(Stream<Lock
}

private void updateBuckets(Set<SweepableBucket> newBuckets) {
Map<Integer, List<SweepableBucket>> partition = newBuckets.stream()
Map<ShardAndStrategy, List<SweepableBucket>> partition = newBuckets.stream()
.sorted(SweepableBucketComparator.INSTANCE)
.collect(Collectors.groupingBy(
bucket -> bucket.bucket().shardAndStrategy().shard()));
.collect(Collectors.groupingBy(bucket -> bucket.bucket().shardAndStrategy()));

List<Lockable<SweepableBucket>> firstBucketsOfEachShard = partition.values().stream()
List<Lockable<SweepableBucket>> firstBucketsOfEachShardAndStrategy = partition.values().stream()
.filter(list -> !list.isEmpty())
.map(list -> list.get(0))
.map(lockableFactory::createLockable)
.collect(Collectors.toUnmodifiableList());
.toList();

List<Lockable<SweepableBucket>> remainingBuckets = partition.values().stream()
.flatMap(list -> list.stream().skip(1))
.map(lockableFactory::createLockable)
.collect(Collectors.toUnmodifiableList());
.toList();

// There's a delay between setting each variable, but we do not require (for correctness) that these two
// variables are updated atomically.

bucketsLists = ImmutableBucketsLists.builder()
.firstBucketsOfEachShard(firstBucketsOfEachShard)
.firstBucketsOfEachShardAndStrategy(firstBucketsOfEachShardAndStrategy)
.remainingBuckets(remainingBuckets)
.build();
seenBuckets = ConcurrentHashMap.newKeySet();
log.info(
"Updated sweepable buckets (first buckets per shard: {}, remaining buckets: {}).",
SafeArg.of("firstBucketsOfEachShard", firstBucketsOfEachShard),
SafeArg.of("remainingBuckets", remainingBuckets));
"Updated sweepable buckets (number of first buckets per shard and strategy: {},"
+ " number of remaining buckets: {}).",
SafeArg.of("firstBucketsOfEachShardAndStrategySize", firstBucketsOfEachShardAndStrategy.size()),
SafeArg.of("remainingBucketsSize", remainingBuckets.size()));
}

private enum SweepableBucketComparator implements Comparator<SweepableBucket> {
Expand All @@ -184,7 +185,7 @@ public int compare(SweepableBucket firstBucket, SweepableBucket secondBucket) {

@Value.Immutable
interface BucketsLists {
List<Lockable<SweepableBucket>> firstBucketsOfEachShard();
List<Lockable<SweepableBucket>> firstBucketsOfEachShardAndStrategy();

List<Lockable<SweepableBucket>> remainingBuckets();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,31 @@ public void beforeEach() {
}

@Test
public void selectsHeadOfShardsDeterminedAtRefresh() {
public void selectsHeadOfShardsAndStrategyDeterminedAtRefresh() {
int shards = 10;
Set<SweepableBucket> firstBucketPerShard =
Set<SweepableBucket> firstBucketPerShardConservative =
// i, i to show that it's not just counting 0 as the head.
IntStream.range(0, shards).mapToObj(i -> bucket(i, i)).collect(Collectors.toSet());
Set<SweepableBucket> remainingBuckets =
IntStream.range(0, shards).mapToObj(i -> bucket(i, i + 1)).collect(Collectors.toSet());
IntStream.range(0, shards)
.mapToObj(i -> bucket(i, i, SweeperStrategy.CONSERVATIVE))
.collect(Collectors.toSet());

// Same shards as conservative, but a later bucket. Should still appear as we should be factoring in strategy
// into the head bucket heuristic
Set<SweepableBucket> firstBucketPerShardThorough = IntStream.range(0, shards)
.mapToObj(i -> bucket(i, i + 1, SweeperStrategy.THOROUGH))
.collect(Collectors.toSet());

Set<SweepableBucket> firstBucketPerShard =
Sets.union(firstBucketPerShardConservative, firstBucketPerShardThorough);
Set<SweepableBucket> remainingBuckets = IntStream.range(0, shards)
.boxed()
.flatMap(i -> Stream.of(
bucket(i, i + 1, SweeperStrategy.CONSERVATIVE), bucket(i, i + 2, SweeperStrategy.THOROUGH)))
.collect(Collectors.toSet());

Set<SweepableBucket> chosenBuckets = new HashSet<>();
buckets.update(Sets.union(firstBucketPerShard, remainingBuckets));
for (int i = 0; i < shards; i++) {
for (int i = 0; i < shards * 2; i++) {
coordinator.tryRunTaskWithBucket(chosenBuckets::add);
}
assertThat(chosenBuckets).containsExactlyInAnyOrderElementsOf(firstBucketPerShard);
Expand Down Expand Up @@ -271,9 +285,11 @@ private boolean isBucketLocked(SweepableBucket bucket) {
}

private static SweepableBucket bucket(int shard, int identifier) {
return SweepableBucket.of(
Bucket.of(ShardAndStrategy.of(shard, SweeperStrategy.CONSERVATIVE), identifier),
TimestampRange.of(1, 3));
return bucket(shard, identifier, SweeperStrategy.CONSERVATIVE);
}

private static SweepableBucket bucket(int shard, int identifier, SweeperStrategy strategy) {
return SweepableBucket.of(Bucket.of(ShardAndStrategy.of(shard, strategy), identifier), TimestampRange.of(1, 3));
}

// When we have assertions _inside_ tryRunTaskWithBucket, it's possible for those tests to spuriously pass if
Expand Down

0 comments on commit 80002f0

Please sign in to comment.