diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersister.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersister.java new file mode 100644 index 00000000000..6be4a504785 --- /dev/null +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersister.java @@ -0,0 +1,128 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.sweep.asts.bucketingthings; + +import com.palantir.atlasdb.keyvalue.api.Cell; +import com.palantir.atlasdb.keyvalue.api.Value; +import com.palantir.atlasdb.schema.generated.SweepAssignedBucketsTable.SweepAssignedBucketsColumn; +import com.palantir.atlasdb.schema.generated.SweepAssignedBucketsTable.SweepAssignedBucketsRow; +import com.palantir.atlasdb.sweep.asts.Bucket; +import com.palantir.atlasdb.sweep.asts.SweepableBucket; +import com.palantir.atlasdb.sweep.asts.TimestampRange; +import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; +import com.palantir.atlasdb.table.description.SweeperStrategy; + +/** + * The layout for this table is: + * + * +--------------------------+---------------+-----------------+-----------------+-----------------+ + * | Row | Col -1 | 0 | 1 | X | + * +--------------------------+---------------+-----------------+-----------------+-----------------+ + * | (-1, -1, -1) | STATE_MACHINE | | | | + * | (shard, -1, strategy) | START_BUCKET | | | | + * | (-1, major, -1) | | TIMESTAMP_RANGE | TIMESTAMP_RANGE | TIMESTAMP_RANGE | + * | (shard, major, strategy) | | BUCKET | BUCKET | BUCKET | + * +--------------------------+---------------+-----------------+-----------------+-----------------+ + * Where shard, strategy, major are non-negative. + * + * + */ +enum SweepAssignedBucketStoreKeyPersister { + INSTANCE; + + private static final long ROW_LENGTH = 100; + private static final byte RESERVED_IDENTIFIER = -1; + + private static final byte[] RESERVED_STRATEGY = new byte[] {RESERVED_IDENTIFIER}; + private static final byte[] RESERVED_COLUMN = + SweepAssignedBucketsColumn.of(RESERVED_IDENTIFIER).persistToBytes(); + private static final Cell SWEEP_BUCKET_ASSIGNER_STATE_MACHINE_CELL = Cell.create( + SweepAssignedBucketsRow.of(RESERVED_IDENTIFIER, RESERVED_IDENTIFIER, RESERVED_STRATEGY) + .persistToBytes(), + RESERVED_COLUMN); + + Cell sweepBucketsCell(Bucket bucket) { + SweepAssignedBucketsRow row = sweepBucketsRow(bucket); + SweepAssignedBucketsColumn column = + SweepAssignedBucketsColumn.of(minorBucketIdentifier(bucket.bucketIdentifier())); + return Cell.create(row.persistToBytes(), column.persistToBytes()); + } + + SweepAssignedBucketsRow sweepBucketsRow(Bucket bucket) { + return SweepAssignedBucketsRow.of( + bucket.shardAndStrategy().shard(), + majorBucketIdentifier(bucket.bucketIdentifier()), + bucket.shardAndStrategy().strategy().persistToBytes()); + } + + SweepAssignedBucketsRow nextSweepBucketsRow(Bucket bucket) { + SweepAssignedBucketsRow currentRow = sweepBucketsRow(bucket); + return SweepAssignedBucketsRow.of( + currentRow.getShard(), currentRow.getMajorBucketIdentifier() + 1, currentRow.getStrategy()); + } + + SweepableBucket fromSweepBucketCellAndValue( + Cell cell, Value value, ObjectPersister timestampRangePersister) { + SweepAssignedBucketsRow row = SweepAssignedBucketsRow.BYTES_HYDRATOR.hydrateFromBytes(cell.getRowName()); + SweepAssignedBucketsColumn column = + SweepAssignedBucketsColumn.BYTES_HYDRATOR.hydrateFromBytes(cell.getColumnName()); + TimestampRange timestampRange = timestampRangePersister.tryDeserialize(value.getContents()); + int shard = Math.toIntExact(row.getShard()); // throws if invalid shard + return SweepableBucket.of( + Bucket.of( + ShardAndStrategy.of(shard, SweeperStrategy.BYTES_HYDRATOR.hydrateFromBytes(row.getStrategy())), + bucketIdentifier(row.getMajorBucketIdentifier(), column.getMinorBucketIdentifier())), + timestampRange); + } + + Cell sweepBucketAssignerStateMachineCell() { + return SWEEP_BUCKET_ASSIGNER_STATE_MACHINE_CELL; + } + + Cell sweepBucketPointerCell(ShardAndStrategy shardAndStrategy) { + SweepAssignedBucketsRow row = SweepAssignedBucketsRow.of( + shardAndStrategy.shard(), + RESERVED_IDENTIFIER, + shardAndStrategy.strategy().persistToBytes()); + return Cell.create(row.persistToBytes(), RESERVED_COLUMN); + } + + // This is _not_ keyed on the shard and strategy, which does make clean up harder, because you need to ensure + // that no one, across all of the shards and strategies, will use the value. + // We estimate this to be around 25MB of data per keyspace per year, which we believe is fine to abandon. + // If you do want to have the ability to retention the data, consider a one off task to clean up old data, or + // to _not_ delete the bucket in the foreground cleaner, but instead to mark it as deleted and actually delete it + // in the background task + Cell sweepBucketRecordsCell(long bucketIdentifier) { + SweepAssignedBucketsRow row = SweepAssignedBucketsRow.of( + RESERVED_IDENTIFIER, majorBucketIdentifier(bucketIdentifier), RESERVED_STRATEGY); + SweepAssignedBucketsColumn column = SweepAssignedBucketsColumn.of(minorBucketIdentifier(bucketIdentifier)); + return Cell.create(row.persistToBytes(), column.persistToBytes()); + } + + private static long bucketIdentifier(long majorBucketIdentifier, long minorBucketIdentifier) { + return majorBucketIdentifier * ROW_LENGTH + minorBucketIdentifier; + } + + private static long majorBucketIdentifier(long bucketIdentifier) { + return bucketIdentifier / ROW_LENGTH; + } + + private static long minorBucketIdentifier(long bucketIdentifier) { + return bucketIdentifier % ROW_LENGTH; + } +} diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/progress/DefaultBucketKeySerializer.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/progress/DefaultBucketKeySerializer.java index 2090648a000..eb3bbe70443 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/progress/DefaultBucketKeySerializer.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/progress/DefaultBucketKeySerializer.java @@ -21,9 +21,6 @@ import com.palantir.atlasdb.schema.generated.SweepBucketProgressTable.SweepBucketProgressNamedColumn; import com.palantir.atlasdb.schema.generated.SweepBucketProgressTable.SweepBucketProgressRow; import com.palantir.atlasdb.sweep.asts.Bucket; -import com.palantir.atlasdb.table.description.SweeperStrategy; -import com.palantir.logsafe.SafeArg; -import com.palantir.logsafe.exceptions.SafeIllegalStateException; enum DefaultBucketKeySerializer { INSTANCE; @@ -32,21 +29,7 @@ Cell bucketToCell(Bucket bucket) { SweepBucketProgressTable.SweepBucketProgressRow row = SweepBucketProgressRow.of( bucket.shardAndStrategy().shard(), bucket.bucketIdentifier(), - persistStrategy(bucket.shardAndStrategy().strategy())); + bucket.shardAndStrategy().strategy().persistToBytes()); return Cell.create(row.persistToBytes(), SweepBucketProgressNamedColumn.BUCKET_PROGRESS.getShortName()); } - - private static byte[] persistStrategy(SweeperStrategy strategy) { - switch (strategy) { - case THOROUGH: - return new byte[] {0}; - case CONSERVATIVE: - return new byte[] {1}; - case NON_SWEEPABLE: - return new byte[] {2}; - default: - throw new SafeIllegalStateException( - "Unexpected sweeper strategy", SafeArg.of("sweeperStrategy", strategy)); - } - } } diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersisterTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersisterTest.java new file mode 100644 index 00000000000..66ac0c18a3a --- /dev/null +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepAssignedBucketStoreKeyPersisterTest.java @@ -0,0 +1,273 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.sweep.asts.bucketingthings; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.io.BaseEncoding; +import com.palantir.atlasdb.keyvalue.api.Cell; +import com.palantir.atlasdb.keyvalue.api.Value; +import com.palantir.atlasdb.schema.generated.SweepAssignedBucketsTable.SweepAssignedBucketsRow; +import com.palantir.atlasdb.sweep.asts.Bucket; +import com.palantir.atlasdb.sweep.asts.SweepableBucket; +import com.palantir.atlasdb.sweep.asts.TimestampRange; +import com.palantir.atlasdb.sweep.asts.bucketingthings.ObjectPersister.LogSafety; +import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; +import com.palantir.atlasdb.table.description.SweeperStrategy; +import com.palantir.conjure.java.serialization.ObjectMappers; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.eclipse.collections.impl.factory.Sets; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +public final class SweepAssignedBucketStoreKeyPersisterTest { + private static final ObjectPersister TIMESTAMP_RANGE_PERSISTER = + ObjectPersister.of(ObjectMappers.newServerSmileMapper(), TimestampRange.class, LogSafety.SAFE); + + private static final Map GOLDEN_BUCKETS = Map.of( + Bucket.of(ShardAndStrategy.conservative(0), 0), + fromBase64("95dIh9y7UDyAgAE=", "gA=="), + Bucket.of(ShardAndStrategy.conservative(0), 1), + fromBase64("95dIh9y7UDyAgAE=", "gQ=="), + Bucket.of(ShardAndStrategy.conservative(1), 0), + fromBase64("HdZZiImVreeBgAE=", "gA=="), + Bucket.of(ShardAndStrategy.thorough(0), 10000000), + fromBase64("S75hl43DqVyA4YagAA==", "gA=="), + Bucket.of(ShardAndStrategy.thorough(0), 1), + fromBase64("uOHLXMd0L9KAgAA=", "gQ=="), + Bucket.of(ShardAndStrategy.thorough(1), 1245123123), + fromBase64("jAxmaZeQfpWB8L39nwA=", "lw=="), + Bucket.of(ShardAndStrategy.nonSweepable(), Long.MAX_VALUE), + fromBase64("PpljSwhiGMuA/4FHrhR64UeuAg==", "hw==")); + + private static final Map GOLDEN_TIMESTAMP_RANGES = Map.of( + TimestampRange.of(912301923, Long.MAX_VALUE), + Value.create( + BaseEncoding.base64() + .decode("OikKBfqNc3RhcnRJbmNsdXNpdmUkDUwJe4aLZW5kRXhjbHVzaXZlJQN/f39/f39/f777"), + -1), + TimestampRange.openBucket(13123), + Value.create(BaseEncoding.base64().decode("OikKBfqNc3RhcnRJbmNsdXNpdmUkAxqGi2VuZEV4Y2x1c2l2ZcH7"), -1)); + + private static final Cell GOLDEN_SWEEP_BUCKET_ASSIGNER_STATE_MACHINE_CELL = Cell.create( + BaseEncoding.base64().decode("GTH5RX8BW6N/f/8="), + BaseEncoding.base64().decode("fw==")); + + private static final Map GOLDEN_SWEEP_BUCKET_POINTER_CELLS = Map.of( + ShardAndStrategy.conservative(0), + fromBase64("S0/IrI4nnp+AfwE=", "fw=="), + ShardAndStrategy.conservative(1), + fromBase64("s1rdA0gzO/CBfwE=", "fw=="), + ShardAndStrategy.thorough(0), + fromBase64("SovRDVViVQKAfwA=", "fw=="), + ShardAndStrategy.thorough(1), + fromBase64("uPBDJgpXMfeBfwA=", "fw=="), + ShardAndStrategy.nonSweepable(), + fromBase64("pWhFqaJi6q2AfwI=", "fw==")); + + private static final Map GOLDEN_SWEEP_BUCKET_RECORDS_CELLS = Map.of( + 0L, + fromBase64("odL6eJE/9Q9/gP8=", "gA=="), + 1L, + fromBase64("odL6eJE/9Q9/gP8=", "gQ=="), + 2L, + fromBase64("odL6eJE/9Q9/gP8=", "gg=="), + 311421L, + fromBase64("6ty/CriS/L1/zCr/", "lQ=="), + Long.MAX_VALUE, + fromBase64("C4tS7h1RgGV//4FHrhR64Ueu/w==", "hw==")); + + @Test + public void differentBucketsMapToDifferentSweepBucketsCells() { + List cells = buckets().stream() + .map(SweepAssignedBucketStoreKeyPersister.INSTANCE::sweepBucketsCell) + .collect(Collectors.toList()); + + assertThat(cells).doesNotHaveDuplicates(); + } + + @ParameterizedTest + @MethodSource("goldenSweepBucketsCells") + public void bucketMapsToHistoricSweepBucketsCell(Bucket bucket, Cell expectedCell) { + Cell actualCell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketsCell(bucket); + assertThat(actualCell).isEqualTo(expectedCell); + } + + // The tests for checking buckets map to historic rows are omitted as this is covered by the cell tests above. + @ParameterizedTest + @MethodSource("buckets") + public void nextSweepBucketsRowGetsNextRowByMajorBucketIdentifier(Bucket bucket) { + SweepAssignedBucketsRow currentRow = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketsRow(bucket); + SweepAssignedBucketsRow nextRow = SweepAssignedBucketStoreKeyPersister.INSTANCE.nextSweepBucketsRow(bucket); + + SweepAssignedBucketsRow expectedNextRow = SweepAssignedBucketsRow.of( + currentRow.getShard(), currentRow.getMajorBucketIdentifier() + 1, currentRow.getStrategy()); + assertThat(nextRow).isEqualTo(expectedNextRow); + } + + @ParameterizedTest + @MethodSource("sweepableBuckets") + public void canDeserializeCellsAndValuesBackToSweepableBucket(SweepableBucket sweepableBucket) { + Value value = Value.create( + TIMESTAMP_RANGE_PERSISTER.trySerialize(sweepableBucket.timestampRange()), + -1); // Timestamp does not matter + Cell cell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketsCell(sweepableBucket.bucket()); + SweepableBucket deserialisedSweepableBucket = + SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue( + cell, value, TIMESTAMP_RANGE_PERSISTER); + + assertThat(deserialisedSweepableBucket).isEqualTo(sweepableBucket); + } + + @ParameterizedTest + @MethodSource("goldenSweepBucketCellsAndValues") + public void canDeserializeHistoricCellsAndValuesBackToSweepableBucket( + SweepableBucket sweepableBucket, Cell cell, Value value) { + SweepableBucket deserialisedSweepableBucket = + SweepAssignedBucketStoreKeyPersister.INSTANCE.fromSweepBucketCellAndValue( + cell, value, TIMESTAMP_RANGE_PERSISTER); + + assertThat(deserialisedSweepableBucket).isEqualTo(sweepableBucket); + } + + @Test + public void sweepBucketAssignerStateMachineCellMatchesHistoricCell() { + Cell cell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketAssignerStateMachineCell(); + assertThat(cell).isEqualTo(GOLDEN_SWEEP_BUCKET_ASSIGNER_STATE_MACHINE_CELL); + } + + @Test + public void differentShardsAndStrategiesMapToDifferentSweepBucketPointerCells() { + Stream shardsAndStrategies = Stream.of( + ShardAndStrategy.conservative(0), + ShardAndStrategy.conservative(1), + ShardAndStrategy.conservative(255), + ShardAndStrategy.thorough(0), + ShardAndStrategy.thorough(1), + ShardAndStrategy.thorough(212), + ShardAndStrategy.nonSweepable()); + + List cells = shardsAndStrategies + .map(SweepAssignedBucketStoreKeyPersister.INSTANCE::sweepBucketPointerCell) + .collect(Collectors.toList()); + + assertThat(cells).doesNotHaveDuplicates(); + } + + @ParameterizedTest + @MethodSource("goldenSweepBucketPointerCells") + public void shardsAndStrategiesMapToHistoricSweepBucketPointerCells( + ShardAndStrategy shardAndStrategy, Cell expectedCell) { + Cell actualCell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketPointerCell(shardAndStrategy); + assertThat(actualCell).isEqualTo(expectedCell); + } + + @Test + public void differentBucketIdentifiersMapToDifferentSweepBucketRecordsCells() { + Set bucketIdentifiers = Set.of(0, 1, 123123, Integer.MAX_VALUE); + List cells = bucketIdentifiers.stream() + .map(SweepAssignedBucketStoreKeyPersister.INSTANCE::sweepBucketRecordsCell) + .collect(Collectors.toList()); + assertThat(cells).doesNotHaveDuplicates(); + } + + @ParameterizedTest + @MethodSource("goldenSweepBucketRecordsCells") + public void bucketIdentifiersMapToHistoricSweepBucketRecordsCells(long bucketIdentifier, Cell expectedCell) { + Cell actualCell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketRecordsCell(bucketIdentifier); + assertThat(actualCell).isEqualTo(expectedCell); + } + + // TODO(mdaudali): Add validation to ensure that bucket identifier for bucket and passed in above >= 0 in a + // separate PR. + + @Test + // A more rigorous test would likely involve some generator for the various inputs into each cell + // Perhaps an interesting place to leverage property-based testing + public void cellsDoNotOverlap() { + List cells = Stream.concat( + Stream.concat( + Stream.of( + SweepAssignedBucketStoreKeyPersister.INSTANCE + .sweepBucketAssignerStateMachineCell(), + SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketRecordsCell(0)), + buckets().stream() + .map(SweepAssignedBucketStoreKeyPersister.INSTANCE::sweepBucketsCell)), + Stream.of(SweeperStrategy.values()) + .map(v -> SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketPointerCell( + ShardAndStrategy.of(0, v)))) + .collect(Collectors.toList()); + assertThat(cells).doesNotHaveDuplicates(); + } + + private static Set buckets() { + Set shardsAndBucketIdentifiers = Set.of(0, 99, 100, Integer.MAX_VALUE); + Set strategies = Set.of(SweeperStrategy.values()); + Set shardAndStrategies = Sets.cartesianProduct(shardsAndBucketIdentifiers, strategies) + .collect(pair -> ShardAndStrategy.of(pair.getOne(), pair.getTwo())) + .toSet(); + return Sets.cartesianProduct(shardAndStrategies, shardsAndBucketIdentifiers) + .collect(pair -> Bucket.of(pair.getOne(), pair.getTwo())) + .toSet(); + } + + private static Set sweepableBuckets() { + Set timestampRanges = + Set.of(TimestampRange.openBucket(13123), TimestampRange.of(901273, Long.MAX_VALUE)); + return Sets.cartesianProduct(buckets(), timestampRanges) + .collect(pair -> SweepableBucket.of(pair.getOne(), pair.getTwo())) + .toSet(); + } + + private static Stream goldenSweepBucketsCells() { + return GOLDEN_BUCKETS.entrySet().stream().map(entry -> Arguments.of(entry.getKey(), entry.getValue())); + } + + private static Stream goldenSweepBucketCellsAndValues() { + return GOLDEN_BUCKETS.entrySet().stream().flatMap(entry -> { + Bucket bucket = entry.getKey(); + Cell cell = entry.getValue(); + return GOLDEN_TIMESTAMP_RANGES.entrySet().stream() + .map(timestampRangeEntry -> Arguments.of( + SweepableBucket.of(bucket, timestampRangeEntry.getKey()), + cell, + timestampRangeEntry.getValue())); + }); + } + + private static Stream goldenSweepBucketPointerCells() { + return GOLDEN_SWEEP_BUCKET_POINTER_CELLS.entrySet().stream() + .map(entry -> Arguments.of(entry.getKey(), entry.getValue())); + } + + // TODO(mdaudali): consider making a custom MapSource for parameterized tests + private static Stream goldenSweepBucketRecordsCells() { + return GOLDEN_SWEEP_BUCKET_RECORDS_CELLS.entrySet().stream() + .map(entry -> Arguments.of(entry.getKey(), entry.getValue())); + } + + private static Cell fromBase64(String rowName, String columnName) { + return Cell.create( + BaseEncoding.base64().decode(rowName), BaseEncoding.base64().decode(columnName)); + } +}