From 34fdb3133fb2005ef6c1d765700e2aa98d31f989 Mon Sep 17 00:00:00 2001 From: Jeremy Kong Date: Tue, 10 Sep 2024 16:38:55 +0100 Subject: [PATCH 01/12] Add new field to config --- .../queue/config/TargetedSweepInstallConfig.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/config/TargetedSweepInstallConfig.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/config/TargetedSweepInstallConfig.java index 7977a2a9249..7860f3ef261 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/config/TargetedSweepInstallConfig.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/config/TargetedSweepInstallConfig.java @@ -115,7 +115,18 @@ public boolean resetTargetedSweepQueueProgressAndStopSweep() { return false; } + @Value.Default + public SweepIndexResetProgressStage sweepIndexResetProgressStage() { + return SweepIndexResetProgressStage.NO_ACTIVE_RESET; + } + public static TargetedSweepInstallConfig defaultTargetedSweepConfig() { return ImmutableTargetedSweepInstallConfig.builder().build(); } + + public enum SweepIndexResetProgressStage { + NO_ACTIVE_RESET, + WRITE_IMMEDIATE_FORMAT, + INVALIDATE_OLD_MAPPINGS; + } } From 748f77093093b30c597d5162c20b8274b350ed89 Mon Sep 17 00:00:00 2001 From: Jeremy Kong Date: Tue, 10 Sep 2024 18:45:01 +0100 Subject: [PATCH 02/12] milestone 1 --- .../keyvalue/api/WriteReferencePersister.java | 53 +++++++++++++++++-- .../atlasdb/sweep/queue/SweepQueue.java | 32 ++++++++--- .../atlasdb/sweep/queue/SweepableCells.java | 7 ++- .../atlasdb/sweep/queue/TargetedSweeper.java | 5 +- .../config/TargetedSweepInstallConfig.java | 37 +++++++++++-- .../api/WriteReferencePersisterTest.java | 52 ++++++++++++++---- .../sweep/queue/SweepableCellsTest.java | 4 +- .../sweep/queue/TargetedSweeperTest.java | 4 +- 8 files changed, 166 insertions(+), 28 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersister.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersister.java index f961aa88941..6ece510f664 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersister.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersister.java @@ -20,23 +20,40 @@ import com.google.common.primitives.Ints; import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.ptobject.EncodingUtils; +import com.palantir.atlasdb.sweep.queue.config.TargetedSweepInstallConfig; import com.palantir.atlasdb.sweep.queue.id.SweepTableIndices; import com.palantir.conjure.java.jackson.optimizations.ObjectMapperOptimizations; +import com.palantir.logsafe.SafeArg; +import com.palantir.logsafe.exceptions.SafeIllegalStateException; import com.palantir.logsafe.exceptions.SafeRuntimeException; import java.io.IOException; import java.util.Optional; public final class WriteReferencePersister { - private static final byte[] writePrefix = {1}; + private static final byte[] ZERO_BYTE = {0}; + private static final byte[] ONE_BYTE = {1}; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() .registerModule(new Jdk8Module()) .registerModules(ObjectMapperOptimizations.createModules()); private static final StoredWriteReference DUMMY = ImmutableStoredWriteReference.of(PtBytes.EMPTY_BYTE_ARRAY); private final SweepTableIndices tableIndices; + private final WriteMethod writeMethod; - public WriteReferencePersister(SweepTableIndices tableIndices) { + WriteReferencePersister(SweepTableIndices tableIndices, WriteMethod writeMethod) { this.tableIndices = tableIndices; + this.writeMethod = writeMethod; + } + + public static WriteReferencePersister create( + SweepTableIndices sweepTableIndices, + TargetedSweepInstallConfig.SweepIndexResetProgressStage resetProgressStage) { + return new WriteReferencePersister( + sweepTableIndices, + resetProgressStage.shouldWriteImmediateFormat() + ? WriteMethod.TABLE_NAME_AS_STRING_BINARY + : WriteMethod.TABLE_ID_BINARY); } public Optional unpersist(StoredWriteReference writeReference) { @@ -98,10 +115,38 @@ public StoredWriteReference persist(Optional writeReference) { return DUMMY; } WriteReference writeRef = writeReference.get(); - byte[] tableId = EncodingUtils.encodeUnsignedVarLong(tableIndices.getTableId(writeRef.tableRef())); + byte[] tableIdentifier = getTableIdentifier(writeRef.tableRef()); byte[] row = EncodingUtils.encodeSizedBytes(writeRef.cell().getRowName()); byte[] column = EncodingUtils.encodeSizedBytes(writeRef.cell().getColumnName()); byte[] isTombstone = EncodingUtils.encodeUnsignedVarLong(writeRef.isTombstone() ? 1 : 0); - return ImmutableStoredWriteReference.of(EncodingUtils.add(writePrefix, tableId, row, column, isTombstone)); + return ImmutableStoredWriteReference.of( + EncodingUtils.add(writeMethod.getBytePrefix(), tableIdentifier, row, column, isTombstone)); + } + + private byte[] getTableIdentifier(TableReference tableReference) { + switch (writeMethod) { + case TABLE_ID_BINARY: + return EncodingUtils.encodeUnsignedVarLong(tableIndices.getTableId(tableReference)); + case TABLE_NAME_AS_STRING_BINARY: + return EncodingUtils.encodeVarString(tableReference.toString()); + default: + throw new SafeIllegalStateException("Unhandled write method", SafeArg.of("writeMethod", writeMethod)); + } + } + + @SuppressWarnings("ImmutableEnumChecker") // Overhead of needless wrapping is probably undesirable. + enum WriteMethod { + TABLE_NAME_AS_STRING_BINARY(ZERO_BYTE), + TABLE_ID_BINARY(ONE_BYTE); + + private final byte[] bytePrefix; + + WriteMethod(byte[] bytePrefix) { + this.bytePrefix = bytePrefix; + } + + byte[] getBytePrefix() { + return bytePrefix; + } } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java index aed9699588c..ec4a73d9843 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java @@ -26,6 +26,7 @@ import com.palantir.atlasdb.sweep.metrics.TargetedSweepMetrics; import com.palantir.atlasdb.sweep.queue.SweepQueueReader.ReadBatchingRuntimeContext; import com.palantir.atlasdb.sweep.queue.clear.DefaultTableClearer; +import com.palantir.atlasdb.sweep.queue.config.TargetedSweepInstallConfig.SweepIndexResetProgressStage; import com.palantir.atlasdb.table.description.Schemas; import com.palantir.atlasdb.table.description.SweeperStrategy; import com.palantir.atlasdb.transaction.impl.TimelockTimestampServiceAdapter; @@ -78,9 +79,17 @@ public static SweepQueue create( AbandonedTransactionConsumer abortedTransactionConsumer, TargetedSweepFollower follower, ReadBatchingRuntimeContext readBatchingRuntimeContext, - Function> tablesToTrackDeletions) { + Function> tablesToTrackDeletions, + SweepIndexResetProgressStage resetProgressStage) { SweepQueueFactory factory = SweepQueueFactory.create( - metrics, kvs, timelock, shardsConfig, transaction, readBatchingRuntimeContext, tablesToTrackDeletions); + metrics, + kvs, + timelock, + shardsConfig, + transaction, + readBatchingRuntimeContext, + tablesToTrackDeletions, + resetProgressStage); return new SweepQueue(factory, follower, abortedTransactionConsumer); } @@ -93,7 +102,13 @@ public static MultiTableSweepQueueWriter createWriter( TimelockService timelock, Supplier shardsConfig, ReadBatchingRuntimeContext readBatchingRuntimeContext) { - return SweepQueueFactory.create(metrics, kvs, timelock, shardsConfig, readBatchingRuntimeContext) + return SweepQueueFactory.create( + metrics, + kvs, + timelock, + shardsConfig, + readBatchingRuntimeContext, + SweepIndexResetProgressStage.NO_ACTIVE_RESET) .createWriter(); } @@ -266,7 +281,8 @@ static SweepQueueFactory create( KeyValueService kvs, TimelockService timelock, Supplier shardsConfig, - ReadBatchingRuntimeContext readBatchingRuntimeContext) { + ReadBatchingRuntimeContext readBatchingRuntimeContext, + SweepIndexResetProgressStage resetProgressStage) { // It is OK that the transaction service is different from the one used by the transaction manager, // as transaction services must not hold any local state in them that would affect correctness. TransactionService transaction = @@ -278,7 +294,8 @@ static SweepQueueFactory create( shardsConfig, transaction, readBatchingRuntimeContext, - _unused -> Optional.empty()); + _unused -> Optional.empty(), + resetProgressStage); } static SweepQueueFactory create( @@ -288,13 +305,14 @@ static SweepQueueFactory create( Supplier shardsConfig, TransactionService transaction, ReadBatchingRuntimeContext readBatchingRuntimeContext, - Function> tablesToTrackDeletions) { + Function> tablesToTrackDeletions, + SweepIndexResetProgressStage resetProgressStage) { Schemas.createTablesAndIndexes(TargetedSweepSchema.INSTANCE.getLatestSchema(), kvs); ShardProgress shardProgress = new ShardProgress(kvs); Supplier shards = createProgressUpdatingSupplier(shardsConfig, shardProgress, SweepQueueUtils.REFRESH_TIME); WriteInfoPartitioner partitioner = new WriteInfoPartitioner(kvs, shards); - SweepableCells cells = new SweepableCells(kvs, partitioner, metrics, transaction); + SweepableCells cells = new SweepableCells(kvs, partitioner, metrics, transaction, resetProgressStage); SweepableTimestamps timestamps = new SweepableTimestamps(kvs, partitioner); return new SweepQueueFactory( shardProgress, diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepableCells.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepableCells.java index d1c12af199c..0a760387b50 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepableCells.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepableCells.java @@ -43,6 +43,7 @@ import com.palantir.atlasdb.schema.generated.TargetedSweepTableFactory; import com.palantir.atlasdb.sweep.CommitTsCache; import com.palantir.atlasdb.sweep.metrics.TargetedSweepMetrics; +import com.palantir.atlasdb.sweep.queue.config.TargetedSweepInstallConfig.SweepIndexResetProgressStage; import com.palantir.atlasdb.sweep.queue.id.SweepTableIndices; import com.palantir.atlasdb.transaction.impl.TransactionConstants; import com.palantir.atlasdb.transaction.service.TransactionService; @@ -75,10 +76,12 @@ public SweepableCells( KeyValueService kvs, WriteInfoPartitioner partitioner, TargetedSweepMetrics metrics, - TransactionService transactionService) { + TransactionService transactionService, + SweepIndexResetProgressStage resetProgressStage) { super(kvs, TargetedSweepTableFactory.of().getSweepableCellsTable(null).getTableRef(), partitioner, metrics); this.commitTsCache = CommitTsCache.create(transactionService); - this.writeReferencePersister = new WriteReferencePersister(new SweepTableIndices(kvs)); + + this.writeReferencePersister = WriteReferencePersister.create(new SweepTableIndices(kvs), resetProgressStage); } @Override diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/TargetedSweeper.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/TargetedSweeper.java index 7c5a2a7f112..4d105dd0623 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/TargetedSweeper.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/TargetedSweeper.java @@ -68,6 +68,7 @@ public class TargetedSweeper implements MultiTableSweepQueueWriter, BackgroundSw private final BackgroundSweepScheduler noneScheduler; private final KeyValueService keyValueService; + private final TargetedSweepInstallConfig.SweepIndexResetProgressStage resetProgressStage; private LastSweptTimestampUpdater lastSweptTimestampUpdater; private TargetedSweepMetrics metrics; @@ -95,6 +96,7 @@ private TargetedSweeper( this.metricsConfiguration = install.metricsConfiguration(); this.abandonedTransactionConsumer = abandonedTransactionConsumer; this.keyValueService = keyValueService; + this.resetProgressStage = install.sweepIndexResetProgressStage(); } public boolean isInitialized() { @@ -191,7 +193,8 @@ public void initializeWithoutRunning( .maximumPartitions(this::getPartitionBatchLimit) .cellsThreshold(() -> runtime.get().batchCellThreshold()) .build(), - table -> runtime.get().tablesToTrackDeletions().apply(table)); + table -> runtime.get().tablesToTrackDeletions().apply(table), + resetProgressStage); timestampsSupplier = timestamps; timeLock = timelockService; lastSweptTimestampUpdater = new LastSweptTimestampUpdater( diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/config/TargetedSweepInstallConfig.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/config/TargetedSweepInstallConfig.java index 7860f3ef261..2bef507615d 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/config/TargetedSweepInstallConfig.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/config/TargetedSweepInstallConfig.java @@ -115,6 +115,14 @@ public boolean resetTargetedSweepQueueProgressAndStopSweep() { return false; } + /** + * This functionality exists to handle situations where the sweep index tables may have gotten out of sync with + * the sweep queue (e.g., database issues). The intended usage is to set to + * WRITE_IMMEDIATE_FORMAT_AND_SKIP_UNKNOWNS, ensure that all references to previous sweep IDs have been cleared + * from the sweep queue, and then set the config to INVALIDATE_OLD_MAPPINGS. To avoid race conditions between + * such invalidation, it is strongly advised to switch to WRITE_IMMEDIATE_FORMAT_AND_SKIP_UNKNOWNS post invalidation + * before switching back to NO_ACTIVE_RESET. + */ @Value.Default public SweepIndexResetProgressStage sweepIndexResetProgressStage() { return SweepIndexResetProgressStage.NO_ACTIVE_RESET; @@ -125,8 +133,31 @@ public static TargetedSweepInstallConfig defaultTargetedSweepConfig() { } public enum SweepIndexResetProgressStage { - NO_ACTIVE_RESET, - WRITE_IMMEDIATE_FORMAT, - INVALIDATE_OLD_MAPPINGS; + NO_ACTIVE_RESET(false, false, false), + WRITE_IMMEDIATE_FORMAT_AND_SKIP_UNKNOWNS(true, true, false), + INVALIDATE_OLD_MAPPINGS(true, true, true); + + private final boolean shouldWriteImmediateFormat; + private final boolean shouldSkipUnknowns; + private final boolean shouldInvalidateOldMappings; + + SweepIndexResetProgressStage( + boolean shouldWriteImmediateFormat, boolean shouldSkipUnknowns, boolean shouldInvalidateOldMappings) { + this.shouldWriteImmediateFormat = shouldWriteImmediateFormat; + this.shouldSkipUnknowns = shouldSkipUnknowns; + this.shouldInvalidateOldMappings = shouldInvalidateOldMappings; + } + + public boolean shouldWriteImmediateFormat() { + return shouldWriteImmediateFormat; + } + + public boolean shouldSkipUnknowns() { + return shouldSkipUnknowns; + } + + public boolean shouldInvalidateOldMappings() { + return shouldInvalidateOldMappings; + } } } diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersisterTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersisterTest.java index 5c5c3baa427..79d08f930d3 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersisterTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersisterTest.java @@ -17,12 +17,16 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.palantir.atlasdb.keyvalue.api.WriteReferencePersister.WriteMethod; import com.palantir.atlasdb.keyvalue.impl.InMemoryKeyValueService; import com.palantir.atlasdb.ptobject.EncodingUtils; import com.palantir.atlasdb.sweep.queue.id.SweepTableIndices; import java.nio.charset.StandardCharsets; import java.util.Optional; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; public final class WriteReferencePersisterTest { private static final TableReference TABLE = TableReference.create(Namespace.create("test_ctx"), "test__table_name"); @@ -34,10 +38,11 @@ public final class WriteReferencePersisterTest { private final KeyValueService kvs = new InMemoryKeyValueService(true); private final SweepTableIndices tableIndices = new SweepTableIndices(kvs); - private final WriteReferencePersister persister = new WriteReferencePersister(tableIndices); - @Test - public void testCanUnpersistJsonValues() { + @ParameterizedTest + @MethodSource("writeMethods") + public void testCanUnpersistJsonValues(WriteMethod writeMethod) { + WriteReferencePersister persister = new WriteReferencePersister(tableIndices, writeMethod); String original = "{\"t\":{\"namespace\":{\"name\":\"test_ctx\"},\"tablename\":\"test__table_name\"},\"c\":{\"" + "rowName\":\"P7du\",\"columnName\":\"dg==\"},\"d\":true}"; StoredWriteReference stored = @@ -45,8 +50,10 @@ public void testCanUnpersistJsonValues() { assertThat(persister.unpersist(stored)).hasValue(WRITE_REFERENCE); } - @Test - public void testCanUnpersistBinary_tableNameAsString() { + @ParameterizedTest + @MethodSource("writeMethods") + public void testCanUnpersistBinary_tableNameAsString(WriteMethod writeMethod) { + WriteReferencePersister persister = new WriteReferencePersister(tableIndices, writeMethod); byte[] data = EncodingUtils.add( new byte[1], EncodingUtils.encodeVarString(TABLE.getQualifiedName()), @@ -59,13 +66,40 @@ public void testCanUnpersistBinary_tableNameAsString() { @Test public void testCanUnpersistBinary_id() { - assertThat(persister.unpersist(StoredWriteReference.BYTES_HYDRATOR.hydrateFromBytes( - persister.persist(Optional.of(WRITE_REFERENCE)).persistToBytes()))) + WriteReferencePersister persister = new WriteReferencePersister(tableIndices, WriteMethod.TABLE_ID_BINARY); + StoredWriteReference storedWriteReference = StoredWriteReference.BYTES_HYDRATOR.hydrateFromBytes( + persister.persist(Optional.of(WRITE_REFERENCE)).persistToBytes()); + assertThat(persister.unpersist(storedWriteReference)).hasValue(WRITE_REFERENCE); + + WriteReferencePersister stringPersister = + new WriteReferencePersister(tableIndices, WriteMethod.TABLE_NAME_AS_STRING_BINARY); + assertThat(stringPersister.unpersist(storedWriteReference)) + .as("the string persister, given a known ID, should be able to interpret it") .hasValue(WRITE_REFERENCE); } - @Test - public void canUnpersistEmpty() { + @ParameterizedTest + @MethodSource("writeMethods") + public void canUnpersistEmpty(WriteMethod writeMethod) { + WriteReferencePersister persister = new WriteReferencePersister(tableIndices, writeMethod); assertThat(persister.unpersist(persister.persist(Optional.empty()))).isEmpty(); } + + @Test + public void canPersistBinary_tableNameAsString() { + WriteReferencePersister persister = + new WriteReferencePersister(tableIndices, WriteMethod.TABLE_NAME_AS_STRING_BINARY); + byte[] data = EncodingUtils.add( + new byte[1], + EncodingUtils.encodeVarString(TABLE.getQualifiedName()), + EncodingUtils.encodeSizedBytes(row), + EncodingUtils.encodeSizedBytes(column), + EncodingUtils.encodeVarLong(1)); + assertThat(persister.persist(Optional.of(WRITE_REFERENCE)).persistToBytes()) + .isEqualTo(data); + } + + static Stream writeMethods() { + return Stream.of(WriteMethod.values()); + } } diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/SweepableCellsTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/SweepableCellsTest.java index 8c999a9bf46..f1d74df8d58 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/SweepableCellsTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/SweepableCellsTest.java @@ -42,6 +42,7 @@ import com.palantir.atlasdb.schema.generated.TargetedSweepTableFactory; import com.palantir.atlasdb.sweep.metrics.SweepMetricsAssert; import com.palantir.atlasdb.sweep.metrics.TargetedSweepMetrics; +import com.palantir.atlasdb.sweep.queue.config.TargetedSweepInstallConfig.SweepIndexResetProgressStage; import com.palantir.lock.v2.TimelockService; import java.util.ArrayList; import java.util.Collection; @@ -73,7 +74,8 @@ public void setup() { .millisBetweenRecomputingMetrics(1) .build(), numShards); - sweepableCells = new SweepableCells(spiedKvs, partitioner, metrics, txnService); + sweepableCells = new SweepableCells( + spiedKvs, partitioner, metrics, txnService, SweepIndexResetProgressStage.NO_ACTIVE_RESET); shardCons = writeToDefaultCellCommitted(sweepableCells, TS, TABLE_CONS); shardThor = writeToDefaultCellCommitted(sweepableCells, TS2, TABLE_THOR); diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/TargetedSweeperTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/TargetedSweeperTest.java index 3d3d23346ec..55450ac274a 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/TargetedSweeperTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/TargetedSweeperTest.java @@ -70,6 +70,7 @@ import com.palantir.atlasdb.sweep.queue.config.ImmutableTargetedSweepInstallConfig; import com.palantir.atlasdb.sweep.queue.config.ImmutableTargetedSweepRuntimeConfig; import com.palantir.atlasdb.sweep.queue.config.TargetedSweepInstallConfig; +import com.palantir.atlasdb.sweep.queue.config.TargetedSweepInstallConfig.SweepIndexResetProgressStage; import com.palantir.atlasdb.sweep.queue.config.TargetedSweepRuntimeConfig; import com.palantir.atlasdb.table.description.SweeperStrategy; import com.palantir.atlasdb.transaction.service.TransactionServices; @@ -1293,7 +1294,8 @@ public void setup(int readBatchSize) { progress = new ShardProgress(spiedKvs); sweepableTimestamps = new SweepableTimestamps(spiedKvs, partitioner); - sweepableCells = new SweepableCells(spiedKvs, partitioner, null, txnService); + sweepableCells = new SweepableCells( + spiedKvs, partitioner, null, txnService, SweepIndexResetProgressStage.NO_ACTIVE_RESET); puncherStore = KeyValueServicePuncherStore.create(spiedKvs, false); } From 94a659e43a11622097210571564f03e42db96f99 Mon Sep 17 00:00:00 2001 From: Jeremy Kong Date: Tue, 10 Sep 2024 19:15:34 +0100 Subject: [PATCH 03/12] skippables --- .../keyvalue/api/WriteReferencePersister.java | 35 ++++++++++-- .../atlasdb/sweep/queue/SweepableCells.java | 1 - .../api/WriteReferencePersisterTest.java | 55 +++++++++++++++---- 3 files changed, 75 insertions(+), 16 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersister.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersister.java index 6ece510f664..0b5ab6fd56a 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersister.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersister.java @@ -27,6 +27,7 @@ import com.palantir.logsafe.exceptions.SafeIllegalStateException; import com.palantir.logsafe.exceptions.SafeRuntimeException; import java.io.IOException; +import java.util.NoSuchElementException; import java.util.Optional; public final class WriteReferencePersister { @@ -40,10 +41,12 @@ public final class WriteReferencePersister { private final SweepTableIndices tableIndices; private final WriteMethod writeMethod; + private final UnknownIdentifierHandlingMethod unknownIdentifierHandlingMethod; - WriteReferencePersister(SweepTableIndices tableIndices, WriteMethod writeMethod) { + WriteReferencePersister(SweepTableIndices tableIndices, WriteMethod writeMethod, UnknownIdentifierHandlingMethod unknownIdentifierHandlingMethod) { this.tableIndices = tableIndices; this.writeMethod = writeMethod; + this.unknownIdentifierHandlingMethod = unknownIdentifierHandlingMethod; } public static WriteReferencePersister create( @@ -53,7 +56,8 @@ public static WriteReferencePersister create( sweepTableIndices, resetProgressStage.shouldWriteImmediateFormat() ? WriteMethod.TABLE_NAME_AS_STRING_BINARY - : WriteMethod.TABLE_ID_BINARY); + : WriteMethod.TABLE_ID_BINARY, + resetProgressStage.shouldSkipUnknowns() ? UnknownIdentifierHandlingMethod.IGNORE : UnknownIdentifierHandlingMethod.THROW); } public Optional unpersist(StoredWriteReference writeReference) { @@ -89,7 +93,10 @@ public Optional visitTableNameAsStringBinary(byte[] ref) { public Optional visitTableIdBinary(byte[] ref) { int offset = 1; int tableId = Ints.checkedCast(EncodingUtils.decodeUnsignedVarLong(ref, offset)); - TableReference tableReference = tableIndices.getTableReference(tableId); + Optional maybeTableReference = safeGetTableReference(tableId); + if (maybeTableReference.isEmpty()) { + return Optional.empty(); + } offset += EncodingUtils.sizeOfUnsignedVarLong(tableId); byte[] row = EncodingUtils.decodeSizedBytes(ref, offset); offset += EncodingUtils.sizeOfSizedBytes(row); @@ -97,12 +104,27 @@ public Optional visitTableIdBinary(byte[] ref) { offset += EncodingUtils.sizeOfSizedBytes(column); long isTombstone = EncodingUtils.decodeUnsignedVarLong(ref, offset); return Optional.of(ImmutableWriteReference.builder() - .tableRef(tableReference) + .tableRef(maybeTableReference.get()) .cell(Cell.create(row, column)) .isTombstone(isTombstone == 1) .build()); } + private Optional safeGetTableReference(int tableId) { + try { + return Optional.of(tableIndices.getTableReference(tableId)); + } catch (NoSuchElementException e) { + switch (unknownIdentifierHandlingMethod) { + case IGNORE: + return Optional.empty(); + case THROW: + throw e; + default: + throw new SafeIllegalStateException("Unexpected unknown identifier handling method", e); + } + } + } + @Override public Optional visitDummy() { return Optional.empty(); @@ -149,4 +171,9 @@ byte[] getBytePrefix() { return bytePrefix; } } + + enum UnknownIdentifierHandlingMethod { + THROW, + IGNORE; + } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepableCells.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepableCells.java index 0a760387b50..4978d8877e0 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepableCells.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepableCells.java @@ -80,7 +80,6 @@ public SweepableCells( SweepIndexResetProgressStage resetProgressStage) { super(kvs, TargetedSweepTableFactory.of().getSweepableCellsTable(null).getTableRef(), partitioner, metrics); this.commitTsCache = CommitTsCache.create(transactionService); - this.writeReferencePersister = WriteReferencePersister.create(new SweepTableIndices(kvs), resetProgressStage); } diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersisterTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersisterTest.java index 79d08f930d3..fb895522c1c 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersisterTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersisterTest.java @@ -16,12 +16,15 @@ package com.palantir.atlasdb.keyvalue.api; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.palantir.atlasdb.keyvalue.api.WriteReferencePersister.UnknownIdentifierHandlingMethod; import com.palantir.atlasdb.keyvalue.api.WriteReferencePersister.WriteMethod; import com.palantir.atlasdb.keyvalue.impl.InMemoryKeyValueService; import com.palantir.atlasdb.ptobject.EncodingUtils; import com.palantir.atlasdb.sweep.queue.id.SweepTableIndices; import java.nio.charset.StandardCharsets; +import java.util.NoSuchElementException; import java.util.Optional; import java.util.stream.Stream; import org.junit.jupiter.api.Test; @@ -41,8 +44,8 @@ public final class WriteReferencePersisterTest { @ParameterizedTest @MethodSource("writeMethods") - public void testCanUnpersistJsonValues(WriteMethod writeMethod) { - WriteReferencePersister persister = new WriteReferencePersister(tableIndices, writeMethod); + void testCanUnpersistJsonValues(WriteMethod writeMethod) { + WriteReferencePersister persister = new WriteReferencePersister(tableIndices, writeMethod, UnknownIdentifierHandlingMethod.THROW); String original = "{\"t\":{\"namespace\":{\"name\":\"test_ctx\"},\"tablename\":\"test__table_name\"},\"c\":{\"" + "rowName\":\"P7du\",\"columnName\":\"dg==\"},\"d\":true}"; StoredWriteReference stored = @@ -52,8 +55,8 @@ public void testCanUnpersistJsonValues(WriteMethod writeMethod) { @ParameterizedTest @MethodSource("writeMethods") - public void testCanUnpersistBinary_tableNameAsString(WriteMethod writeMethod) { - WriteReferencePersister persister = new WriteReferencePersister(tableIndices, writeMethod); + void testCanUnpersistBinary_tableNameAsString(WriteMethod writeMethod) { + WriteReferencePersister persister = new WriteReferencePersister(tableIndices, writeMethod, UnknownIdentifierHandlingMethod.THROW); byte[] data = EncodingUtils.add( new byte[1], EncodingUtils.encodeVarString(TABLE.getQualifiedName()), @@ -65,14 +68,14 @@ public void testCanUnpersistBinary_tableNameAsString(WriteMethod writeMethod) { } @Test - public void testCanUnpersistBinary_id() { - WriteReferencePersister persister = new WriteReferencePersister(tableIndices, WriteMethod.TABLE_ID_BINARY); + void testCanUnpersistBinary_id() { + WriteReferencePersister persister = new WriteReferencePersister(tableIndices, WriteMethod.TABLE_ID_BINARY, UnknownIdentifierHandlingMethod.THROW); StoredWriteReference storedWriteReference = StoredWriteReference.BYTES_HYDRATOR.hydrateFromBytes( persister.persist(Optional.of(WRITE_REFERENCE)).persistToBytes()); assertThat(persister.unpersist(storedWriteReference)).hasValue(WRITE_REFERENCE); WriteReferencePersister stringPersister = - new WriteReferencePersister(tableIndices, WriteMethod.TABLE_NAME_AS_STRING_BINARY); + new WriteReferencePersister(tableIndices, WriteMethod.TABLE_NAME_AS_STRING_BINARY, UnknownIdentifierHandlingMethod.THROW); assertThat(stringPersister.unpersist(storedWriteReference)) .as("the string persister, given a known ID, should be able to interpret it") .hasValue(WRITE_REFERENCE); @@ -80,15 +83,15 @@ public void testCanUnpersistBinary_id() { @ParameterizedTest @MethodSource("writeMethods") - public void canUnpersistEmpty(WriteMethod writeMethod) { - WriteReferencePersister persister = new WriteReferencePersister(tableIndices, writeMethod); + void canUnpersistEmpty(WriteMethod writeMethod) { + WriteReferencePersister persister = new WriteReferencePersister(tableIndices, writeMethod, UnknownIdentifierHandlingMethod.THROW); assertThat(persister.unpersist(persister.persist(Optional.empty()))).isEmpty(); } @Test - public void canPersistBinary_tableNameAsString() { + void canPersistBinary_tableNameAsString() { WriteReferencePersister persister = - new WriteReferencePersister(tableIndices, WriteMethod.TABLE_NAME_AS_STRING_BINARY); + new WriteReferencePersister(tableIndices, WriteMethod.TABLE_NAME_AS_STRING_BINARY, UnknownIdentifierHandlingMethod.THROW); byte[] data = EncodingUtils.add( new byte[1], EncodingUtils.encodeVarString(TABLE.getQualifiedName()), @@ -99,7 +102,37 @@ public void canPersistBinary_tableNameAsString() { .isEqualTo(data); } + @ParameterizedTest + @MethodSource("writeMethods") + void ignoresUnknownIdentifiersIfConfigured(WriteMethod writeMethod) { + WriteReferencePersister persister = + new WriteReferencePersister(tableIndices, writeMethod, UnknownIdentifierHandlingMethod.IGNORE); + + byte[] data = createExpectedDataWithIdentifier(777666555); + assertThat(persister.unpersist(StoredWriteReference.BYTES_HYDRATOR.hydrateFromBytes(data))).isEmpty(); + } + + @ParameterizedTest + @MethodSource("writeMethods") + void throwsOnUnknownIdentifiersIfConfigured(WriteMethod writeMethod) { + WriteReferencePersister persister = + new WriteReferencePersister(tableIndices, writeMethod, UnknownIdentifierHandlingMethod.THROW); + + byte[] data = createExpectedDataWithIdentifier(314159265); + assertThatThrownBy(() -> persister.unpersist(StoredWriteReference.BYTES_HYDRATOR.hydrateFromBytes(data))) + .isInstanceOf(NoSuchElementException.class); + } + static Stream writeMethods() { return Stream.of(WriteMethod.values()); } + + private static byte[] createExpectedDataWithIdentifier(long identifier) { + return EncodingUtils.add( + new byte[]{1}, + EncodingUtils.encodeVarLong(identifier), + EncodingUtils.encodeSizedBytes(row), + EncodingUtils.encodeSizedBytes(column), + EncodingUtils.encodeVarLong(1)); + } } From cb71bb6dcabc235bc62d10fd8a4c324805536dcf Mon Sep 17 00:00:00 2001 From: Jeremy Kong Date: Tue, 10 Sep 2024 19:15:47 +0100 Subject: [PATCH 04/12] skippy skip skip skip skip skip --- .../keyvalue/api/WriteReferencePersister.java | 9 +++++-- .../api/WriteReferencePersisterTest.java | 25 +++++++++++-------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersister.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersister.java index 0b5ab6fd56a..f08073147fa 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersister.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersister.java @@ -43,7 +43,10 @@ public final class WriteReferencePersister { private final WriteMethod writeMethod; private final UnknownIdentifierHandlingMethod unknownIdentifierHandlingMethod; - WriteReferencePersister(SweepTableIndices tableIndices, WriteMethod writeMethod, UnknownIdentifierHandlingMethod unknownIdentifierHandlingMethod) { + WriteReferencePersister( + SweepTableIndices tableIndices, + WriteMethod writeMethod, + UnknownIdentifierHandlingMethod unknownIdentifierHandlingMethod) { this.tableIndices = tableIndices; this.writeMethod = writeMethod; this.unknownIdentifierHandlingMethod = unknownIdentifierHandlingMethod; @@ -57,7 +60,9 @@ public static WriteReferencePersister create( resetProgressStage.shouldWriteImmediateFormat() ? WriteMethod.TABLE_NAME_AS_STRING_BINARY : WriteMethod.TABLE_ID_BINARY, - resetProgressStage.shouldSkipUnknowns() ? UnknownIdentifierHandlingMethod.IGNORE : UnknownIdentifierHandlingMethod.THROW); + resetProgressStage.shouldSkipUnknowns() + ? UnknownIdentifierHandlingMethod.IGNORE + : UnknownIdentifierHandlingMethod.THROW); } public Optional unpersist(StoredWriteReference writeReference) { diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersisterTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersisterTest.java index fb895522c1c..d3b5b1e85fa 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersisterTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/WriteReferencePersisterTest.java @@ -45,7 +45,8 @@ public final class WriteReferencePersisterTest { @ParameterizedTest @MethodSource("writeMethods") void testCanUnpersistJsonValues(WriteMethod writeMethod) { - WriteReferencePersister persister = new WriteReferencePersister(tableIndices, writeMethod, UnknownIdentifierHandlingMethod.THROW); + WriteReferencePersister persister = + new WriteReferencePersister(tableIndices, writeMethod, UnknownIdentifierHandlingMethod.THROW); String original = "{\"t\":{\"namespace\":{\"name\":\"test_ctx\"},\"tablename\":\"test__table_name\"},\"c\":{\"" + "rowName\":\"P7du\",\"columnName\":\"dg==\"},\"d\":true}"; StoredWriteReference stored = @@ -56,7 +57,8 @@ void testCanUnpersistJsonValues(WriteMethod writeMethod) { @ParameterizedTest @MethodSource("writeMethods") void testCanUnpersistBinary_tableNameAsString(WriteMethod writeMethod) { - WriteReferencePersister persister = new WriteReferencePersister(tableIndices, writeMethod, UnknownIdentifierHandlingMethod.THROW); + WriteReferencePersister persister = + new WriteReferencePersister(tableIndices, writeMethod, UnknownIdentifierHandlingMethod.THROW); byte[] data = EncodingUtils.add( new byte[1], EncodingUtils.encodeVarString(TABLE.getQualifiedName()), @@ -69,13 +71,14 @@ void testCanUnpersistBinary_tableNameAsString(WriteMethod writeMethod) { @Test void testCanUnpersistBinary_id() { - WriteReferencePersister persister = new WriteReferencePersister(tableIndices, WriteMethod.TABLE_ID_BINARY, UnknownIdentifierHandlingMethod.THROW); + WriteReferencePersister persister = new WriteReferencePersister( + tableIndices, WriteMethod.TABLE_ID_BINARY, UnknownIdentifierHandlingMethod.THROW); StoredWriteReference storedWriteReference = StoredWriteReference.BYTES_HYDRATOR.hydrateFromBytes( persister.persist(Optional.of(WRITE_REFERENCE)).persistToBytes()); assertThat(persister.unpersist(storedWriteReference)).hasValue(WRITE_REFERENCE); - WriteReferencePersister stringPersister = - new WriteReferencePersister(tableIndices, WriteMethod.TABLE_NAME_AS_STRING_BINARY, UnknownIdentifierHandlingMethod.THROW); + WriteReferencePersister stringPersister = new WriteReferencePersister( + tableIndices, WriteMethod.TABLE_NAME_AS_STRING_BINARY, UnknownIdentifierHandlingMethod.THROW); assertThat(stringPersister.unpersist(storedWriteReference)) .as("the string persister, given a known ID, should be able to interpret it") .hasValue(WRITE_REFERENCE); @@ -84,14 +87,15 @@ void testCanUnpersistBinary_id() { @ParameterizedTest @MethodSource("writeMethods") void canUnpersistEmpty(WriteMethod writeMethod) { - WriteReferencePersister persister = new WriteReferencePersister(tableIndices, writeMethod, UnknownIdentifierHandlingMethod.THROW); + WriteReferencePersister persister = + new WriteReferencePersister(tableIndices, writeMethod, UnknownIdentifierHandlingMethod.THROW); assertThat(persister.unpersist(persister.persist(Optional.empty()))).isEmpty(); } @Test void canPersistBinary_tableNameAsString() { - WriteReferencePersister persister = - new WriteReferencePersister(tableIndices, WriteMethod.TABLE_NAME_AS_STRING_BINARY, UnknownIdentifierHandlingMethod.THROW); + WriteReferencePersister persister = new WriteReferencePersister( + tableIndices, WriteMethod.TABLE_NAME_AS_STRING_BINARY, UnknownIdentifierHandlingMethod.THROW); byte[] data = EncodingUtils.add( new byte[1], EncodingUtils.encodeVarString(TABLE.getQualifiedName()), @@ -109,7 +113,8 @@ void ignoresUnknownIdentifiersIfConfigured(WriteMethod writeMethod) { new WriteReferencePersister(tableIndices, writeMethod, UnknownIdentifierHandlingMethod.IGNORE); byte[] data = createExpectedDataWithIdentifier(777666555); - assertThat(persister.unpersist(StoredWriteReference.BYTES_HYDRATOR.hydrateFromBytes(data))).isEmpty(); + assertThat(persister.unpersist(StoredWriteReference.BYTES_HYDRATOR.hydrateFromBytes(data))) + .isEmpty(); } @ParameterizedTest @@ -129,7 +134,7 @@ static Stream writeMethods() { private static byte[] createExpectedDataWithIdentifier(long identifier) { return EncodingUtils.add( - new byte[]{1}, + new byte[] {1}, EncodingUtils.encodeVarLong(identifier), EncodingUtils.encodeSizedBytes(row), EncodingUtils.encodeSizedBytes(column), From 7913d09e290d30e9509e79a924c4f96e03caac4d Mon Sep 17 00:00:00 2001 From: Jeremy Kong Date: Tue, 10 Sep 2024 19:23:26 +0100 Subject: [PATCH 05/12] truncation time --- .../atlasdb/sweep/queue/SweepQueue.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java index ec4a73d9843..d13ac1a0eab 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java @@ -16,11 +16,13 @@ package com.palantir.atlasdb.sweep.queue; import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableSet; import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.keyvalue.api.KeyValueService; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.protos.generated.TableMetadataPersistence.LogSafety; import com.palantir.atlasdb.schema.TargetedSweepSchema; +import com.palantir.atlasdb.schema.generated.TargetedSweepTableFactory; import com.palantir.atlasdb.sweep.Sweeper; import com.palantir.atlasdb.sweep.metrics.SweepOutcome; import com.palantir.atlasdb.sweep.metrics.TargetedSweepMetrics; @@ -308,6 +310,26 @@ static SweepQueueFactory create( Function> tablesToTrackDeletions, SweepIndexResetProgressStage resetProgressStage) { Schemas.createTablesAndIndexes(TargetedSweepSchema.INSTANCE.getLatestSchema(), kvs); + if (resetProgressStage.shouldInvalidateOldMappings()) { + log.info("Invalidating old sweep mappings... now truncating sweep identifier tables."); + + TargetedSweepTableFactory tableFactory = TargetedSweepTableFactory.of(); + try { + kvs.truncateTables(ImmutableSet.of( + tableFactory.getSweepIdToNameTable(null).getTableRef(), + tableFactory.getSweepNameToIdTable(null).getTableRef())); + log.info("Successfully truncated the sweep identifier tables."); + } catch (Exception e) { + log.warn( + "A failure was observed when truncating the sweep identifier tables. If you are running" + + " this as part of a broader clearance task, you MUST make sure that the success" + + " message is logged BEFORE considering the reset to have been performed. Seeing this" + + " message is neither an indication that the operation was success, nor is it an" + + " indication that the operation was not a success.", + e); + throw e; + } + } ShardProgress shardProgress = new ShardProgress(kvs); Supplier shards = createProgressUpdatingSupplier(shardsConfig, shardProgress, SweepQueueUtils.REFRESH_TIME); From 0d33f2fe057d5d6379ab8b611924126cb67c206c Mon Sep 17 00:00:00 2001 From: svc-autorelease Date: Wed, 11 Sep 2024 14:20:50 +0000 Subject: [PATCH 06/12] Release 0.1147.0-rc1 [skip ci] From 0dbca2c001a6c029af6626b2da0b204f54dce89f Mon Sep 17 00:00:00 2001 From: Jeremy Kong Date: Fri, 13 Sep 2024 17:43:15 +0100 Subject: [PATCH 07/12] wrecking ballllll --- .../sweep/metrics/TargetedSweepMetricPublicationFilter.java | 4 ++-- .../metrics/TargetedSweepMetricPublicationFilterTest.java | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/metrics/TargetedSweepMetricPublicationFilter.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/metrics/TargetedSweepMetricPublicationFilter.java index 74ad4bc0870..c9c34b7d9a7 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/metrics/TargetedSweepMetricPublicationFilter.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/metrics/TargetedSweepMetricPublicationFilter.java @@ -33,10 +33,10 @@ */ public class TargetedSweepMetricPublicationFilter implements MetricPublicationFilter { @VisibleForTesting - static final long MINIMUM_READS_WRITES_TO_BE_CONSIDERED_ACTIVE = 1_000; + static final long MINIMUM_READS_WRITES_TO_BE_CONSIDERED_ACTIVE = 0; @VisibleForTesting - static final Duration MINIMUM_STALE_DURATION = Duration.ofHours(4); + static final Duration MINIMUM_STALE_DURATION = Duration.ofMillis(1); private final AtomicBoolean publicationLatch; diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/metrics/TargetedSweepMetricPublicationFilterTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/metrics/TargetedSweepMetricPublicationFilterTest.java index d3832724aee..f28dab3f2ed 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/metrics/TargetedSweepMetricPublicationFilterTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/metrics/TargetedSweepMetricPublicationFilterTest.java @@ -20,8 +20,10 @@ import java.time.Duration; import java.util.concurrent.atomic.AtomicLong; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +@Disabled // Disabled for RC public class TargetedSweepMetricPublicationFilterTest { private final AtomicLong enqueuedWrites = new AtomicLong(); private final AtomicLong entriesRead = new AtomicLong(); From d1a864814c513f7f38794d441270bb645ab4ff19 Mon Sep 17 00:00:00 2001 From: svc-autorelease Date: Fri, 13 Sep 2024 16:49:11 +0000 Subject: [PATCH 08/12] Release 0.1147.0-rc2 [skip ci] From 3ddae1e979e6340d70e16371edbb780e3738dd15 Mon Sep 17 00:00:00 2001 From: Jeremy Kong Date: Mon, 16 Sep 2024 14:49:11 +0100 Subject: [PATCH 09/12] disable --- .../palantir/atlasdb/sweep/metrics/SweepOutcomeMetricsTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/metrics/SweepOutcomeMetricsTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/metrics/SweepOutcomeMetricsTest.java index 1c274b263f6..078682b37c5 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/metrics/SweepOutcomeMetricsTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/metrics/SweepOutcomeMetricsTest.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.stream.IntStream; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; public class SweepOutcomeMetricsTest { @@ -116,6 +117,7 @@ public void targetedSweepDoesNotRegisterExcludedOutcomes() { } @Test + @Disabled // This test really shouldn't be here anyways (it relies on behaviour elsewhere), but eh. public void canFilterOutUninterestingMetrics() { SweepOutcomeMetrics.registerTargeted(metricsManager, ImmutableMap.of("strategy", "thorough"), () -> false); TargetedSweepMetrics targetedMetrics = TargetedSweepMetrics.create( From 17320def6555e13a44804eb000a278380e59268d Mon Sep 17 00:00:00 2001 From: svc-autorelease Date: Mon, 16 Sep 2024 14:08:29 +0000 Subject: [PATCH 10/12] Release 0.1147.0-rc3 [skip ci] From 6c7ad36f6a6489020e40813f6848260b95aa608d Mon Sep 17 00:00:00 2001 From: Jeremy Kong Date: Mon, 16 Sep 2024 16:42:19 +0100 Subject: [PATCH 11/12] more telemetry --- .../com/palantir/atlasdb/factory/TransactionManagers.java | 1 + .../atlasdb/config/AtlasDbConfigDeserializationTest.java | 4 ++++ atlasdb-config/src/test/resources/test-config.yml | 2 ++ .../java/com/palantir/atlasdb/sweep/queue/SweepQueue.java | 7 +++++++ .../com/palantir/atlasdb/sweep/queue/TargetedSweeper.java | 5 +++++ 5 files changed, 19 insertions(+) diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java index 9df0b53032a..5baabcdeb2c 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java @@ -1062,6 +1062,7 @@ private static TargetedSweeper uninitializedTargetedSweeper( CoordinationAwareKnownAbandonedTransactionsStore abandonedTxnStore = new CoordinationAwareKnownAbandonedTransactionsStore( coordinationService, new AbandonedTimestampStoreImpl(kvs)); + log.info("[PDS-586351] Creating an uninitialized targeted sweeper..."); return TargetedSweeper.createUninitialized( metricsManager, runtime, diff --git a/atlasdb-config/src/test/java/com/palantir/atlasdb/config/AtlasDbConfigDeserializationTest.java b/atlasdb-config/src/test/java/com/palantir/atlasdb/config/AtlasDbConfigDeserializationTest.java index d144fa326af..bbf035fe940 100644 --- a/atlasdb-config/src/test/java/com/palantir/atlasdb/config/AtlasDbConfigDeserializationTest.java +++ b/atlasdb-config/src/test/java/com/palantir/atlasdb/config/AtlasDbConfigDeserializationTest.java @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; import com.palantir.atlasdb.memory.InMemoryAtlasDbConfig; +import com.palantir.atlasdb.sweep.queue.config.TargetedSweepInstallConfig.SweepIndexResetProgressStage; import com.palantir.conjure.java.api.config.ssl.SslConfiguration; import java.io.File; import java.io.IOException; @@ -42,6 +43,9 @@ public void canDeserializeAtlasDbConfig() throws IOException { assertTimeLockConfigDeserializedCorrectly(config.timelock().get()); assertThat(config.leader()).isNotPresent(); + + assertThat(config.targetedSweep().sweepIndexResetProgressStage()) + .isEqualTo(SweepIndexResetProgressStage.WRITE_IMMEDIATE_FORMAT_AND_SKIP_UNKNOWNS); } @Test diff --git a/atlasdb-config/src/test/resources/test-config.yml b/atlasdb-config/src/test/resources/test-config.yml index 8c0fc9c2cc8..a202629e828 100644 --- a/atlasdb-config/src/test/resources/test-config.yml +++ b/atlasdb-config/src/test/resources/test-config.yml @@ -1,5 +1,7 @@ atlasdb: namespace: brian + targetedSweep: + sweepIndexResetProgressStage: WRITE_IMMEDIATE_FORMAT_AND_SKIP_UNKNOWNS keyValueService: type: memory diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java index d13ac1a0eab..dac1c42b3ec 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java @@ -310,6 +310,7 @@ static SweepQueueFactory create( Function> tablesToTrackDeletions, SweepIndexResetProgressStage resetProgressStage) { Schemas.createTablesAndIndexes(TargetedSweepSchema.INSTANCE.getLatestSchema(), kvs); + log.info("[PDS-586351] Creating a sweep queue factory..."); if (resetProgressStage.shouldInvalidateOldMappings()) { log.info("Invalidating old sweep mappings... now truncating sweep identifier tables."); @@ -329,7 +330,13 @@ static SweepQueueFactory create( e); throw e; } + } else { + log.info( + "Not invalidating old sweep mappings, because we don't believe we've been configured to do" + + " this.", + SafeArg.of("resetProgressStage", resetProgressStage)); } + ShardProgress shardProgress = new ShardProgress(kvs); Supplier shards = createProgressUpdatingSupplier(shardsConfig, shardProgress, SweepQueueUtils.REFRESH_TIME); diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/TargetedSweeper.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/TargetedSweeper.java index 4d105dd0623..ba87674a9d8 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/TargetedSweeper.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/TargetedSweeper.java @@ -142,7 +142,9 @@ public static TargetedSweeper createUninitializedForTest(KeyValueService kvs, Su @Override public void initialize(TransactionManager txManager) { + log.info("[PDS-586351] Initializing targeted sweep..."); initializeWithoutRunning(txManager); + log.info("[PDS-586351] Initialized targeted sweep, now running in background..."); runInBackground(); } @@ -171,8 +173,10 @@ public void initializeWithoutRunning( TransactionService transaction, TargetedSweepFollower follower) { if (isInitialized) { + log.info("[PDS-586351] Targeted sweep thinks it's already initialized..."); return; } + log.info("[PDS-586351] Now initializing targeted sweep, given an initialized kvs..."); Preconditions.checkState( kvs.isInitialized(), "Attempted to initialize targeted sweeper with an uninitialized backing KVS."); metrics = TargetedSweepMetrics.create( @@ -181,6 +185,7 @@ public void initializeWithoutRunning( kvs, metricsConfiguration, runtime.get().shards()); + log.info("[PDS-586351] Initializing a sweep queue..."); queue = SweepQueue.create( metrics, kvs, From ba17d3e9663ad58378f24e1f5f275bb73b99b57c Mon Sep 17 00:00:00 2001 From: svc-autorelease Date: Mon, 16 Sep 2024 15:50:32 +0000 Subject: [PATCH 12/12] Release 0.1147.0-rc4 [skip ci]