diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/Bucket.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/Bucket.java new file mode 100644 index 00000000000..b0e27c545c3 --- /dev/null +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/Bucket.java @@ -0,0 +1,40 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.sweep.asts; + +import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; +import com.palantir.logsafe.Safe; +import org.immutables.value.Value; + +@Value.Immutable +public abstract class Bucket { + @Value.Parameter + public abstract ShardAndStrategy shardAndStrategy(); + + @Value.Parameter + public abstract long bucketIdentifier(); + + @Safe + @Override + public String toString() { + return shardAndStrategy().toText() + " and partition " + bucketIdentifier(); + } + + public static Bucket of(ShardAndStrategy shardAndStrategy, long bucketIdentifier) { + return ImmutableBucket.of(shardAndStrategy, bucketIdentifier); + } +} diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/CandidateSweepableBucketRetriever.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/CandidateSweepableBucketRetriever.java index 4a9513c49b8..5e4c5d52776 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/CandidateSweepableBucketRetriever.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/CandidateSweepableBucketRetriever.java @@ -16,7 +16,6 @@ package com.palantir.atlasdb.sweep.asts; -import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket; import com.palantir.refreshable.Disposable; import java.util.Set; import java.util.function.Consumer; diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultCandidateSweepableBucketRetriever.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultCandidateSweepableBucketRetriever.java index 1de5f919244..07bf1b3d618 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultCandidateSweepableBucketRetriever.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultCandidateSweepableBucketRetriever.java @@ -17,7 +17,6 @@ package com.palantir.atlasdb.sweep.asts; import com.google.common.annotations.VisibleForTesting; -import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket; import com.palantir.common.concurrent.CoalescingSupplier; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.logger.SafeLogger; diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSweepStateCoordinator.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSweepStateCoordinator.java index 22547d13d61..f4f5e6dd247 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSweepStateCoordinator.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSweepStateCoordinator.java @@ -119,7 +119,7 @@ private void updateBuckets(Set newBuckets) { Map> partition = newBuckets.stream() .sorted(SweepableBucketComparator.INSTANCE) .collect(Collectors.groupingBy( - bucket -> bucket.shardAndStrategy().shard())); + bucket -> bucket.bucket().shardAndStrategy().shard())); List> firstBucketsOfEachShard = partition.values().stream() .filter(list -> !list.isEmpty()) @@ -148,12 +148,15 @@ private enum SweepableBucketComparator implements Comparator { @Override public int compare(SweepableBucket firstBucket, SweepableBucket secondBucket) { int shardComparison = Integer.compare( - firstBucket.shardAndStrategy().shard(), - secondBucket.shardAndStrategy().shard()); + firstBucket.bucket().shardAndStrategy().shard(), + secondBucket.bucket().shardAndStrategy().shard()); if (shardComparison != 0) { return shardComparison; } - return Long.compare(firstBucket.bucketIdentifier(), secondBucket.bucketIdentifier()); + return Long.compare( + firstBucket.bucket().bucketIdentifier(), + secondBucket.bucket().bucketIdentifier()); + // We're explicitly not comparing timestamp range, because it's irrelevant to the algorithm } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/FaultTolerantShardedRetrievalStrategy.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/FaultTolerantShardedRetrievalStrategy.java index d9253dbcb3a..4a6cc019d21 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/FaultTolerantShardedRetrievalStrategy.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/FaultTolerantShardedRetrievalStrategy.java @@ -18,7 +18,6 @@ import com.google.common.collect.ImmutableMap; import com.palantir.atlasdb.sweep.asts.ShardedSweepTimestampManager.SweepTimestamps; -import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket; import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.logger.SafeLogger; diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/ShardedRetrievalStrategy.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/ShardedRetrievalStrategy.java index 40345b4b23e..7e8f3283772 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/ShardedRetrievalStrategy.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/ShardedRetrievalStrategy.java @@ -17,7 +17,6 @@ package com.palantir.atlasdb.sweep.asts; import com.palantir.atlasdb.sweep.asts.ShardedSweepTimestampManager.SweepTimestamps; -import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket; import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; import java.util.List; diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/ShardedSweepableBucketRetriever.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/ShardedSweepableBucketRetriever.java index 45a1003f841..5ad2c9761cf 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/ShardedSweepableBucketRetriever.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/ShardedSweepableBucketRetriever.java @@ -18,7 +18,6 @@ import com.google.common.annotations.VisibleForTesting; import com.palantir.atlasdb.sweep.asts.ShardedSweepTimestampManager.SweepTimestamps; -import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket; import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; import com.palantir.atlasdb.table.description.SweeperStrategy; import com.palantir.common.base.RunnableCheckedException; diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SweepStateCoordinator.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SweepStateCoordinator.java index 02298e0e998..5aecd963d0c 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SweepStateCoordinator.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SweepStateCoordinator.java @@ -16,10 +16,7 @@ package com.palantir.atlasdb.sweep.asts; -import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; -import com.palantir.logsafe.Safe; import java.util.function.Consumer; -import org.immutables.value.Value; public interface SweepStateCoordinator { SweepOutcome tryRunTaskWithBucket(Consumer task); @@ -29,35 +26,4 @@ enum SweepOutcome { NOTHING_TO_SWEEP, SWEPT; } - - @Value.Immutable - @Safe - abstract class SweepableBucket implements Comparable { - @Value.Parameter - abstract ShardAndStrategy shardAndStrategy(); - - // It's really just the fine partition, but we make it opaque so we can change it in the future - @Value.Parameter - abstract long bucketIdentifier(); - - @Safe - @Override - public String toString() { - return shardAndStrategy().toText() + " and partition " + bucketIdentifier(); - } - - @Override - public int compareTo(SweepableBucket other) { - int shardComparison = Integer.compare( - shardAndStrategy().shard(), other.shardAndStrategy().shard()); - if (shardComparison != 0) { - return shardComparison; - } - return Long.compare(bucketIdentifier(), other.bucketIdentifier()); - } - - static SweepableBucket of(ShardAndStrategy shardAndStrategy, long bucketIdentifier) { - return ImmutableSweepableBucket.of(shardAndStrategy, bucketIdentifier); - } - } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SweepableBucket.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SweepableBucket.java new file mode 100644 index 00000000000..a44993a48ed --- /dev/null +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SweepableBucket.java @@ -0,0 +1,45 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.sweep.asts; + +import org.immutables.value.Value; + +@Value.Immutable +public interface SweepableBucket { + @Value.Parameter + Bucket bucket(); + + @Value.Parameter + TimestampRange timestampRange(); + + static SweepableBucket of(Bucket bucket, TimestampRange timestampRange) { + return ImmutableSweepableBucket.of(bucket, timestampRange); + } + + @Value.Immutable + interface TimestampRange { + @Value.Parameter + long startInclusive(); + + @Value.Parameter + long endExclusive(); + + static TimestampRange of(long startInclusive, long endExclusive) { + return ImmutableTimestampRange.of(startInclusive, endExclusive); + } + } +} diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SweepableBucketRetriever.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SweepableBucketRetriever.java index 660e1e5f9d9..6431994d37a 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SweepableBucketRetriever.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SweepableBucketRetriever.java @@ -16,7 +16,6 @@ package com.palantir.atlasdb.sweep.asts; -import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket; import java.util.Set; public interface SweepableBucketRetriever { diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultCandidateSweepableBucketRetrieverTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultCandidateSweepableBucketRetrieverTest.java index 11315aab78e..c24a17830ef 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultCandidateSweepableBucketRetrieverTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultCandidateSweepableBucketRetrieverTest.java @@ -19,7 +19,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.lenient; -import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket; +import com.palantir.atlasdb.sweep.asts.SweepableBucket.TimestampRange; import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; import com.palantir.atlasdb.table.description.SweeperStrategy; import com.palantir.refreshable.Disposable; @@ -48,8 +48,10 @@ @ExtendWith(MockitoExtension.class) class DefaultCandidateSweepableBucketRetrieverTest { private static final Set BUCKETS = Set.of( - SweepableBucket.of(ShardAndStrategy.of(1, SweeperStrategy.CONSERVATIVE), 1), - SweepableBucket.of(ShardAndStrategy.of(2, SweeperStrategy.THOROUGH), 2)); + SweepableBucket.of( + Bucket.of(ShardAndStrategy.of(1, SweeperStrategy.CONSERVATIVE), 1), TimestampRange.of(1, 3)), + SweepableBucket.of( + Bucket.of(ShardAndStrategy.of(2, SweeperStrategy.THOROUGH), 2), TimestampRange.of(4, 6))); private static final SweepableBucketRetriever WITH_BUCKETS = () -> BUCKETS; private final SettableRefreshable minimumDurationBetweenRefresh = Refreshable.create(Duration.ZERO); diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultSweepStateCoordinatorTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultSweepStateCoordinatorTest.java index db8a07dea58..b3e3a2b0673 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultSweepStateCoordinatorTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/DefaultSweepStateCoordinatorTest.java @@ -28,7 +28,7 @@ import com.google.common.collect.Sets; import com.google.common.collect.Streams; import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepOutcome; -import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket; +import com.palantir.atlasdb.sweep.asts.SweepableBucket.TimestampRange; import com.palantir.atlasdb.sweep.asts.locks.Lockable; import com.palantir.atlasdb.sweep.asts.locks.Lockable.LockedItem; import com.palantir.atlasdb.sweep.asts.locks.LockableFactory; @@ -267,7 +267,9 @@ private boolean isBucketLocked(SweepableBucket bucket) { } private static SweepableBucket bucket(int shard, int identifier) { - return SweepableBucket.of(ShardAndStrategy.of(shard, SweeperStrategy.CONSERVATIVE), identifier); + return SweepableBucket.of( + Bucket.of(ShardAndStrategy.of(shard, SweeperStrategy.CONSERVATIVE), identifier), + TimestampRange.of(1, 3)); } // When we have assertions _inside_ tryRunTaskWithBucket, it's possible for those tests to spuriously pass if diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/FaultTolerantShardedRetrievalStrategyTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/FaultTolerantShardedRetrievalStrategyTest.java index e83c2a15c61..1321ee9d5f7 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/FaultTolerantShardedRetrievalStrategyTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/FaultTolerantShardedRetrievalStrategyTest.java @@ -22,7 +22,7 @@ import static org.mockito.Mockito.when; import com.palantir.atlasdb.sweep.asts.ShardedSweepTimestampManager.SweepTimestamps; -import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket; +import com.palantir.atlasdb.sweep.asts.SweepableBucket.TimestampRange; import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; import com.palantir.atlasdb.table.description.SweeperStrategy; import java.util.List; @@ -55,7 +55,8 @@ public void setup() { @Test public void passesThroughSuccessfulRequest() { - List buckets = List.of(SweepableBucket.of(SHARD_AND_STRATEGY, 123L)); + List buckets = + List.of(SweepableBucket.of(Bucket.of(SHARD_AND_STRATEGY, 1), TimestampRange.of(1, 3))); when(delegate.getSweepableBucketsForShard(SHARD_AND_STRATEGY, SWEEP_TIMESTAMPS)) .thenReturn(buckets); assertThat(strategy.getSweepableBucketsForShard(SHARD_AND_STRATEGY, SWEEP_TIMESTAMPS)) diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/ShardedSweepableBucketRetrieverTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/ShardedSweepableBucketRetrieverTest.java index e4acbf6e6f9..f7db28131ec 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/ShardedSweepableBucketRetrieverTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/ShardedSweepableBucketRetrieverTest.java @@ -19,7 +19,7 @@ import static org.assertj.core.api.Assertions.assertThat; import com.palantir.atlasdb.sweep.asts.ShardedSweepTimestampManager.SweepTimestamps; -import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket; +import com.palantir.atlasdb.sweep.asts.SweepableBucket.TimestampRange; import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; import com.palantir.atlasdb.table.description.SweeperStrategy; import com.palantir.common.concurrent.PTExecutors; @@ -153,7 +153,9 @@ public Set getRequestedShards() { private static List generateList(int shard) { return IntStream.range(0, new Random().nextInt(10)) - .mapToObj(i -> SweepableBucket.of(ShardAndStrategy.of(shard, SweeperStrategy.CONSERVATIVE), i)) + .mapToObj(i -> SweepableBucket.of( + Bucket.of(ShardAndStrategy.of(shard, SweeperStrategy.CONSERVATIVE), i), + TimestampRange.of(1, 3))) .collect(Collectors.toList()); } }