diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultSweepAssignedBucketStore.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultSweepAssignedBucketStore.java index b8201daa01..94e4b2ba84 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultSweepAssignedBucketStore.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/DefaultSweepAssignedBucketStore.java @@ -221,6 +221,15 @@ public Set getSweepableBuckets(Set startBuckets) { return readSweepableBucketRows(rows); } + @Override + public Optional getSweepableBucket(Bucket bucket) { + Cell cell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketsCell(bucket); + return readCell( + cell, + value -> SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue( + cell, value, timestampRangePersister)); + } + @Override public void putTimestampRangeForBucket( Bucket bucket, Optional oldTimestampRange, TimestampRange newTimestampRange) { @@ -288,7 +297,7 @@ private Set readSweepableBucketRows(List SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue( - entry.getKey(), entry.getValue(), timestampRangePersister)) + entry.getKey(), entry.getValue().getContents(), timestampRangePersister)) .collect(Collectors.toSet()); } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersister.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersister.java index 6be4a50478..9897caf7df 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersister.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersister.java @@ -17,7 +17,6 @@ package com.palantir.atlasdb.sweep.asts.bucketingthings; import com.palantir.atlasdb.keyvalue.api.Cell; -import com.palantir.atlasdb.keyvalue.api.Value; import com.palantir.atlasdb.schema.generated.SweepAssignedBucketsTable.SweepAssignedBucketsColumn; import com.palantir.atlasdb.schema.generated.SweepAssignedBucketsTable.SweepAssignedBucketsRow; import com.palantir.atlasdb.sweep.asts.Bucket; @@ -76,11 +75,11 @@ SweepAssignedBucketsRow nextSweepBucketsRow(Bucket bucket) { } SweepableBucket fromSweepBucketCellAndValue( - Cell cell, Value value, ObjectPersister timestampRangePersister) { + Cell cell, byte[] value, ObjectPersister timestampRangePersister) { SweepAssignedBucketsRow row = SweepAssignedBucketsRow.BYTES_HYDRATOR.hydrateFromBytes(cell.getRowName()); SweepAssignedBucketsColumn column = SweepAssignedBucketsColumn.BYTES_HYDRATOR.hydrateFromBytes(cell.getColumnName()); - TimestampRange timestampRange = timestampRangePersister.tryDeserialize(value.getContents()); + TimestampRange timestampRange = timestampRangePersister.tryDeserialize(value); int shard = Math.toIntExact(row.getShard()); // throws if invalid shard return SweepableBucket.of( Bucket.of( diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java index a86f28b019..bc6da52cd3 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java @@ -29,6 +29,8 @@ public interface SweepBucketsTable { */ Set getSweepableBuckets(Set startBuckets); + Optional getSweepableBucket(Bucket bucket); + void putTimestampRangeForBucket( Bucket bucket, Optional oldTimestampRange, TimestampRange newTimestampRange); diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersisterTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersisterTest.java index 74ae8d8414..4dc285d72c 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersisterTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersisterTest.java @@ -20,7 +20,6 @@ import com.google.common.io.BaseEncoding; import com.palantir.atlasdb.keyvalue.api.Cell; -import com.palantir.atlasdb.keyvalue.api.Value; import com.palantir.atlasdb.schema.generated.SweepAssignedBucketsTable.SweepAssignedBucketsRow; import com.palantir.atlasdb.sweep.asts.Bucket; import com.palantir.atlasdb.sweep.asts.SweepableBucket; @@ -59,14 +58,11 @@ public final class SweepAssignedBucketStoreKeyPersisterTest { Bucket.of(ShardAndStrategy.nonSweepable(), Long.MAX_VALUE), fromBase64("PpljSwhiGMuA/4FHrhR64UeuAg==", "hw==")); - private static final Map GOLDEN_TIMESTAMP_RANGES = Map.of( + private static final Map GOLDEN_TIMESTAMP_RANGES = Map.of( TimestampRange.of(912301923, Long.MAX_VALUE), - Value.create( - BaseEncoding.base64() - .decode("OikKBfqLZW5kRXhjbHVzaXZlJQN/f39/f39/f76Nc3RhcnRJbmNsdXNpdmUkDUwJe4b7"), - -1), + BaseEncoding.base64().decode("OikKBfqLZW5kRXhjbHVzaXZlJQN/f39/f39/f76Nc3RhcnRJbmNsdXNpdmUkDUwJe4b7"), TimestampRange.openBucket(13123), - Value.create(BaseEncoding.base64().decode("OikKBfqLZW5kRXhjbHVzaXZlwY1zdGFydEluY2x1c2l2ZSQDGob7"), -1)); + BaseEncoding.base64().decode("OikKBfqLZW5kRXhjbHVzaXZlwY1zdGFydEluY2x1c2l2ZSQDGob7")); private static final Cell GOLDEN_SWEEP_BUCKET_ASSIGNER_STATE_MACHINE_CELL = Cell.create( BaseEncoding.base64().decode("GTH5RX8BW6N/f/8="), @@ -127,9 +123,7 @@ public void nextSweepBucketsRowGetsNextRowByMajorBucketIdentifier(Bucket bucket) @ParameterizedTest @MethodSource("sweepableBuckets") public void canDeserializeCellsAndValuesBackToSweepableBucket(SweepableBucket sweepableBucket) { - Value value = Value.create( - TIMESTAMP_RANGE_PERSISTER.trySerialize(sweepableBucket.timestampRange()), - -1); // Timestamp does not matter + byte[] value = TIMESTAMP_RANGE_PERSISTER.trySerialize(sweepableBucket.timestampRange()); Cell cell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketsCell(sweepableBucket.bucket()); SweepableBucket deserialisedSweepableBucket = SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue( @@ -141,7 +135,7 @@ public void canDeserializeCellsAndValuesBackToSweepableBucket(SweepableBucket sw @ParameterizedTest @MethodSource("goldenSweepBucketCellsAndValues") public void canDeserializeHistoricCellsAndValuesBackToSweepableBucket( - SweepableBucket sweepableBucket, Cell cell, Value value) { + SweepableBucket sweepableBucket, Cell cell, byte[] value) { SweepableBucket deserialisedSweepableBucket = SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue( cell, value, TIMESTAMP_RANGE_PERSISTER); diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/AbstractDefaultSweepAssignedBucketStoreTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/AbstractDefaultSweepAssignedBucketStoreTest.java index 7e1cfa2247..958d1d462a 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/AbstractDefaultSweepAssignedBucketStoreTest.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/AbstractDefaultSweepAssignedBucketStoreTest.java @@ -255,6 +255,20 @@ public void getSweepableBucketsReturnsBucketsPerShardAndStrategy() { assertThat(store.getSweepableBuckets(new HashSet<>(startBuckets))).containsExactlyInAnyOrderElementsOf(buckets); } + @Test + public void getSweepableBucketReturnsSweepableBucketIfExists() { + Bucket bucket = Bucket.of(ShardAndStrategy.of(12, SweeperStrategy.THOROUGH), 21); + TimestampRange range = TimestampRange.of(1, 4); + store.putTimestampRangeForBucket(bucket, Optional.empty(), range); + assertThat(store.getSweepableBucket(bucket)).contains(SweepableBucket.of(bucket, range)); + } + + @Test + public void getSweepableBucketReturnsEmptyIfSweepableBucketDoesNotExist() { + Bucket bucket = Bucket.of(ShardAndStrategy.of(12, SweeperStrategy.THOROUGH), 21); + assertThat(store.getSweepableBucket(bucket)).isEmpty(); + } + @Test public void putTimestampRangeForBucketFailsIfOldTimestampRangeDoesNotMatchCurrent() { Bucket bucket = Bucket.of(ShardAndStrategy.of(12, SweeperStrategy.THOROUGH), 512); @@ -272,8 +286,7 @@ public void putTimestampRangeForBucketSucceedsIfOldTimestampRangeMatchesCurrent( TimestampRange newTimestampRange = TimestampRange.of(1, 2); store.putTimestampRangeForBucket(bucket, Optional.empty(), newTimestampRange); - Set sweepableBuckets = store.getSweepableBuckets(Set.of(bucket)); - assertThat(sweepableBuckets).containsExactly(SweepableBucket.of(bucket, newTimestampRange)); + assertThat(store.getSweepableBucket(bucket)).contains(SweepableBucket.of(bucket, newTimestampRange)); } @Test @@ -287,8 +300,10 @@ public void deleteBucketEntryDeletesBucket() { Bucket bucket = Bucket.of(ShardAndStrategy.of(12, SweeperStrategy.THOROUGH), 512); TimestampRange timestampRange = TimestampRange.of(1, 2); store.putTimestampRangeForBucket(bucket, Optional.empty(), timestampRange); + assertThat(store.getSweepableBucket(bucket)).hasValue(SweepableBucket.of(bucket, timestampRange)); + store.deleteBucketEntry(bucket); - assertThat(store.getSweepableBuckets(Set.of(bucket))).isEmpty(); + assertThat(store.getSweepableBucket(bucket)).isEmpty(); } @Test