Skip to content

Commit

Permalink
Implement overlap diagnostics
Browse files Browse the repository at this point in the history
  • Loading branch information
blambov committed Dec 13, 2024
1 parent 321b0b6 commit 73851bd
Show file tree
Hide file tree
Showing 11 changed files with 221 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;

import org.slf4j.Logger;
Expand All @@ -48,6 +49,7 @@
import org.apache.cassandra.io.sstable.ScannerList;
import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Overlaps;

abstract class AbstractCompactionStrategy implements CompactionStrategy
{
Expand Down Expand Up @@ -397,4 +399,14 @@ public void periodicReport()
logger.statistics(this, "periodic", backgroundCompactions.getStatistics(this));
}
}

@Override
public Map<String, String> getMaxOverlapsMap()
{
final Set<? extends CompactionSSTable> liveSSTables = getSSTables();
return ImmutableMap.of("all", Integer.toString(Overlaps.maxOverlap(liveSSTables,
CompactionSSTable.startsAfter,
CompactionSSTable.firstKeyComparator,
CompactionSSTable.lastKeyComparator)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

Expand Down Expand Up @@ -201,4 +202,10 @@ default boolean supportsCursorCompaction()
}

void periodicReport();

/**
* Returns a map of sstable regions (e.g. repaired, unrepaired, possibly combined with level information) to the
* maximum overlap between the sstables in the region.
*/
Map<String, String> getMaxOverlapsMap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -1154,4 +1156,16 @@ public void onCompleted(UUID id, boolean isSuccess)
{

}

@Override
public Map<String, String> getMaxOverlapsMap()
{
Map<String, String> result = new LinkedHashMap<>();

for (AbstractStrategyHolder holder : holders)
for (LegacyAbstractCompactionStrategy strategy : holder.allStrategies())
result.putAll(strategy.getMaxOverlapsMap());

return result;
}
}
104 changes: 62 additions & 42 deletions src/java/org/apache/cassandra/db/compaction/ShardManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

import org.apache.cassandra.db.DiskBoundaries;
Expand Down Expand Up @@ -100,9 +101,7 @@ else if (partitioner.splitter().isPresent())

/// Construct a boundary/shard iterator for the given number of shards.
///
/// Note: This does not offer a method of listing the shard boundaries it generates, just to advance to the
/// corresponding one for a given token. The only usage for listing is currently in tests. Should a need for this
/// arise, see `CompactionSimulationTest` for a possible implementation.
/// If a list of the ranges for each shard is required instead, use [#getShardRanges].
ShardTracker boundaries(int shardCount);

static Range<Token> coveringRange(CompactionSSTable sstable)
Expand Down Expand Up @@ -166,16 +165,24 @@ default double density(long onDiskLength, PartitionPosition min, PartitionPositi
return onDiskLength / adjustSmallSpans(span, approximatePartitionCount);
}


/// Seggregate the given sstables into the shard ranges that intersect sstables from the collection, and call
/// the given function on the combination of each shard range and the intersecting sstable set.
default <T, R extends CompactionSSTable> List<T> splitSSTablesInShards(Collection<R> sstables,
int numShardsForDensity,
BiFunction<Collection<R>, Range<Token>, T> maker)
/// the given function on the intersecting sstable set, with access to the shard tracker from which information
/// about the shard can be recovered.
///
/// If an operationRange is given, this method restricts the collection to the given range and assumes all sstables
/// cover at least some portion of that range.
private <R extends CompactionSSTable> void assignSSTablesInShards(Collection<R> sstables,
Range<Token> operationRange,
int numShardsForDensity,
BiConsumer<Collection<R>, ShardTracker> consumer)
{
var boundaries = boundaries(numShardsForDensity);
List<T> tasks = new ArrayList<>();
SortingIterator<R> items = SortingIterator.create(CompactionSSTable.firstKeyComparator, sstables);
PriorityQueue<R> active = new PriorityQueue<>(CompactionSSTable.lastKeyComparator);
// Advance inside the range. This will add all sstables that start before the end of the covering shard.
if (operationRange != null)
boundaries.advanceTo(operationRange.left.nextValidToken());
while (items.hasNext() || !active.isEmpty())
{
if (active.isEmpty())
Expand All @@ -184,21 +191,46 @@ default <T, R extends CompactionSSTable> List<T> splitSSTablesInShards(Collectio
active.add(items.next());
}
Token shardEnd = boundaries.shardEnd();
if (operationRange != null &&
!operationRange.right.isMinimum() &&
shardEnd != null &&
shardEnd.compareTo(operationRange.right) >= 0)
shardEnd = null; // Take all remaining sstables.

while (items.hasNext() && (shardEnd == null || items.peek().getFirst().getToken().compareTo(shardEnd) <= 0))
active.add(items.next());

final T result = maker.apply(active, boundaries.shardSpan());
if (result != null)
tasks.add(result);
consumer.accept(active, boundaries);

while (!active.isEmpty() && (shardEnd == null || active.peek().getLast().getToken().compareTo(shardEnd) <= 0))
active.poll();

if (!active.isEmpty()) // shardEnd must be non-null (otherwise the line above exhausts all)
boundaries.advanceTo(shardEnd.nextValidToken());
}
return tasks;
}

/// Seggregate the given sstables into the shard ranges that intersect sstables from the collection, and call
/// the given function on the combination of each shard index and the intersecting sstable set.
///
/// If an operationRange is given, this method restricts the collection to the given range and assumes all sstables
/// cover at least some portion of that range.
default <R extends CompactionSSTable> void assignSSTablesToShardIndexes(Collection<R> sstables,
Range<Token> operationRange,
int numShardsForDensity,
BiConsumer<Collection<R>, Integer> consumer)
{
assignSSTablesInShards(sstables, operationRange, numShardsForDensity,
(rangeSSTables, boundaries) -> consumer.accept(rangeSSTables, boundaries.shardIndex()));
}

/// Seggregate the given sstables into the shard ranges that intersect sstables from the collection, and call
/// the given function on the combination of each shard range and the intersecting sstable set.
default <T, R extends CompactionSSTable> List<T> splitSSTablesInShards(Collection<R> sstables,
int numShardsForDensity,
BiFunction<Collection<R>, Range<Token>, T> maker)
{
return splitSSTablesInShards(sstables, null, numShardsForDensity, maker);
}

/// Seggregate the given sstables into the shard ranges that intersect sstables from the collection, and call
Expand All @@ -211,39 +243,12 @@ default <T, R extends CompactionSSTable> List<T> splitSSTablesInShards(Collectio
int numShardsForDensity,
BiFunction<Collection<R>, Range<Token>, T> maker)
{
if (operationRange == null)
return splitSSTablesInShards(sstables, numShardsForDensity, maker);

var boundaries = boundaries(numShardsForDensity);
List<T> tasks = new ArrayList<>();
SortingIterator<R> items = SortingIterator.create(CompactionSSTable.firstKeyComparator, sstables);
PriorityQueue<R> active = new PriorityQueue<>(CompactionSSTable.lastKeyComparator);
// Advance inside the range. This will add all sstables that start before the end of the covering shard.
boundaries.advanceTo(operationRange.left.nextValidToken());
while (items.hasNext() || !active.isEmpty())
{
if (active.isEmpty())
{
boundaries.advanceTo(items.peek().getFirst().getToken());
active.add(items.next());
}
Token shardEnd = boundaries.shardEnd();
if (!operationRange.right.isMinimum() && shardEnd != null && shardEnd.compareTo(operationRange.right) >= 0)
shardEnd = null; // Take all remaining sstables.

while (items.hasNext() && (shardEnd == null || items.peek().getFirst().getToken().compareTo(shardEnd) <= 0))
active.add(items.next());

final T result = maker.apply(active, boundaries.shardSpan());
assignSSTablesInShards(sstables, operationRange, numShardsForDensity, (rangeSSTables, boundaries) -> {
final T result = maker.apply(rangeSSTables, boundaries.shardSpan());
if (result != null)
tasks.add(result);

while (!active.isEmpty() && (shardEnd == null || active.peek().getLast().getToken().compareTo(shardEnd) <= 0))
active.poll();

if (!active.isEmpty()) // shardEnd must be non-null (otherwise the line above exhausts all)
boundaries.advanceTo(shardEnd.nextValidToken());
}
});
return tasks;
}

Expand Down Expand Up @@ -336,4 +341,19 @@ default int coveredShardCount(PartitionPosition first, PartitionPosition last, i
int lastShard = boundaries.shardIndex();
return lastShard - firstShard + 1;
}

/// Get the list of shard ranges for the given shard count. Useful for diagnostics and debugging.
default List<Range<Token>> getShardRanges(int shardCount)
{
var boundaries = boundaries(shardCount);
var result = new ArrayList<Range<Token>>(shardCount);
while (true)
{
result.add(boundaries.shardSpan());
if (boundaries.shardEnd() == null)
break;
boundaries.advanceTo(boundaries.shardEnd().nextValidToken());
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -358,6 +359,12 @@ public void periodicReport()
strategy.periodicReport();
}

@Override
public Map<String, String> getMaxOverlapsMap()
{
return strategy.getMaxOverlapsMap();
}

BackgroundCompactions getBackgroundCompactions()
{
return strategy.backgroundCompactions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ private void maybeUpdateSelector()
}
}

// used by CNDB
/// Get the current shard manager. Used internally, in tests and by CNDB.
public ShardManager getShardManager()
{
maybeUpdateSelector();
Expand Down Expand Up @@ -1237,6 +1237,63 @@ public Map<Arena, List<Level>> getLevels(Collection<? extends CompactionSSTable>
return ret;
}

/**
* Creates a map of maximum overlap, organized as a map from arena:level to the maximum number of sstables that
* overlap in that level, as well as a list showing the per-shard maximum overlap.
*
* The number of shards to list is calculated based on the maximum density of the sstables in the realm.
*/
@Override
public Map<String, String> getMaxOverlapsMap()
{
final Set<? extends CompactionSSTable> liveSSTables = realm.getLiveSSTables();
Map<UnifiedCompactionStrategy.Arena, List<UnifiedCompactionStrategy.Level>> arenas =
getLevels(liveSSTables, (i1, i2) -> true); // take all sstables

ShardManager shardManager = getShardManager();
Map<String, String> map = new LinkedHashMap<>();

// max general overlap (max # of sstables per query)
map.put("all", getMaxOverlapsPerShardString(liveSSTables, shardManager));

for (var arena : arenas.entrySet())
{
final String arenaName = arena.getKey().name();
for (var level : arena.getValue())
map.put(arenaName + "-L" + level.getIndex(), getMaxOverlapsPerShardString(level.getSSTables(), shardManager));
}
return map;
}

private String getMaxOverlapsPerShardString(Collection<? extends CompactionSSTable> sstables, ShardManager shardManager)
{
// Find the sstable with the biggest density to define the shard count.
// This is better than using a level's max bound as that will show more shards than there actually are.
double maxDensity = 0;
for (CompactionSSTable liveSSTable : sstables)
maxDensity = Math.max(maxDensity, shardManager.density(liveSSTable));
int shardCount = controller.getNumShards(maxDensity);

int[] overlapsMap = getMaxOverlapsPerShard(sstables, shardManager, shardCount);
int max = 0;
for (int i : overlapsMap)
max = Math.max(max, i);
return max + " (per shard: " + Arrays.toString(overlapsMap) + ")";
}

public static int[] getMaxOverlapsPerShard(Collection<? extends CompactionSSTable> sstables, ShardManager shardManager, int shardCount)
{
int[] overlapsMap = new int[shardCount];
shardManager.assignSSTablesToShardIndexes(sstables, null, shardCount,
(shardSSTables, shard) ->
overlapsMap[shard] = Overlaps.maxOverlap(shardSSTables,
CompactionSSTable.startsAfter,
CompactionSSTable.firstKeyComparator,
CompactionSSTable.lastKeyComparator));
// Indexes that do not have sstables are left with 0 overlaps.
return overlapsMap;
}

private static int levelOf(CompactionPick pick)
{
return (int) pick.parent();
Expand Down
22 changes: 22 additions & 0 deletions src/java/org/apache/cassandra/metrics/CompactionMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public class CompactionMetrics
/** Total number compactions that consisted of only expired SSTables */
public final Meter deleteOnlyCompactions;

public final Gauge<Map<String, Map<String, Map<String, String>>>> overlapsMap;

public CompactionMetrics(final ThreadPoolExecutor... collectors)
{
pendingTasks = Metrics.register(factory.createMetricName("PendingTasks"), () -> {
Expand Down Expand Up @@ -220,6 +222,26 @@ protected List<CompactionStrategyStatistics> loadValue()
}
});

overlapsMap = Metrics.register(factory.createMetricName("MaxOverlapsMap"),
new CachedGauge<Map<String, Map<String, Map<String, String>>>>(50, TimeUnit.MILLISECONDS)
{
public Map<String, Map<String, Map<String, String>>> loadValue()
{
Map<String, Map<String, Map<String, String>>> ret = new HashMap<>();
for (String keyspaceName : Schema.instance.getKeyspaces())
{
Map<String, Map<String, String>> ksMap = new HashMap<>();
ret.put(keyspaceName, ksMap);
for (ColumnFamilyStore cfs : Keyspace.open(keyspaceName).getColumnFamilyStores())
{
Map<String, String> overlaps = cfs.getCompactionStrategy().getMaxOverlapsMap();
ksMap.put(cfs.getTableName(), overlaps);
}
}
return ret;
}
});

runningCompactions = Metrics.register(factory.createMetricName("RunningCompactions"),
new DerivativeGauge<List<CompactionStrategyStatistics>, Integer>(aggregateCompactions)
{
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/tools/NodeProbe.java
Original file line number Diff line number Diff line change
Expand Up @@ -1659,6 +1659,7 @@ public Object getCompactionMetric(String metricName)
case "PendingTasksByTableName":
case "WriteAmplificationByTableName":
case "AggregateCompactions":
case "MaxOverlapsMap":
return JMX.newMBeanProxy(mbeanServerConn,
new ObjectName("org.apache.cassandra.metrics:type=Compaction,name=" + metricName),
CassandraMetricsRegistry.JmxGaugeMBean.class).getValue();
Expand Down
Loading

0 comments on commit 73851bd

Please sign in to comment.