diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSweepStateCoordinator.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSweepStateCoordinator.java index d62c51587b..8c251d9daf 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSweepStateCoordinator.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSweepStateCoordinator.java @@ -22,6 +22,7 @@ import com.palantir.atlasdb.sweep.asts.locks.Lockable.LockedItem; import com.palantir.atlasdb.sweep.asts.locks.LockableFactory; import com.palantir.atlasdb.sweep.metrics.TargetedSweepProgressMetrics; +import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.logger.SafeLogger; import com.palantir.logsafe.logger.SafeLoggerFactory; @@ -47,7 +48,7 @@ public final class DefaultSweepStateCoordinator implements SweepStateCoordinator private volatile Set> seenBuckets = ConcurrentHashMap.newKeySet(); private volatile BucketsLists bucketsLists = ImmutableBucketsLists.builder() - .firstBucketsOfEachShard(List.of()) + .firstBucketsOfEachShardAndStrategy(List.of()) .remainingBuckets(List.of()) .build(); @@ -64,7 +65,7 @@ public final class DefaultSweepStateCoordinator implements SweepStateCoordinator candidateSweepableBucketRetriever.subscribeToChanges(this::updateBuckets); progressMetrics.estimatedPendingNumberOfBucketsToBeSwept(() -> { BucketsLists currentBucketsLists = bucketsLists; - return currentBucketsLists.firstBucketsOfEachShard().size() + return currentBucketsLists.firstBucketsOfEachShardAndStrategy().size() + currentBucketsLists.remainingBuckets().size() - seenBuckets.size(); }); @@ -92,7 +93,7 @@ public SweepOutcome tryRunTaskWithBucket(Consumer task) { // candidates from the _latest_ seen bucket set. BucketsLists currentBucketsLists = bucketsLists; if (seenBuckets.size() - >= (currentBucketsLists.firstBucketsOfEachShard().size() + >= (currentBucketsLists.firstBucketsOfEachShardAndStrategy().size() + currentBucketsLists.remainingBuckets().size())) { candidateSweepableBucketRetriever.requestUpdate(); return SweepOutcome.NOTHING_TO_SWEEP; @@ -109,7 +110,7 @@ public SweepOutcome tryRunTaskWithBucket(Consumer task) { } private Optional> chooseBucket(BucketsLists currentBucketsLists) { - return getFirstUnlockedBucket(currentBucketsLists.firstBucketsOfEachShard().stream()) + return getFirstUnlockedBucket(currentBucketsLists.firstBucketsOfEachShardAndStrategy().stream()) .or(() -> randomUnlockedBucket(currentBucketsLists)); } @@ -134,34 +135,34 @@ private Optional> getFirstUnlockedBucket(Stream newBuckets) { - Map> partition = newBuckets.stream() + Map> partition = newBuckets.stream() .sorted(SweepableBucketComparator.INSTANCE) - .collect(Collectors.groupingBy( - bucket -> bucket.bucket().shardAndStrategy().shard())); + .collect(Collectors.groupingBy(bucket -> bucket.bucket().shardAndStrategy())); - List> firstBucketsOfEachShard = partition.values().stream() + List> firstBucketsOfEachShardAndStrategy = partition.values().stream() .filter(list -> !list.isEmpty()) .map(list -> list.get(0)) .map(lockableFactory::createLockable) - .collect(Collectors.toUnmodifiableList()); + .toList(); List> remainingBuckets = partition.values().stream() .flatMap(list -> list.stream().skip(1)) .map(lockableFactory::createLockable) - .collect(Collectors.toUnmodifiableList()); + .toList(); // There's a delay between setting each variable, but we do not require (for correctness) that these two // variables are updated atomically. bucketsLists = ImmutableBucketsLists.builder() - .firstBucketsOfEachShard(firstBucketsOfEachShard) + .firstBucketsOfEachShardAndStrategy(firstBucketsOfEachShardAndStrategy) .remainingBuckets(remainingBuckets) .build(); seenBuckets = ConcurrentHashMap.newKeySet(); log.info( - "Updated sweepable buckets (first buckets per shard: {}, remaining buckets: {}).", - SafeArg.of("firstBucketsOfEachShard", firstBucketsOfEachShard), - SafeArg.of("remainingBuckets", remainingBuckets)); + "Updated sweepable buckets (number of first buckets per shard and strategy: {}," + + " number of remaining buckets: {}).", + SafeArg.of("firstBucketsOfEachShardAndStrategySize", firstBucketsOfEachShardAndStrategy.size()), + SafeArg.of("remainingBucketsSize", remainingBuckets.size())); } private enum SweepableBucketComparator implements Comparator { @@ -184,7 +185,7 @@ public int compare(SweepableBucket firstBucket, SweepableBucket secondBucket) { @Value.Immutable interface BucketsLists { - List> firstBucketsOfEachShard(); + List> firstBucketsOfEachShardAndStrategy(); List> remainingBuckets(); } diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultSweepStateCoordinatorTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultSweepStateCoordinatorTest.java index 5255d284c6..f8e195fec5 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultSweepStateCoordinatorTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultSweepStateCoordinatorTest.java @@ -102,17 +102,31 @@ public void beforeEach() { } @Test - public void selectsHeadOfShardsDeterminedAtRefresh() { + public void selectsHeadOfShardsAndStrategyDeterminedAtRefresh() { int shards = 10; - Set firstBucketPerShard = + Set firstBucketPerShardConservative = // i, i to show that it's not just counting 0 as the head. - IntStream.range(0, shards).mapToObj(i -> bucket(i, i)).collect(Collectors.toSet()); - Set remainingBuckets = - IntStream.range(0, shards).mapToObj(i -> bucket(i, i + 1)).collect(Collectors.toSet()); + IntStream.range(0, shards) + .mapToObj(i -> bucket(i, i, SweeperStrategy.CONSERVATIVE)) + .collect(Collectors.toSet()); + + // Same shards as conservative, but a later bucket. Should still appear as we should be factoring in strategy + // into the head bucket heuristic + Set firstBucketPerShardThorough = IntStream.range(0, shards) + .mapToObj(i -> bucket(i, i + 1, SweeperStrategy.THOROUGH)) + .collect(Collectors.toSet()); + + Set firstBucketPerShard = + Sets.union(firstBucketPerShardConservative, firstBucketPerShardThorough); + Set remainingBuckets = IntStream.range(0, shards) + .boxed() + .flatMap(i -> Stream.of( + bucket(i, i + 1, SweeperStrategy.CONSERVATIVE), bucket(i, i + 2, SweeperStrategy.THOROUGH))) + .collect(Collectors.toSet()); Set chosenBuckets = new HashSet<>(); buckets.update(Sets.union(firstBucketPerShard, remainingBuckets)); - for (int i = 0; i < shards; i++) { + for (int i = 0; i < shards * 2; i++) { coordinator.tryRunTaskWithBucket(chosenBuckets::add); } assertThat(chosenBuckets).containsExactlyInAnyOrderElementsOf(firstBucketPerShard); @@ -271,9 +285,11 @@ private boolean isBucketLocked(SweepableBucket bucket) { } private static SweepableBucket bucket(int shard, int identifier) { - return SweepableBucket.of( - Bucket.of(ShardAndStrategy.of(shard, SweeperStrategy.CONSERVATIVE), identifier), - TimestampRange.of(1, 3)); + return bucket(shard, identifier, SweeperStrategy.CONSERVATIVE); + } + + private static SweepableBucket bucket(int shard, int identifier, SweeperStrategy strategy) { + return SweepableBucket.of(Bucket.of(ShardAndStrategy.of(shard, strategy), identifier), TimestampRange.of(1, 3)); } // When we have assertions _inside_ tryRunTaskWithBucket, it's possible for those tests to spuriously pass if