Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[PDS-586351] [Possibly RC only, though debatable] Mechanism for clearing a potentially inaccurate sweep namesToIds and vice versa mapping #7269

Open
wants to merge 14 commits into
base: develop
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,7 @@ private static TargetedSweeper uninitializedTargetedSweeper(
CoordinationAwareKnownAbandonedTransactionsStore abandonedTxnStore =
new CoordinationAwareKnownAbandonedTransactionsStore(
coordinationService, new AbandonedTimestampStoreImpl(kvs));
log.info("[PDS-586351] Creating an uninitialized targeted sweeper...");
return TargetedSweeper.createUninitialized(
metricsManager,
runtime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.palantir.atlasdb.memory.InMemoryAtlasDbConfig;
import com.palantir.atlasdb.sweep.queue.config.TargetedSweepInstallConfig.SweepIndexResetProgressStage;
import com.palantir.conjure.java.api.config.ssl.SslConfiguration;
import java.io.File;
import java.io.IOException;
Expand All @@ -42,6 +43,9 @@ public void canDeserializeAtlasDbConfig() throws IOException {
assertTimeLockConfigDeserializedCorrectly(config.timelock().get());

assertThat(config.leader()).isNotPresent();

assertThat(config.targetedSweep().sweepIndexResetProgressStage())
.isEqualTo(SweepIndexResetProgressStage.WRITE_IMMEDIATE_FORMAT_AND_SKIP_UNKNOWNS);
}

@Test
Expand Down
2 changes: 2 additions & 0 deletions atlasdb-config/src/test/resources/test-config.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
atlasdb:
namespace: brian
targetedSweep:
sweepIndexResetProgressStage: WRITE_IMMEDIATE_FORMAT_AND_SKIP_UNKNOWNS

keyValueService:
type: memory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,49 @@
import com.google.common.primitives.Ints;
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.atlasdb.ptobject.EncodingUtils;
import com.palantir.atlasdb.sweep.queue.config.TargetedSweepInstallConfig;
import com.palantir.atlasdb.sweep.queue.id.SweepTableIndices;
import com.palantir.conjure.java.jackson.optimizations.ObjectMapperOptimizations;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.Optional;

public final class WriteReferencePersister {
private static final byte[] writePrefix = {1};
private static final byte[] ZERO_BYTE = {0};
private static final byte[] ONE_BYTE = {1};

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.registerModule(new Jdk8Module())
.registerModules(ObjectMapperOptimizations.createModules());
private static final StoredWriteReference DUMMY = ImmutableStoredWriteReference.of(PtBytes.EMPTY_BYTE_ARRAY);

private final SweepTableIndices tableIndices;
private final WriteMethod writeMethod;
private final UnknownIdentifierHandlingMethod unknownIdentifierHandlingMethod;

public WriteReferencePersister(SweepTableIndices tableIndices) {
WriteReferencePersister(
SweepTableIndices tableIndices,
WriteMethod writeMethod,
UnknownIdentifierHandlingMethod unknownIdentifierHandlingMethod) {
this.tableIndices = tableIndices;
this.writeMethod = writeMethod;
this.unknownIdentifierHandlingMethod = unknownIdentifierHandlingMethod;
}

public static WriteReferencePersister create(
SweepTableIndices sweepTableIndices,
TargetedSweepInstallConfig.SweepIndexResetProgressStage resetProgressStage) {
return new WriteReferencePersister(
sweepTableIndices,
resetProgressStage.shouldWriteImmediateFormat()
? WriteMethod.TABLE_NAME_AS_STRING_BINARY
: WriteMethod.TABLE_ID_BINARY,
resetProgressStage.shouldSkipUnknowns()
? UnknownIdentifierHandlingMethod.IGNORE
: UnknownIdentifierHandlingMethod.THROW);
}

public Optional<WriteReference> unpersist(StoredWriteReference writeReference) {
Expand Down Expand Up @@ -72,20 +98,38 @@ public Optional<WriteReference> visitTableNameAsStringBinary(byte[] ref) {
public Optional<WriteReference> visitTableIdBinary(byte[] ref) {
int offset = 1;
int tableId = Ints.checkedCast(EncodingUtils.decodeUnsignedVarLong(ref, offset));
TableReference tableReference = tableIndices.getTableReference(tableId);
Optional<TableReference> maybeTableReference = safeGetTableReference(tableId);
if (maybeTableReference.isEmpty()) {
return Optional.empty();
}
offset += EncodingUtils.sizeOfUnsignedVarLong(tableId);
byte[] row = EncodingUtils.decodeSizedBytes(ref, offset);
offset += EncodingUtils.sizeOfSizedBytes(row);
byte[] column = EncodingUtils.decodeSizedBytes(ref, offset);
offset += EncodingUtils.sizeOfSizedBytes(column);
long isTombstone = EncodingUtils.decodeUnsignedVarLong(ref, offset);
return Optional.of(ImmutableWriteReference.builder()
.tableRef(tableReference)
.tableRef(maybeTableReference.get())
.cell(Cell.create(row, column))
.isTombstone(isTombstone == 1)
.build());
}

private Optional<TableReference> safeGetTableReference(int tableId) {
try {
return Optional.of(tableIndices.getTableReference(tableId));
} catch (NoSuchElementException e) {
switch (unknownIdentifierHandlingMethod) {
case IGNORE:
return Optional.empty();
case THROW:
throw e;
default:
throw new SafeIllegalStateException("Unexpected unknown identifier handling method", e);
}
}
}

@Override
public Optional<WriteReference> visitDummy() {
return Optional.empty();
Expand All @@ -98,10 +142,43 @@ public StoredWriteReference persist(Optional<WriteReference> writeReference) {
return DUMMY;
}
WriteReference writeRef = writeReference.get();
byte[] tableId = EncodingUtils.encodeUnsignedVarLong(tableIndices.getTableId(writeRef.tableRef()));
byte[] tableIdentifier = getTableIdentifier(writeRef.tableRef());
byte[] row = EncodingUtils.encodeSizedBytes(writeRef.cell().getRowName());
byte[] column = EncodingUtils.encodeSizedBytes(writeRef.cell().getColumnName());
byte[] isTombstone = EncodingUtils.encodeUnsignedVarLong(writeRef.isTombstone() ? 1 : 0);
return ImmutableStoredWriteReference.of(EncodingUtils.add(writePrefix, tableId, row, column, isTombstone));
return ImmutableStoredWriteReference.of(
EncodingUtils.add(writeMethod.getBytePrefix(), tableIdentifier, row, column, isTombstone));
}

private byte[] getTableIdentifier(TableReference tableReference) {
switch (writeMethod) {
case TABLE_ID_BINARY:
return EncodingUtils.encodeUnsignedVarLong(tableIndices.getTableId(tableReference));
case TABLE_NAME_AS_STRING_BINARY:
return EncodingUtils.encodeVarString(tableReference.toString());
default:
throw new SafeIllegalStateException("Unhandled write method", SafeArg.of("writeMethod", writeMethod));
}
}

@SuppressWarnings("ImmutableEnumChecker") // Overhead of needless wrapping is probably undesirable.
enum WriteMethod {
TABLE_NAME_AS_STRING_BINARY(ZERO_BYTE),
TABLE_ID_BINARY(ONE_BYTE);

private final byte[] bytePrefix;

WriteMethod(byte[] bytePrefix) {
this.bytePrefix = bytePrefix;
}

byte[] getBytePrefix() {
return bytePrefix;
}
}

enum UnknownIdentifierHandlingMethod {
THROW,
IGNORE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
*/
public class TargetedSweepMetricPublicationFilter implements MetricPublicationFilter {
@VisibleForTesting
static final long MINIMUM_READS_WRITES_TO_BE_CONSIDERED_ACTIVE = 1_000;
static final long MINIMUM_READS_WRITES_TO_BE_CONSIDERED_ACTIVE = 0;

@VisibleForTesting
static final Duration MINIMUM_STALE_DURATION = Duration.ofHours(4);
static final Duration MINIMUM_STALE_DURATION = Duration.ofMillis(1);

private final AtomicBoolean publicationLatch;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@
package com.palantir.atlasdb.sweep.queue;

import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSet;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.protos.generated.TableMetadataPersistence.LogSafety;
import com.palantir.atlasdb.schema.TargetedSweepSchema;
import com.palantir.atlasdb.schema.generated.TargetedSweepTableFactory;
import com.palantir.atlasdb.sweep.Sweeper;
import com.palantir.atlasdb.sweep.metrics.SweepOutcome;
import com.palantir.atlasdb.sweep.metrics.TargetedSweepMetrics;
import com.palantir.atlasdb.sweep.queue.SweepQueueReader.ReadBatchingRuntimeContext;
import com.palantir.atlasdb.sweep.queue.clear.DefaultTableClearer;
import com.palantir.atlasdb.sweep.queue.config.TargetedSweepInstallConfig.SweepIndexResetProgressStage;
import com.palantir.atlasdb.table.description.Schemas;
import com.palantir.atlasdb.table.description.SweeperStrategy;
import com.palantir.atlasdb.transaction.impl.TimelockTimestampServiceAdapter;
Expand Down Expand Up @@ -78,9 +81,17 @@ public static SweepQueue create(
AbandonedTransactionConsumer abortedTransactionConsumer,
TargetedSweepFollower follower,
ReadBatchingRuntimeContext readBatchingRuntimeContext,
Function<TableReference, Optional<LogSafety>> tablesToTrackDeletions) {
Function<TableReference, Optional<LogSafety>> tablesToTrackDeletions,
SweepIndexResetProgressStage resetProgressStage) {
SweepQueueFactory factory = SweepQueueFactory.create(
metrics, kvs, timelock, shardsConfig, transaction, readBatchingRuntimeContext, tablesToTrackDeletions);
metrics,
kvs,
timelock,
shardsConfig,
transaction,
readBatchingRuntimeContext,
tablesToTrackDeletions,
resetProgressStage);
return new SweepQueue(factory, follower, abortedTransactionConsumer);
}

Expand All @@ -93,7 +104,13 @@ public static MultiTableSweepQueueWriter createWriter(
TimelockService timelock,
Supplier<Integer> shardsConfig,
ReadBatchingRuntimeContext readBatchingRuntimeContext) {
return SweepQueueFactory.create(metrics, kvs, timelock, shardsConfig, readBatchingRuntimeContext)
return SweepQueueFactory.create(
metrics,
kvs,
timelock,
shardsConfig,
readBatchingRuntimeContext,
SweepIndexResetProgressStage.NO_ACTIVE_RESET)
.createWriter();
}

Expand Down Expand Up @@ -266,7 +283,8 @@ static SweepQueueFactory create(
KeyValueService kvs,
TimelockService timelock,
Supplier<Integer> shardsConfig,
ReadBatchingRuntimeContext readBatchingRuntimeContext) {
ReadBatchingRuntimeContext readBatchingRuntimeContext,
SweepIndexResetProgressStage resetProgressStage) {
// It is OK that the transaction service is different from the one used by the transaction manager,
// as transaction services must not hold any local state in them that would affect correctness.
TransactionService transaction =
Expand All @@ -278,7 +296,8 @@ static SweepQueueFactory create(
shardsConfig,
transaction,
readBatchingRuntimeContext,
_unused -> Optional.empty());
_unused -> Optional.empty(),
resetProgressStage);
}

static SweepQueueFactory create(
Expand All @@ -288,13 +307,41 @@ static SweepQueueFactory create(
Supplier<Integer> shardsConfig,
TransactionService transaction,
ReadBatchingRuntimeContext readBatchingRuntimeContext,
Function<TableReference, Optional<LogSafety>> tablesToTrackDeletions) {
Function<TableReference, Optional<LogSafety>> tablesToTrackDeletions,
SweepIndexResetProgressStage resetProgressStage) {
Schemas.createTablesAndIndexes(TargetedSweepSchema.INSTANCE.getLatestSchema(), kvs);
log.info("[PDS-586351] Creating a sweep queue factory...");
if (resetProgressStage.shouldInvalidateOldMappings()) {
log.info("Invalidating old sweep mappings... now truncating sweep identifier tables.");

TargetedSweepTableFactory tableFactory = TargetedSweepTableFactory.of();
try {
kvs.truncateTables(ImmutableSet.of(
tableFactory.getSweepIdToNameTable(null).getTableRef(),
tableFactory.getSweepNameToIdTable(null).getTableRef()));
log.info("Successfully truncated the sweep identifier tables.");
} catch (Exception e) {
log.warn(
"A failure was observed when truncating the sweep identifier tables. If you are running"
+ " this as part of a broader clearance task, you MUST make sure that the success"
+ " message is logged BEFORE considering the reset to have been performed. Seeing this"
+ " message is neither an indication that the operation was success, nor is it an"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, but doesn't really matter so ignore it given this code is going to be ripped out: was a success / was successful (former is consistent with the rest of the sentence structure)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps worth writing in the SOP - if all nodes fail, either identify what the problem is, fix, and then bounce the nodes, or to just page someone, or some other guidance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as much as it hurts me, I will ignore the nit 😅

SOP comments added

+ " indication that the operation was not a success.",
e);
throw e;
}
} else {
log.info(
"Not invalidating old sweep mappings, because we don't believe we've been configured to do"
+ " this.",
SafeArg.of("resetProgressStage", resetProgressStage));
}

ShardProgress shardProgress = new ShardProgress(kvs);
Supplier<Integer> shards =
createProgressUpdatingSupplier(shardsConfig, shardProgress, SweepQueueUtils.REFRESH_TIME);
WriteInfoPartitioner partitioner = new WriteInfoPartitioner(kvs, shards);
SweepableCells cells = new SweepableCells(kvs, partitioner, metrics, transaction);
SweepableCells cells = new SweepableCells(kvs, partitioner, metrics, transaction, resetProgressStage);
SweepableTimestamps timestamps = new SweepableTimestamps(kvs, partitioner);
return new SweepQueueFactory(
shardProgress,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.palantir.atlasdb.schema.generated.TargetedSweepTableFactory;
import com.palantir.atlasdb.sweep.CommitTsCache;
import com.palantir.atlasdb.sweep.metrics.TargetedSweepMetrics;
import com.palantir.atlasdb.sweep.queue.config.TargetedSweepInstallConfig.SweepIndexResetProgressStage;
import com.palantir.atlasdb.sweep.queue.id.SweepTableIndices;
import com.palantir.atlasdb.transaction.impl.TransactionConstants;
import com.palantir.atlasdb.transaction.service.TransactionService;
Expand Down Expand Up @@ -75,10 +76,11 @@ public SweepableCells(
KeyValueService kvs,
WriteInfoPartitioner partitioner,
TargetedSweepMetrics metrics,
TransactionService transactionService) {
TransactionService transactionService,
SweepIndexResetProgressStage resetProgressStage) {
super(kvs, TargetedSweepTableFactory.of().getSweepableCellsTable(null).getTableRef(), partitioner, metrics);
this.commitTsCache = CommitTsCache.create(transactionService);
this.writeReferencePersister = new WriteReferencePersister(new SweepTableIndices(kvs));
this.writeReferencePersister = WriteReferencePersister.create(new SweepTableIndices(kvs), resetProgressStage);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class TargetedSweeper implements MultiTableSweepQueueWriter, BackgroundSw
private final BackgroundSweepScheduler noneScheduler;

private final KeyValueService keyValueService;
private final TargetedSweepInstallConfig.SweepIndexResetProgressStage resetProgressStage;

private LastSweptTimestampUpdater lastSweptTimestampUpdater;
private TargetedSweepMetrics metrics;
Expand Down Expand Up @@ -95,6 +96,7 @@ private TargetedSweeper(
this.metricsConfiguration = install.metricsConfiguration();
this.abandonedTransactionConsumer = abandonedTransactionConsumer;
this.keyValueService = keyValueService;
this.resetProgressStage = install.sweepIndexResetProgressStage();
}

public boolean isInitialized() {
Expand Down Expand Up @@ -140,7 +142,9 @@ public static TargetedSweeper createUninitializedForTest(KeyValueService kvs, Su

@Override
public void initialize(TransactionManager txManager) {
log.info("[PDS-586351] Initializing targeted sweep...");
initializeWithoutRunning(txManager);
log.info("[PDS-586351] Initialized targeted sweep, now running in background...");
runInBackground();
}

Expand Down Expand Up @@ -169,8 +173,10 @@ public void initializeWithoutRunning(
TransactionService transaction,
TargetedSweepFollower follower) {
if (isInitialized) {
log.info("[PDS-586351] Targeted sweep thinks it's already initialized...");
return;
}
log.info("[PDS-586351] Now initializing targeted sweep, given an initialized kvs...");
Preconditions.checkState(
kvs.isInitialized(), "Attempted to initialize targeted sweeper with an uninitialized backing KVS.");
metrics = TargetedSweepMetrics.create(
Expand All @@ -179,6 +185,7 @@ public void initializeWithoutRunning(
kvs,
metricsConfiguration,
runtime.get().shards());
log.info("[PDS-586351] Initializing a sweep queue...");
queue = SweepQueue.create(
metrics,
kvs,
Expand All @@ -191,7 +198,8 @@ public void initializeWithoutRunning(
.maximumPartitions(this::getPartitionBatchLimit)
.cellsThreshold(() -> runtime.get().batchCellThreshold())
.build(),
table -> runtime.get().tablesToTrackDeletions().apply(table));
table -> runtime.get().tablesToTrackDeletions().apply(table),
resetProgressStage);
timestampsSupplier = timestamps;
timeLock = timelockService;
lastSweptTimestampUpdater = new LastSweptTimestampUpdater(
Expand Down
Loading