From 73851bd0fb00191a32e2617e78632eb14e6779f1 Mon Sep 17 00:00:00 2001 From: Branimir Lambov Date: Wed, 4 Dec 2024 16:57:46 +0200 Subject: [PATCH] Implement overlap diagnostics --- .../AbstractCompactionStrategy.java | 12 ++ .../db/compaction/CompactionStrategy.java | 7 ++ .../compaction/CompactionStrategyManager.java | 14 +++ .../cassandra/db/compaction/ShardManager.java | 104 +++++++++++------- .../UnifiedCompactionContainer.java | 7 ++ .../compaction/UnifiedCompactionStrategy.java | 59 +++++++++- .../cassandra/metrics/CompactionMetrics.java | 22 ++++ .../org/apache/cassandra/tools/NodeProbe.java | 1 + .../tools/nodetool/CompactionStats.java | 28 +++++ .../org/apache/cassandra/utils/Overlaps.java | 12 +- .../UnifiedCompactionStrategyTest.java | 4 + 11 files changed, 221 insertions(+), 49 deletions(-) diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index a4bcb88b0e18..590e4ff4f3a8 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -28,6 +28,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import org.slf4j.Logger; @@ -48,6 +49,7 @@ import org.apache.cassandra.io.sstable.ScannerList; import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.Overlaps; abstract class AbstractCompactionStrategy implements CompactionStrategy { @@ -397,4 +399,14 @@ public void periodicReport() logger.statistics(this, "periodic", backgroundCompactions.getStatistics(this)); } } + + @Override + public Map getMaxOverlapsMap() + { + final Set liveSSTables = getSSTables(); + return ImmutableMap.of("all", Integer.toString(Overlaps.maxOverlap(liveSSTables, + CompactionSSTable.startsAfter, + CompactionSSTable.firstKeyComparator, + CompactionSSTable.lastKeyComparator))); + } } diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategy.java index bba42782a98c..22ae13ef0480 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategy.java @@ -18,6 +18,7 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; @@ -201,4 +202,10 @@ default boolean supportsCursorCompaction() } void periodicReport(); + + /** + * Returns a map of sstable regions (e.g. repaired, unrepaired, possibly combined with level information) to the + * maximum overlap between the sstables in the region. + */ + Map getMaxOverlapsMap(); } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index bc05884e72ea..a5bd4370279b 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -24,7 +24,9 @@ import java.util.Collections; import java.util.ConcurrentModificationException; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -1154,4 +1156,16 @@ public void onCompleted(UUID id, boolean isSuccess) { } + + @Override + public Map getMaxOverlapsMap() + { + Map result = new LinkedHashMap<>(); + + for (AbstractStrategyHolder holder : holders) + for (LegacyAbstractCompactionStrategy strategy : holder.allStrategies()) + result.putAll(strategy.getMaxOverlapsMap()); + + return result; + } } diff --git a/src/java/org/apache/cassandra/db/compaction/ShardManager.java b/src/java/org/apache/cassandra/db/compaction/ShardManager.java index 776abd47e841..0915982a7d64 100644 --- a/src/java/org/apache/cassandra/db/compaction/ShardManager.java +++ b/src/java/org/apache/cassandra/db/compaction/ShardManager.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.PriorityQueue; import java.util.Set; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import org.apache.cassandra.db.DiskBoundaries; @@ -100,9 +101,7 @@ else if (partitioner.splitter().isPresent()) /// Construct a boundary/shard iterator for the given number of shards. /// - /// Note: This does not offer a method of listing the shard boundaries it generates, just to advance to the - /// corresponding one for a given token. The only usage for listing is currently in tests. Should a need for this - /// arise, see `CompactionSimulationTest` for a possible implementation. + /// If a list of the ranges for each shard is required instead, use [#getShardRanges]. ShardTracker boundaries(int shardCount); static Range coveringRange(CompactionSSTable sstable) @@ -166,16 +165,24 @@ default double density(long onDiskLength, PartitionPosition min, PartitionPositi return onDiskLength / adjustSmallSpans(span, approximatePartitionCount); } + /// Seggregate the given sstables into the shard ranges that intersect sstables from the collection, and call - /// the given function on the combination of each shard range and the intersecting sstable set. - default List splitSSTablesInShards(Collection sstables, - int numShardsForDensity, - BiFunction, Range, T> maker) + /// the given function on the intersecting sstable set, with access to the shard tracker from which information + /// about the shard can be recovered. + /// + /// If an operationRange is given, this method restricts the collection to the given range and assumes all sstables + /// cover at least some portion of that range. + private void assignSSTablesInShards(Collection sstables, + Range operationRange, + int numShardsForDensity, + BiConsumer, ShardTracker> consumer) { var boundaries = boundaries(numShardsForDensity); - List tasks = new ArrayList<>(); SortingIterator items = SortingIterator.create(CompactionSSTable.firstKeyComparator, sstables); PriorityQueue active = new PriorityQueue<>(CompactionSSTable.lastKeyComparator); + // Advance inside the range. This will add all sstables that start before the end of the covering shard. + if (operationRange != null) + boundaries.advanceTo(operationRange.left.nextValidToken()); while (items.hasNext() || !active.isEmpty()) { if (active.isEmpty()) @@ -184,13 +191,16 @@ default List splitSSTablesInShards(Collectio active.add(items.next()); } Token shardEnd = boundaries.shardEnd(); + if (operationRange != null && + !operationRange.right.isMinimum() && + shardEnd != null && + shardEnd.compareTo(operationRange.right) >= 0) + shardEnd = null; // Take all remaining sstables. while (items.hasNext() && (shardEnd == null || items.peek().getFirst().getToken().compareTo(shardEnd) <= 0)) active.add(items.next()); - final T result = maker.apply(active, boundaries.shardSpan()); - if (result != null) - tasks.add(result); + consumer.accept(active, boundaries); while (!active.isEmpty() && (shardEnd == null || active.peek().getLast().getToken().compareTo(shardEnd) <= 0)) active.poll(); @@ -198,7 +208,29 @@ default List splitSSTablesInShards(Collectio if (!active.isEmpty()) // shardEnd must be non-null (otherwise the line above exhausts all) boundaries.advanceTo(shardEnd.nextValidToken()); } - return tasks; + } + + /// Seggregate the given sstables into the shard ranges that intersect sstables from the collection, and call + /// the given function on the combination of each shard index and the intersecting sstable set. + /// + /// If an operationRange is given, this method restricts the collection to the given range and assumes all sstables + /// cover at least some portion of that range. + default void assignSSTablesToShardIndexes(Collection sstables, + Range operationRange, + int numShardsForDensity, + BiConsumer, Integer> consumer) + { + assignSSTablesInShards(sstables, operationRange, numShardsForDensity, + (rangeSSTables, boundaries) -> consumer.accept(rangeSSTables, boundaries.shardIndex())); + } + + /// Seggregate the given sstables into the shard ranges that intersect sstables from the collection, and call + /// the given function on the combination of each shard range and the intersecting sstable set. + default List splitSSTablesInShards(Collection sstables, + int numShardsForDensity, + BiFunction, Range, T> maker) + { + return splitSSTablesInShards(sstables, null, numShardsForDensity, maker); } /// Seggregate the given sstables into the shard ranges that intersect sstables from the collection, and call @@ -211,39 +243,12 @@ default List splitSSTablesInShards(Collectio int numShardsForDensity, BiFunction, Range, T> maker) { - if (operationRange == null) - return splitSSTablesInShards(sstables, numShardsForDensity, maker); - - var boundaries = boundaries(numShardsForDensity); List tasks = new ArrayList<>(); - SortingIterator items = SortingIterator.create(CompactionSSTable.firstKeyComparator, sstables); - PriorityQueue active = new PriorityQueue<>(CompactionSSTable.lastKeyComparator); - // Advance inside the range. This will add all sstables that start before the end of the covering shard. - boundaries.advanceTo(operationRange.left.nextValidToken()); - while (items.hasNext() || !active.isEmpty()) - { - if (active.isEmpty()) - { - boundaries.advanceTo(items.peek().getFirst().getToken()); - active.add(items.next()); - } - Token shardEnd = boundaries.shardEnd(); - if (!operationRange.right.isMinimum() && shardEnd != null && shardEnd.compareTo(operationRange.right) >= 0) - shardEnd = null; // Take all remaining sstables. - - while (items.hasNext() && (shardEnd == null || items.peek().getFirst().getToken().compareTo(shardEnd) <= 0)) - active.add(items.next()); - - final T result = maker.apply(active, boundaries.shardSpan()); + assignSSTablesInShards(sstables, operationRange, numShardsForDensity, (rangeSSTables, boundaries) -> { + final T result = maker.apply(rangeSSTables, boundaries.shardSpan()); if (result != null) tasks.add(result); - - while (!active.isEmpty() && (shardEnd == null || active.peek().getLast().getToken().compareTo(shardEnd) <= 0)) - active.poll(); - - if (!active.isEmpty()) // shardEnd must be non-null (otherwise the line above exhausts all) - boundaries.advanceTo(shardEnd.nextValidToken()); - } + }); return tasks; } @@ -336,4 +341,19 @@ default int coveredShardCount(PartitionPosition first, PartitionPosition last, i int lastShard = boundaries.shardIndex(); return lastShard - firstShard + 1; } + + /// Get the list of shard ranges for the given shard count. Useful for diagnostics and debugging. + default List> getShardRanges(int shardCount) + { + var boundaries = boundaries(shardCount); + var result = new ArrayList>(shardCount); + while (true) + { + result.add(boundaries.shardSpan()); + if (boundaries.shardEnd() == null) + break; + boundaries.advanceTo(boundaries.shardEnd().nextValidToken()); + } + return result; + } } diff --git a/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionContainer.java b/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionContainer.java index 6795e4bd983c..079722fbf3ce 100644 --- a/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionContainer.java +++ b/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionContainer.java @@ -18,6 +18,7 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -358,6 +359,12 @@ public void periodicReport() strategy.periodicReport(); } + @Override + public Map getMaxOverlapsMap() + { + return strategy.getMaxOverlapsMap(); + } + BackgroundCompactions getBackgroundCompactions() { return strategy.backgroundCompactions; diff --git a/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java index 5669a4ce48dd..1a7fce33b338 100644 --- a/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java @@ -728,7 +728,7 @@ private void maybeUpdateSelector() } } - // used by CNDB + /// Get the current shard manager. Used internally, in tests and by CNDB. public ShardManager getShardManager() { maybeUpdateSelector(); @@ -1237,6 +1237,63 @@ public Map> getLevels(Collection return ret; } + /** + * Creates a map of maximum overlap, organized as a map from arena:level to the maximum number of sstables that + * overlap in that level, as well as a list showing the per-shard maximum overlap. + * + * The number of shards to list is calculated based on the maximum density of the sstables in the realm. + */ + @Override + public Map getMaxOverlapsMap() + { + final Set liveSSTables = realm.getLiveSSTables(); + Map> arenas = + getLevels(liveSSTables, (i1, i2) -> true); // take all sstables + + ShardManager shardManager = getShardManager(); + Map map = new LinkedHashMap<>(); + + // max general overlap (max # of sstables per query) + map.put("all", getMaxOverlapsPerShardString(liveSSTables, shardManager)); + + for (var arena : arenas.entrySet()) + { + final String arenaName = arena.getKey().name(); + for (var level : arena.getValue()) + map.put(arenaName + "-L" + level.getIndex(), getMaxOverlapsPerShardString(level.getSSTables(), shardManager)); + } + return map; + } + + private String getMaxOverlapsPerShardString(Collection sstables, ShardManager shardManager) + { + // Find the sstable with the biggest density to define the shard count. + // This is better than using a level's max bound as that will show more shards than there actually are. + double maxDensity = 0; + for (CompactionSSTable liveSSTable : sstables) + maxDensity = Math.max(maxDensity, shardManager.density(liveSSTable)); + int shardCount = controller.getNumShards(maxDensity); + + int[] overlapsMap = getMaxOverlapsPerShard(sstables, shardManager, shardCount); + int max = 0; + for (int i : overlapsMap) + max = Math.max(max, i); + return max + " (per shard: " + Arrays.toString(overlapsMap) + ")"; + } + + public static int[] getMaxOverlapsPerShard(Collection sstables, ShardManager shardManager, int shardCount) + { + int[] overlapsMap = new int[shardCount]; + shardManager.assignSSTablesToShardIndexes(sstables, null, shardCount, + (shardSSTables, shard) -> + overlapsMap[shard] = Overlaps.maxOverlap(shardSSTables, + CompactionSSTable.startsAfter, + CompactionSSTable.firstKeyComparator, + CompactionSSTable.lastKeyComparator)); + // Indexes that do not have sstables are left with 0 overlaps. + return overlapsMap; + } + private static int levelOf(CompactionPick pick) { return (int) pick.parent(); diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java index c11476122076..b7e487e770eb 100644 --- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java @@ -101,6 +101,8 @@ public class CompactionMetrics /** Total number compactions that consisted of only expired SSTables */ public final Meter deleteOnlyCompactions; + public final Gauge>>> overlapsMap; + public CompactionMetrics(final ThreadPoolExecutor... collectors) { pendingTasks = Metrics.register(factory.createMetricName("PendingTasks"), () -> { @@ -220,6 +222,26 @@ protected List loadValue() } }); + overlapsMap = Metrics.register(factory.createMetricName("MaxOverlapsMap"), + new CachedGauge>>>(50, TimeUnit.MILLISECONDS) + { + public Map>> loadValue() + { + Map>> ret = new HashMap<>(); + for (String keyspaceName : Schema.instance.getKeyspaces()) + { + Map> ksMap = new HashMap<>(); + ret.put(keyspaceName, ksMap); + for (ColumnFamilyStore cfs : Keyspace.open(keyspaceName).getColumnFamilyStores()) + { + Map overlaps = cfs.getCompactionStrategy().getMaxOverlapsMap(); + ksMap.put(cfs.getTableName(), overlaps); + } + } + return ret; + } + }); + runningCompactions = Metrics.register(factory.createMetricName("RunningCompactions"), new DerivativeGauge, Integer>(aggregateCompactions) { diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 75b6a2775205..0d0a662ee8cb 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1659,6 +1659,7 @@ public Object getCompactionMetric(String metricName) case "PendingTasksByTableName": case "WriteAmplificationByTableName": case "AggregateCompactions": + case "MaxOverlapsMap": return JMX.newMBeanProxy(mbeanServerConn, new ObjectName("org.apache.cassandra.metrics:type=Compaction,name=" + metricName), CassandraMetricsRegistry.JmxGaugeMBean.class).getValue(); diff --git a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java index 659fc27d5afa..6ce0bbb42f1c 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java +++ b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java @@ -49,6 +49,11 @@ public class CompactionStats extends NodeToolCmd description = "Show the compaction aggregates for the compactions in progress, e.g. the levels for LCS or the buckets for STCS and TWCS.") private boolean aggregate = false; + @Option(title = "overlap", + name = {"-O", "--overlap"}, + description = "Show a map of the maximum sstable overlap per compaction region.") + private boolean overlap = false; + @Override public void execute(NodeProbe probe) { @@ -94,6 +99,9 @@ public void execute(NodeProbe probe) { reportAggregateCompactions(probe); } + + if (overlap) + reportOverlap((Map>>) probe.getCompactionMetric("MaxOverlapsMap")); } public static void reportCompactionTable(List> compactions, int compactionThroughput, boolean humanReadable, PrintStream out) @@ -142,4 +150,24 @@ private static void reportAggregateCompactions(NodeProbe probe) for (CompactionStrategyStatistics stat : statistics) System.out.println(stat.toString()); } + + private static void reportOverlap(Map>> maxOverlap) + { + if (maxOverlap == null) + System.out.println("Overlap map is not available."); + + for (Map.Entry>> ksEntry : maxOverlap.entrySet()) + { + String ksName = ksEntry.getKey(); + for (Map.Entry> tableEntry : ksEntry.getValue().entrySet()) + { + String tableName = tableEntry.getKey(); + System.out.println("Max overlap map for " + ksName + "." + tableName + ":"); + for (Map.Entry compactionEntry : tableEntry.getValue().entrySet()) + { + System.out.println(" " + compactionEntry.getKey() + ": " + compactionEntry.getValue()); + } + } + } + } } diff --git a/src/java/org/apache/cassandra/utils/Overlaps.java b/src/java/org/apache/cassandra/utils/Overlaps.java index e8844f75bd89..9aae6072f06e 100644 --- a/src/java/org/apache/cassandra/utils/Overlaps.java +++ b/src/java/org/apache/cassandra/utils/Overlaps.java @@ -84,9 +84,9 @@ public static List> constructOverlapSets(Collection items, /// @param endsComparator Comparator of items' ending positions. /// @return The maximum overlap in the given set of items. public static int maxOverlap(Collection items, - BiPredicate startsAfter, - Comparator startsComparator, - Comparator endsComparator) + BiPredicate startsAfter, + Comparator startsComparator, + Comparator endsComparator) { return constructOverlapSets(items, startsAfter, startsComparator, endsComparator, (max, active) -> Math.max(max, active.size()), 0); @@ -119,9 +119,9 @@ public static int maxOverlap(Collection items, /// @param initialValue Initial value for the reducer. /// @return The result of processing the overlap sets. public static R constructOverlapSets(Collection items, - BiPredicate startsAfter, - Comparator startsComparator, - Comparator endsComparator, + BiPredicate startsAfter, + Comparator startsComparator, + Comparator endsComparator, BiFunction, R> reducer, R initialValue) { diff --git a/test/unit/org/apache/cassandra/db/compaction/UnifiedCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/UnifiedCompactionStrategyTest.java index c743d545570e..9c00d88a2a21 100644 --- a/test/unit/org/apache/cassandra/db/compaction/UnifiedCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/UnifiedCompactionStrategyTest.java @@ -1976,6 +1976,10 @@ public void testBucketSelection(int[] counts, int[] expecteds, Overlaps.Inclusio } } } + + Mockito.when(controller.getNumShards(anyDouble())).thenReturn(16); // co-prime with counts to ensure multiple sstables fall in each shard + System.out.println(strategy.getMaxOverlapsMap()); + dataTracker.removeUnsafe(allSSTables); }