Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
make hash rotation configurable (#7403)
Browse files Browse the repository at this point in the history
  • Loading branch information
barisoyoruk authored Oct 25, 2024
1 parent 5d62e44 commit e3fed6c
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ protected MultiTableSweepQueueWriter getSweepQueueWriterInitialized() {
() -> 128,
SweepQueueReader.DEFAULT_READ_BATCHING_RUNTIME_CONTEXT,
new TargetedSweepFollower(List.of(), mock(TransactionManager.class)),
MismatchBehaviour.UPDATE)
MismatchBehaviour.UPDATE,
() -> 1440)
.writer();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ private AtlasDbConstants() {
public static final int TARGETED_SWEEP_NONE_SHARDS = 1;
public static final int DEFAULT_TARGETED_SWEEP_THREADS = 1;
public static final int MAX_SWEEP_QUEUE_SHARDS = TargetedSweepMetadata.MAX_SHARDS;
public static final int DEFAULT_SHARD_ROTATION_INTERVAL_MINUTES = 1440;

public static final int DEFAULT_STREAM_IN_MEMORY_THRESHOLD = 4 * 1024 * 1024;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.palantir.atlasdb.sweep.queue;

import com.google.common.base.Suppliers;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.protos.generated.TableMetadataPersistence.LogSafety;
Expand Down Expand Up @@ -60,7 +61,8 @@ static SweepQueueComponents create(
Supplier<Integer> shardsConfig,
ReadBatchingRuntimeContext readBatchingRuntimeContext,
TargetedSweepFollower follower,
MismatchBehaviour mismatchBehaviourForShards) {
MismatchBehaviour mismatchBehaviourForShards,
Supplier<Integer> rotationIntervalMinutesConfig) {
// It is OK that the transaction service is different from the one used by the transaction manager,
// as transaction services must not hold any local state in them that would affect correctness.
TransactionService transaction =
Expand All @@ -74,7 +76,8 @@ static SweepQueueComponents create(
readBatchingRuntimeContext,
_unused -> Optional.empty(),
follower,
mismatchBehaviourForShards);
mismatchBehaviourForShards,
rotationIntervalMinutesConfig);
}

