Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
refactor: Moving around some bucket types (#7260)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdaudali authored Sep 5, 2024
1 parent aeb1b4f commit 8f7a800
Show file tree
Hide file tree
Showing 14 changed files with 108 additions and 53 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.sweep.asts;

import com.palantir.atlasdb.sweep.queue.ShardAndStrategy;
import com.palantir.logsafe.Safe;
import org.immutables.value.Value;

@Value.Immutable
public abstract class Bucket {
@Value.Parameter
public abstract ShardAndStrategy shardAndStrategy();

@Value.Parameter
public abstract long bucketIdentifier();

@Safe
@Override
public String toString() {
return shardAndStrategy().toText() + " and partition " + bucketIdentifier();
}

public static Bucket of(ShardAndStrategy shardAndStrategy, long bucketIdentifier) {
return ImmutableBucket.of(shardAndStrategy, bucketIdentifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.palantir.atlasdb.sweep.asts;

import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket;
import com.palantir.refreshable.Disposable;
import java.util.Set;
import java.util.function.Consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.palantir.atlasdb.sweep.asts;

import com.google.common.annotations.VisibleForTesting;
import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket;
import com.palantir.common.concurrent.CoalescingSupplier;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.logger.SafeLogger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private void updateBuckets(Set<SweepableBucket> newBuckets) {
Map<Integer, List<SweepableBucket>> partition = newBuckets.stream()
.sorted(SweepableBucketComparator.INSTANCE)
.collect(Collectors.groupingBy(
bucket -> bucket.shardAndStrategy().shard()));
bucket -> bucket.bucket().shardAndStrategy().shard()));

List<Lockable<SweepableBucket>> firstBucketsOfEachShard = partition.values().stream()
.filter(list -> !list.isEmpty())
Expand Down Expand Up @@ -148,12 +148,15 @@ private enum SweepableBucketComparator implements Comparator<SweepableBucket> {
@Override
public int compare(SweepableBucket firstBucket, SweepableBucket secondBucket) {
int shardComparison = Integer.compare(
firstBucket.shardAndStrategy().shard(),
secondBucket.shardAndStrategy().shard());
firstBucket.bucket().shardAndStrategy().shard(),
secondBucket.bucket().shardAndStrategy().shard());
if (shardComparison != 0) {
return shardComparison;
}
return Long.compare(firstBucket.bucketIdentifier(), secondBucket.bucketIdentifier());
return Long.compare(
firstBucket.bucket().bucketIdentifier(),
secondBucket.bucket().bucketIdentifier());
// We're explicitly not comparing timestamp range, because it's irrelevant to the algorithm
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.common.collect.ImmutableMap;
import com.palantir.atlasdb.sweep.asts.ShardedSweepTimestampManager.SweepTimestamps;
import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket;
import com.palantir.atlasdb.sweep.queue.ShardAndStrategy;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.logger.SafeLogger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.palantir.atlasdb.sweep.asts;

import com.palantir.atlasdb.sweep.asts.ShardedSweepTimestampManager.SweepTimestamps;
import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket;
import com.palantir.atlasdb.sweep.queue.ShardAndStrategy;
import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.palantir.atlasdb.sweep.asts.ShardedSweepTimestampManager.SweepTimestamps;
import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket;
import com.palantir.atlasdb.sweep.queue.ShardAndStrategy;
import com.palantir.atlasdb.table.description.SweeperStrategy;
import com.palantir.common.base.RunnableCheckedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@

package com.palantir.atlasdb.sweep.asts;

import com.palantir.atlasdb.sweep.queue.ShardAndStrategy;
import com.palantir.logsafe.Safe;
import java.util.function.Consumer;
import org.immutables.value.Value;

public interface SweepStateCoordinator {
SweepOutcome tryRunTaskWithBucket(Consumer<SweepableBucket> task);
Expand All @@ -29,35 +26,4 @@ enum SweepOutcome {
NOTHING_TO_SWEEP,
SWEPT;
}

@Value.Immutable
@Safe
abstract class SweepableBucket implements Comparable<SweepableBucket> {
@Value.Parameter
abstract ShardAndStrategy shardAndStrategy();

// It's really just the fine partition, but we make it opaque so we can change it in the future
@Value.Parameter
abstract long bucketIdentifier();

@Safe
@Override
public String toString() {
return shardAndStrategy().toText() + " and partition " + bucketIdentifier();
}

@Override
public int compareTo(SweepableBucket other) {
int shardComparison = Integer.compare(
shardAndStrategy().shard(), other.shardAndStrategy().shard());
if (shardComparison != 0) {
return shardComparison;
}
return Long.compare(bucketIdentifier(), other.bucketIdentifier());
}

static SweepableBucket of(ShardAndStrategy shardAndStrategy, long bucketIdentifier) {
return ImmutableSweepableBucket.of(shardAndStrategy, bucketIdentifier);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.sweep.asts;

import org.immutables.value.Value;

@Value.Immutable
public interface SweepableBucket {
@Value.Parameter
Bucket bucket();

@Value.Parameter
TimestampRange timestampRange();

static SweepableBucket of(Bucket bucket, TimestampRange timestampRange) {
return ImmutableSweepableBucket.of(bucket, timestampRange);
}

@Value.Immutable
interface TimestampRange {
@Value.Parameter
long startInclusive();

@Value.Parameter
long endExclusive();

static TimestampRange of(long startInclusive, long endExclusive) {
return ImmutableTimestampRange.of(startInclusive, endExclusive);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.palantir.atlasdb.sweep.asts;

import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket;
import java.util.Set;

public interface SweepableBucketRetriever {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.lenient;

import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket;
import com.palantir.atlasdb.sweep.asts.SweepableBucket.TimestampRange;
import com.palantir.atlasdb.sweep.queue.ShardAndStrategy;
import com.palantir.atlasdb.table.description.SweeperStrategy;
import com.palantir.refreshable.Disposable;
Expand Down Expand Up @@ -48,8 +48,10 @@
@ExtendWith(MockitoExtension.class)
class DefaultCandidateSweepableBucketRetrieverTest {
private static final Set<SweepableBucket> BUCKETS = Set.of(
SweepableBucket.of(ShardAndStrategy.of(1, SweeperStrategy.CONSERVATIVE), 1),
SweepableBucket.of(ShardAndStrategy.of(2, SweeperStrategy.THOROUGH), 2));
SweepableBucket.of(
Bucket.of(ShardAndStrategy.of(1, SweeperStrategy.CONSERVATIVE), 1), TimestampRange.of(1, 3)),
SweepableBucket.of(
Bucket.of(ShardAndStrategy.of(2, SweeperStrategy.THOROUGH), 2), TimestampRange.of(4, 6)));
private static final SweepableBucketRetriever WITH_BUCKETS = () -> BUCKETS;

private final SettableRefreshable<Duration> minimumDurationBetweenRefresh = Refreshable.create(Duration.ZERO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepOutcome;
import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket;
import com.palantir.atlasdb.sweep.asts.SweepableBucket.TimestampRange;
import com.palantir.atlasdb.sweep.asts.locks.Lockable;
import com.palantir.atlasdb.sweep.asts.locks.Lockable.LockedItem;
import com.palantir.atlasdb.sweep.asts.locks.LockableFactory;
Expand Down Expand Up @@ -267,7 +267,9 @@ private boolean isBucketLocked(SweepableBucket bucket) {
}

private static SweepableBucket bucket(int shard, int identifier) {
return SweepableBucket.of(ShardAndStrategy.of(shard, SweeperStrategy.CONSERVATIVE), identifier);
return SweepableBucket.of(
Bucket.of(ShardAndStrategy.of(shard, SweeperStrategy.CONSERVATIVE), identifier),
TimestampRange.of(1, 3));
}

// When we have assertions _inside_ tryRunTaskWithBucket, it's possible for those tests to spuriously pass if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import static org.mockito.Mockito.when;

import com.palantir.atlasdb.sweep.asts.ShardedSweepTimestampManager.SweepTimestamps;
import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket;
import com.palantir.atlasdb.sweep.asts.SweepableBucket.TimestampRange;
import com.palantir.atlasdb.sweep.queue.ShardAndStrategy;
import com.palantir.atlasdb.table.description.SweeperStrategy;
import java.util.List;
Expand Down Expand Up @@ -55,7 +55,8 @@ public void setup() {

@Test
public void passesThroughSuccessfulRequest() {
List<SweepableBucket> buckets = List.of(SweepableBucket.of(SHARD_AND_STRATEGY, 123L));
List<SweepableBucket> buckets =
List.of(SweepableBucket.of(Bucket.of(SHARD_AND_STRATEGY, 1), TimestampRange.of(1, 3)));
when(delegate.getSweepableBucketsForShard(SHARD_AND_STRATEGY, SWEEP_TIMESTAMPS))
.thenReturn(buckets);
assertThat(strategy.getSweepableBucketsForShard(SHARD_AND_STRATEGY, SWEEP_TIMESTAMPS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.palantir.atlasdb.sweep.asts.ShardedSweepTimestampManager.SweepTimestamps;
import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket;
import com.palantir.atlasdb.sweep.asts.SweepableBucket.TimestampRange;
import com.palantir.atlasdb.sweep.queue.ShardAndStrategy;
import com.palantir.atlasdb.table.description.SweeperStrategy;
import com.palantir.common.concurrent.PTExecutors;
Expand Down Expand Up @@ -153,7 +153,9 @@ public Set<Integer> getRequestedShards() {

private static List<SweepableBucket> generateList(int shard) {
return IntStream.range(0, new Random().nextInt(10))
.mapToObj(i -> SweepableBucket.of(ShardAndStrategy.of(shard, SweeperStrategy.CONSERVATIVE), i))
.mapToObj(i -> SweepableBucket.of(
Bucket.of(ShardAndStrategy.of(shard, SweeperStrategy.CONSERVATIVE), i),
TimestampRange.of(1, 3)))
.collect(Collectors.toList());
}
}
Expand Down

0 comments on commit 8f7a800

Please sign in to comment.