static SweepQueueComponents create(
Expand All @@ -86,7 +89,8 @@ static SweepQueueComponents create(
ReadBatchingRuntimeContext readBatchingRuntimeContext,
Function<TableReference, Optional<LogSafety>> tablesToTrackDeletions,
TargetedSweepFollower follower,
MismatchBehaviour mismatchBehaviourForShards) {
MismatchBehaviour mismatchBehaviourForShards,
Supplier<Integer> rotationIntervalMinutesConfig) {
Schemas.createTablesAndIndexes(TargetedSweepSchema.INSTANCE.getLatestSchema(), kvs);
ShardProgress shardProgress = new ShardProgress(kvs);

Expand All @@ -95,8 +99,11 @@ static SweepQueueComponents create(
shardsConfig,
mismatchBehaviourForShards,
Duration.ofMillis(SweepQueueUtils.REFRESH_TIME));
Supplier<Integer> rotationIntervalMinutesProvider = Suppliers.memoizeWithExpiration(
rotationIntervalMinutesConfig::get, Duration.ofMillis(SweepQueueUtils.REFRESH_TIME));

WriteInfoPartitioner partitioner = new WriteInfoPartitioner(kvs, numberOfShardsProvider::getNumberOfShards);
WriteInfoPartitioner partitioner = new WriteInfoPartitioner(
kvs, numberOfShardsProvider::getNumberOfShards, rotationIntervalMinutesProvider);
SweepableCells cells = new SweepableCells(kvs, partitioner, metrics, transaction);
SweepableTimestamps timestamps = new SweepableTimestamps(kvs, partitioner);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ public void initializeWithoutRunning(
.build(),
table -> runtime.get().tablesToTrackDeletions().apply(table),
follower,
mismatchBehaviour);
mismatchBehaviour,
() -> runtime.get().shardRotationIntervalMinutes());

SingleBatchSweeper singleBatchSweeper = new DefaultSingleBatchSweeper(
metrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import com.palantir.atlasdb.keyvalue.api.TimestampRangeDelete;
import com.palantir.atlasdb.keyvalue.api.WriteReference;
import com.palantir.atlasdb.sweep.Sweeper;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
import org.immutables.value.Value;

Expand All @@ -43,19 +44,23 @@ default TimestampRangeDelete toDelete(Sweeper sweeper) {
.build();
}

default int toShard(int numShards) {
return IntMath.mod(dayRotatingHash(), numShards);
default int toShard(int numShards, int shardRotationIntervalMinutes) {
return IntMath.mod(rotatingHash(shardRotationIntervalMinutes), numShards);
}

/**
* The purpose of the rotating hash calculation is to redistribute shards every day to alleviate issues caused by
* imbalanced write patterns overloading few shards.
* The purpose of the rotating hash calculation is to redistribute shards every {@code shardRotationIntervalMinutes}
* minutes to alleviate issues caused by imbalanced write patterns overloading a few shards.
*/
default int dayRotatingHash() {
default int rotatingHash(int shardRotationIntervalMinutes) {
int hash = 5381;
hash = hash * 1439
+ writeRef().orElse(SweepQueueUtils.DUMMY).cellReference().goodHash();
hash = hash * 1439 + LocalDate.now(ZoneOffset.UTC).hashCode();

LocalDateTime now = LocalDateTime.now(ZoneOffset.UTC);
LocalDateTime epoch = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);
long diff = ChronoUnit.MINUTES.between(epoch, now) / shardRotationIntervalMinutes;
hash = hash * 1439 + Long.hashCode(diff);
return hash;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ public class WriteInfoPartitioner {

private final KeyValueService kvs;
private final Supplier<Integer> numShards;
private final Supplier<Integer> shardRotationIntervalMinutes;

private final LoadingCache<TableReference, SweeperStrategy> cache =
Caffeine.newBuilder().build(this::getStrategyFromKvs);

public WriteInfoPartitioner(KeyValueService kvs, Supplier<Integer> numShards) {
public WriteInfoPartitioner(
KeyValueService kvs, Supplier<Integer> numShards, Supplier<Integer> shardRotationIntervalMinutes) {
this.kvs = kvs;
this.numShards = numShards;
this.shardRotationIntervalMinutes = shardRotationIntervalMinutes;
}

/**
Expand Down Expand Up @@ -81,14 +84,16 @@ private Map<PartitionInfo, List<WriteInfo>> filterAndPartitionForSingleTimestamp
}

int shards = numShards.get();
int rotationIntervalMinutes = shardRotationIntervalMinutes.get();

// Note that we do a single pass over writes to determine their sweep strategy, and only add sweepable write to
// sweepablePartitionedWrites map, so when it is empty we consider all writes for that timestamp non-sweepable.
Map<PartitionInfo, List<WriteInfo>> sweepablePartitionedWrites = new HashMap<>();
for (WriteInfo writeInfo : writes) {
SweeperStrategy strategy = getStrategy(writeInfo);
if (strategy != SweeperStrategy.NON_SWEEPABLE) {
PartitionInfo partition = PartitionInfo.of(writeInfo.toShard(shards), strategy, writeInfo.timestamp());
PartitionInfo partition = PartitionInfo.of(
writeInfo.toShard(shards, rotationIntervalMinutes), strategy, writeInfo.timestamp());
sweepablePartitionedWrites
.computeIfAbsent(partition, _k -> new ArrayList<>())
.add(writeInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public int shards() {
return AtlasDbConstants.DEFAULT_TARGETED_SWEEP_SHARDS;
}

/**
 * The shard rotation interval, in minutes. Defaults to
 * {@link AtlasDbConstants#DEFAULT_SHARD_ROTATION_INTERVAL_MINUTES}.
 *
 * <p>The value is validated by {@code checkRotationIntervalMinutes()} and must be between 10 and 1440 minutes
 * inclusive.
 *
 * <p>NOTE(review): presumably this is the value fed into the rotating hash used to assign writes to sweep queue
 * shards; confirm against the caller that reads this config.
 */
@Value.Default
public int shardRotationIntervalMinutes() {
return AtlasDbConstants.DEFAULT_SHARD_ROTATION_INTERVAL_MINUTES;
}

/**
* Specifies the maximum number of (fine) partitions over which targeted sweep attempts to read sweep queue
* information before executing deletes. Only partitions which actually contain information about writes will count
Expand Down Expand Up @@ -91,6 +96,14 @@ void checkShardSize() {
SafeArg.of("shards", shards()));
}

/**
 * Validates that the configured shard rotation interval lies within the supported range.
 *
 * <p>The precondition fails when {@link #shardRotationIntervalMinutes()} is below 10 or above 1440; the offending
 * value is attached to the failure as a safe argument.
 */
@Value.Check
void checkRotationIntervalMinutes() {
    // Read the configured value once so the bounds check and the logged argument agree.
    int rotationIntervalMinutes = shardRotationIntervalMinutes();
    boolean withinBounds = rotationIntervalMinutes >= 10 && rotationIntervalMinutes <= 1440;
    Preconditions.checkArgument(
            withinBounds,
            "Shard rotation interval minutes must be between 10 and 1440 minutes inclusive.",
            SafeArg.of("rotationIntervalMinutes", rotationIntervalMinutes));
}

/**
* Hint for the duration of pause between iterations of targeted sweep. Must not be longer than a day.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@

public class SingleBucketSweepTaskIntegrationTest {
private static final int SHARDS = 1; // Used to avoid complications of the hash function
private static final int SHARD_ROTATION_INTERVAL_MINUTES = 1440;
private static final int SHARD_ZERO = 0;

private static final TableReference CONSERVATIVE_TABLE = TableReference.createFromFullyQualifiedName("terri.tory");
Expand All @@ -107,7 +108,8 @@ public class SingleBucketSweepTaskIntegrationTest {
private static final byte[] THIRD_VALUE = PtBytes.toBytes("drittel");

private final KeyValueService keyValueService = new InMemoryKeyValueService(true);
private final WriteInfoPartitioner writeInfoPartitioner = new WriteInfoPartitioner(keyValueService, () -> SHARDS);
private final WriteInfoPartitioner writeInfoPartitioner =
new WriteInfoPartitioner(keyValueService, () -> SHARDS, () -> SHARD_ROTATION_INTERVAL_MINUTES);
private final MetricsManager metricsManager = MetricsManagers.createForTests();
private final TransactionService transactionService =
SimpleTransactionService.createV3(keyValueService, metricsManager.getTaggedRegistry(), () -> false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,21 @@ public abstract class AbstractSweepQueueTest {
static final long TS_FINE_PARTITION = SweepQueueUtils.tsPartitionFine(TS);
static final long TS2_FINE_PARTITION = SweepQueueUtils.tsPartitionFine(TS2);
static final int DEFAULT_SHARDS = 8;
static final int DEFAULT_SHARD_ROTATION_INTERVAL_MINUTES = 1440;
static final int FIXED_SHARD = WriteInfo.write(
TABLE_CONS,
getCellRefWithFixedShard(0, TABLE_CONS, DEFAULT_SHARDS).cell(),
0L)
.toShard(DEFAULT_SHARDS);
static final int CONS_SHARD =
WriteInfo.tombstone(TABLE_CONS, DEFAULT_CELL, 0).toShard(DEFAULT_SHARDS);
static final int THOR_SHARD =
WriteInfo.tombstone(TABLE_THOR, DEFAULT_CELL, 0).toShard(DEFAULT_SHARDS);
static final int THOR_MIGRATION_SHARD =
WriteInfo.tombstone(TABLE_THOR_MIGRATION, DEFAULT_CELL, 0).toShard(DEFAULT_SHARDS);
.toShard(DEFAULT_SHARDS, DEFAULT_SHARD_ROTATION_INTERVAL_MINUTES);
static final int CONS_SHARD = WriteInfo.tombstone(TABLE_CONS, DEFAULT_CELL, 0)
.toShard(DEFAULT_SHARDS, DEFAULT_SHARD_ROTATION_INTERVAL_MINUTES);
static final int THOR_SHARD = WriteInfo.tombstone(TABLE_THOR, DEFAULT_CELL, 0)
.toShard(DEFAULT_SHARDS, DEFAULT_SHARD_ROTATION_INTERVAL_MINUTES);
static final int THOR_MIGRATION_SHARD = WriteInfo.tombstone(TABLE_THOR_MIGRATION, DEFAULT_CELL, 0)
.toShard(DEFAULT_SHARDS, DEFAULT_SHARD_ROTATION_INTERVAL_MINUTES);

int numShards;
int shardRotationIntervalMinutes;
long immutableTs;
long unreadableTs;
int shardCons;
Expand All @@ -84,6 +86,8 @@ public abstract class AbstractSweepQueueTest {
@BeforeEach
public void setup() {
numShards = DEFAULT_SHARDS;
shardRotationIntervalMinutes = DEFAULT_SHARD_ROTATION_INTERVAL_MINUTES;

unreadableTs = SweepQueueUtils.TS_COARSE_GRANULARITY * 5;
immutableTs = SweepQueueUtils.TS_COARSE_GRANULARITY * 5;

Expand All @@ -95,7 +99,7 @@ public void setup() {
spiedKvs.createTable(TABLE_THOR, metadataBytes(SweepStrategy.THOROUGH));
spiedKvs.createTable(TABLE_THOR_MIGRATION, metadataBytes(SweepStrategy.THOROUGH_MIGRATION));
spiedKvs.createTable(TABLE_NOTH, metadataBytes(SweepStrategy.NOTHING));
partitioner = new WriteInfoPartitioner(spiedKvs, () -> numShards);
partitioner = new WriteInfoPartitioner(spiedKvs, () -> numShards, () -> shardRotationIntervalMinutes);
txnService = TransactionServices.createV1TransactionService(spiedKvs);
}

Expand Down Expand Up @@ -141,7 +145,7 @@ int putTombstoneToDefaultCommitted(SweepQueueTable queueWriter, long timestamp,
private int write(SweepQueueTable writer, long ts, Cell cell, boolean isTombstone, TableReference tableRef) {
WriteInfo write = WriteInfo.of(WriteReference.of(tableRef, cell, isTombstone), ts);
writer.enqueue(ImmutableList.of(write));
return write.toShard(numShards);
return write.toShard(numShards, shardRotationIntervalMinutes);
}

void putTimestampIntoTransactionTable(long ts, long commitTs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1569,14 +1569,18 @@ private Map<Integer, Integer> enqueueAtLeastThresholdWritesInDefaultShardWithSta
List<WriteInfo> writeInfos = new ArrayList<>();
int counter = 0;
while (writeInfos.stream()
.filter(write -> write.toShard(DEFAULT_SHARDS) == CONS_SHARD)
.filter(write ->
write.toShard(DEFAULT_SHARDS, DEFAULT_SHARD_ROTATION_INTERVAL_MINUTES) == CONS_SHARD)
.count()
< threshold) {
writeInfos.addAll(generateHundredWrites(counter++, startTs));
}
sweepQueue.enqueue(writeInfos);
return writeInfos.stream()
.collect(Collectors.toMap(write -> write.toShard(DEFAULT_SHARDS), write -> 1, Integer::sum));
.collect(Collectors.toMap(
write -> write.toShard(DEFAULT_SHARDS, DEFAULT_SHARD_ROTATION_INTERVAL_MINUTES),
write -> 1,
Integer::sum));
}

private List<WriteInfo> generateHundredWrites(int startCol, long startTs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ CONSERVATIVE2, metadataBytes(TableMetadataPersistence.SweepStrategy.CONSERVATIVE
private KeyValueService mockKvs = mock(KeyValueService.class);
private WriteInfoPartitioner partitioner;
private int numShards = 128;
private int shardRotationIntervalMinutes = 1440;

@BeforeEach
public void setup() {
partitioner = new WriteInfoPartitioner(mockKvs, () -> numShards);
partitioner = new WriteInfoPartitioner(mockKvs, () -> numShards, () -> shardRotationIntervalMinutes);
when(mockKvs.getMetadataForTable(any(TableReference.class))).thenAnswer(args -> {
TableReference tableRef = args.getArgument(0);
return METADATA_MAP.getOrDefault(tableRef, AtlasDbConstants.EMPTY_TABLE_METADATA);
Expand Down Expand Up @@ -156,7 +157,10 @@ public void partitionWritesByShardStrategyTimestampGroupsOnShardClash() {
}
Map<PartitionInfo, List<WriteInfo>> partitions = partitioner.filterAndPartition(writes);
assertThat(partitions.keySet())
.containsExactly(PartitionInfo.of(writes.get(0).toShard(numShards), SweeperStrategy.CONSERVATIVE, 1L));
.containsExactly(PartitionInfo.of(
writes.get(0).toShard(numShards, shardRotationIntervalMinutes),
SweeperStrategy.CONSERVATIVE,
1L));
assertThat(Iterables.getOnlyElement(partitions.values())).containsExactlyElementsOf(writes);
}

Expand All @@ -181,7 +185,7 @@ public void cellsWithSameRowAndColumnNamesGetAssignedToShardsUniformly() {
int writes = 100_000;
Map<Integer, Long> result = IntStream.range(0, writes)
.mapToObj(index -> getWriteInfo(TABLE_CONS, index, index, 1L))
.map(writeInfo -> writeInfo.toShard(numShards))
.map(writeInfo -> writeInfo.toShard(numShards, shardRotationIntervalMinutes))
.collect(Collectors.groupingBy(shard -> shard, Collectors.counting()));

assertThat(result).hasSize(numShards);
Expand All @@ -196,7 +200,7 @@ private static TableReference getTableRef(String tableName) {
private WriteInfo getWriteInfoWithFixedShard(TableReference tableRef, int cellIndex, int numShards) {
return IntStream.iterate(0, i -> i + 1)
.mapToObj(index -> getWriteInfo(tableRef, cellIndex, index, 1L))
.filter(writeInfo -> writeInfo.toShard(numShards) == 0)
.filter(writeInfo -> writeInfo.toShard(numShards, shardRotationIntervalMinutes) == 0)
.findFirst()
.orElseThrow(() -> new RuntimeException("Infinite stream had no cell possibilities :("));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class WriteInfoTest {
private static final long ONE = 1L;
private static final long TWO = 2L;
private static final int SHARDS = 128;
private static final int SHARD_ROTATION_INTERVAL_MINUTES = 1440;

@Test
public void cellReferenceIgnoresTombstoneStatus() {
Expand All @@ -40,18 +41,19 @@ public void cellReferenceIgnoresTombstoneStatus() {
@Test
public void tombstoneStatusIsIgnoredForSharding() {
assertThat(getWriteAt(ONE)).isNotEqualTo(getTombstoneAt(ONE));
assertThat(getWriteAt(ONE).toShard(SHARDS))
.isEqualTo(getTombstoneAt(ONE).toShard(SHARDS));
assertThat(getWriteAt(ONE).toShard(SHARDS, SHARD_ROTATION_INTERVAL_MINUTES))
.isEqualTo(getTombstoneAt(ONE).toShard(SHARDS, SHARD_ROTATION_INTERVAL_MINUTES));
}

@Test
public void timestampIsIgnoredForSharding() {
assertThat(getWriteAt(ONE)).isNotEqualTo(getWriteAt(TWO));
assertThat(getTombstoneAt(ONE)).isNotEqualTo(getTombstoneAt(TWO));

assertThat(getWriteAt(ONE).toShard(SHARDS)).isEqualTo(getWriteAt(TWO).toShard(SHARDS));
assertThat(getTombstoneAt(ONE).toShard(SHARDS))
.isEqualTo(getTombstoneAt(TWO).toShard(SHARDS));
assertThat(getWriteAt(ONE).toShard(SHARDS, SHARD_ROTATION_INTERVAL_MINUTES))
.isEqualTo(getWriteAt(TWO).toShard(SHARDS, SHARD_ROTATION_INTERVAL_MINUTES));
assertThat(getTombstoneAt(ONE).toShard(SHARDS, SHARD_ROTATION_INTERVAL_MINUTES))
.isEqualTo(getTombstoneAt(TWO).toShard(SHARDS, SHARD_ROTATION_INTERVAL_MINUTES));
}

@Test
Expand Down

0 comments on commit e3fed6c

Please sign in to comment.