STAR-1872: Parallelize UCS compactions per output shard (#1342)
This splits compactions that are to produce more than one
output sstable into tasks that can execute in parallel.
Such tasks share a transaction and have combined progress
tracking and a combined observer. Because we cannot mark
parts of an sstable as unneeded, the transaction is only
applied once all tasks have succeeded. This also means that
early open is not supported for such tasks.

The parallelization also takes thread reservations into
account, reducing the parallelism to the number of threads
available for the relevant level. The new functionality is
enabled by default.
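Below is a minimal, hypothetical sketch of the scheme described above (the class and method names are placeholders, not taken from the patch): per-shard tasks run on an executor capped at the threads available for the level, and the shared transaction is committed only once every task has succeeded, otherwise it is aborted.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical sketch: one shared "transaction" guarding N per-shard tasks.
// The transaction is applied only after every task has finished; any failure
// aborts the whole set, which is also why early open cannot be supported.
public class ParallelShardCompactionSketch
{
    interface ShardTask { void run() throws Exception; }

    public static void compactInParallel(List<ShardTask> shardTasks,
                                         int availableThreadsForLevel,
                                         Runnable commitSharedTransaction,
                                         Runnable abortSharedTransaction)
    {
        // Cap parallelism at the threads reserved for this level, never below 1.
        int parallelism = Math.max(1, Math.min(shardTasks.size(), availableThreadsForLevel));
        ExecutorService executor = Executors.newFixedThreadPool(parallelism);
        try
        {
            List<CompletableFuture<Void>> futures =
                shardTasks.stream()
                          .map(task -> CompletableFuture.runAsync(() -> {
                              try { task.run(); }
                              catch (Exception e) { throw new RuntimeException(e); }
                          }, executor))
                          .collect(Collectors.toList());

            // Only when all per-shard tasks succeed is the shared transaction applied.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            commitSharedTransaction.run();
        }
        catch (RuntimeException e)
        {
            abortSharedTransaction.run(); // any failure discards all partial output
            throw e;
        }
        finally
        {
            executor.shutdown();
        }
    }
}
```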

Major compactions will apply the same mechanism to
parallelize the operation. They will only split on pre-
existing boundary points if they are also boundary
points for the current UCS configuration. This is done
to ensure that major compactions can re-shard data when
the configuration is changed. If the pre-existing boundaries
match the current state, a major compaction is still broken
into multiple operations to reduce its space overhead.
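As a hypothetical illustration of the boundary rule above (the token type and names are placeholders, not from the patch), the usable split points of a major compaction are exactly the pre-existing boundaries that the current UCS configuration also recognises:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Illustration only: a major compaction splits on a pre-existing boundary
// point only if it is also a boundary of the current UCS configuration,
// so data is re-sharded whenever the configuration has changed.
final class MajorCompactionSplitPoints
{
    static List<Long> usableSplitPoints(List<Long> preExistingBoundaries,
                                        Set<Long> currentUcsBoundaries)
    {
        return preExistingBoundaries.stream()
                                    .filter(currentUcsBoundaries::contains)
                                    .collect(Collectors.toList());
    }
}
```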

Also:
- Introduces a parallelism parameter to major compactions
  (`nodetool compact -j <threads>`, defaulting to half the
  compaction threads) to avoid stopping all other compaction
  for the duration (see the usage sketch after this list).

- Changes SSTable expiration to be done in a separate
  `getNextBackgroundCompactions` round to improve the
  efficiency of expiration (separate task can run quickly
  and remove the relevant sstables without waiting for
  a compaction to end).

- Applies small-partition-count correction in
  `ShardManager.calculateCombinedDensity`.
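For illustration, here is how the new parallelism parameter can be exercised through the `forceMajorCompaction` overloads added in this commit (how the `ColumnFamilyStore` reference is obtained, and the concrete thread counts, are placeholders):

```java
import org.apache.cassandra.db.ColumnFamilyStore;

// Illustration only. The CLI form described above is:
//   nodetool compact -j 4 my_keyspace my_table
final class MajorCompactionParallelismExample
{
    static void runMajorCompaction(ColumnFamilyStore cfs)
    {
        // splitOutput = false, cap the operation at 4 compaction threads.
        cfs.forceMajorCompaction(false, 4);

        // Per the new MBean javadoc: 0 lets the operation use all available threads,
        // a negative value falls back to the default parallelism (described above
        // as half the compaction threads).
        cfs.forceMajorCompaction(0);
    }
}
```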
blambov authored Dec 13, 2024
1 parent 80e0d45 commit 321b0b6
Showing 116 changed files with 6,321 additions and 1,370 deletions.
1 change: 1 addition & 0 deletions build.xml
@@ -1645,6 +1645,7 @@
<jvmarg value="-Dcassandra.tolerate_sstable_size=true"/>
<jvmarg value="-Dcassandra.skip_sync=true" />
<jvmarg value="-Dcassandra.allow_cursor_compaction=false" />
<jvmarg value="-Dunified_compaction.parallelize_output_shards=false" />
<jvmarg value="-Dlogback.configurationFile=file://${test.logback.configurationFile}"/>
</testmacrohelper>
</sequential>
12 changes: 8 additions & 4 deletions src/java/org/apache/cassandra/cache/KeyCacheKey.java
@@ -21,7 +21,6 @@
import java.util.Arrays;
import java.util.Objects;

import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -31,9 +30,7 @@ public class KeyCacheKey extends CacheKey
{
public final Descriptor desc;

private static final long EMPTY_SIZE = ObjectSizes.measure(new KeyCacheKey(TableMetadata.builder("ks", "tab")
.addPartitionKeyColumn("pk", UTF8Type.instance)
.build(), null, ByteBufferUtil.EMPTY_BYTE_BUFFER));
private static final long EMPTY_SIZE = ObjectSizes.measure(new KeyCacheKey());

// keeping an array instead of a ByteBuffer lowers the overhead of the key cache working set,
// without extra copies on lookup since client-provided key ByteBuffers will be array-backed already
@@ -47,6 +44,13 @@ public KeyCacheKey(TableMetadata tableMetadata, Descriptor desc, ByteBuffer key)
assert this.key != null;
}

private KeyCacheKey() // Only for EMPTY_SIZE
{
super(null, null);
this.desc = null;
this.key = null;
}

public String toString()
{
return String.format("KeyCacheKey(%s, %s)", desc, ByteBufferUtil.bytesToHex(ByteBuffer.wrap(key)));
@@ -506,6 +506,11 @@ public enum CassandraRelevantProperties
*/
COMPACTION_SKIP_REPAIR_STATE_CHECKING("cassandra.compaction.skip_repair_state_checking", "false"),

/**
* If this is true, compaction will not verify that sstables selected for compaction are marked as compacting.
*/
COMPACTION_SKIP_COMPACTING_STATE_CHECKING("cassandra.compaction.skip_compacting_state_checking", "false"),

/**
* If true, the searcher object created when opening a SAI index will be replaced by a dummy object and index
* are never marked queriable (querying one will fail). This is obviously usually undesirable, but can be used if
11 changes: 11 additions & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1445,6 +1445,7 @@ public Collection<SSTableReader> flushMemtable(ColumnFamilyStore cfs, Memtable m
}

Throwable accumulate = null;

for (SSTableMultiWriter writer : flushResults)
{
accumulate = writer.commit(accumulate);
@@ -2455,6 +2456,16 @@ public void forceMajorCompaction(boolean splitOutput)
CompactionManager.instance.performMaximal(this, splitOutput);
}

public void forceMajorCompaction(int parallelism)
{
CompactionManager.instance.performMaximal(this, false, parallelism);
}

public void forceMajorCompaction(boolean splitOutput, int parallelism)
{
CompactionManager.instance.performMaximal(this, splitOutput, parallelism);
}

public void forceCompactionForTokenRange(Collection<Range<Token>> tokenRanges) throws ExecutionException, InterruptedException
{
CompactionManager.instance.forceCompactionForTokenRange(this, tokenRanges);
9 changes: 9 additions & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -50,6 +50,15 @@ public interface ColumnFamilyStoreMBean
*/
public void forceMajorCompaction(boolean splitOutput) throws ExecutionException, InterruptedException;

/**
* force a major compaction of this column family
*
* @param permittedParallelism The maximum number of compaction threads that can be used by the operation.
* If 0, the operation can use all available threads.
* If <0, the default parallelism will be used.
*/
public void forceMajorCompaction(int permittedParallelism) throws ExecutionException, InterruptedException;

/**
* Forces a major compaction of specified token ranges in this column family.
* <p>
@@ -166,16 +166,16 @@ public void shutdown()
}

/**
* @param gcBefore throw away tombstones older than this
*
* @param gcBefore throw away tombstones older than this
* @param permittedParallelism the maximum permitted parallelism for the operation
* @return a compaction task that should be run to compact this columnfamilystore
* as much as possible. Null if nothing to do.
*
* <p>
* Is responsible for marking its sstables as compaction-pending.
*/
@Override
@SuppressWarnings("resource")
public synchronized CompactionTasks getMaximalTasks(int gcBefore, boolean splitOutput)
public synchronized CompactionTasks getMaximalTasks(int gcBefore, boolean splitOutput, int permittedParallelism)
{
Iterable<? extends CompactionSSTable> filteredSSTables = Iterables.filter(getSSTables(), sstable -> !sstable.isMarkedSuspect());
if (Iterables.isEmpty(filteredSSTables))
@@ -27,13 +27,11 @@
import com.google.common.base.Preconditions;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.io.FSDiskFullWriteError;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;

import static com.google.common.base.Throwables.propagate;

@@ -43,9 +41,11 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
// See CNDB-10549
static final boolean SKIP_REPAIR_STATE_CHECKING =
CassandraRelevantProperties.COMPACTION_SKIP_REPAIR_STATE_CHECKING.getBoolean();
static final boolean SKIP_COMPACTING_STATE_CHECKING =
CassandraRelevantProperties.COMPACTION_SKIP_COMPACTING_STATE_CHECKING.getBoolean();

protected final CompactionRealm realm;
protected LifecycleTransaction transaction;
protected ILifecycleTransaction transaction;
protected boolean isUserDefined;
protected OperationType compactionType;
protected TableOperationObserver opObserver;
@@ -55,7 +55,7 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
* @param realm
* @param transaction the transaction managing the status of the sstables we're replacing
*/
protected AbstractCompactionTask(CompactionRealm realm, LifecycleTransaction transaction)
protected AbstractCompactionTask(CompactionRealm realm, ILifecycleTransaction transaction)
{
this.realm = realm;
this.transaction = transaction;
@@ -66,10 +66,13 @@ protected AbstractCompactionTask(CompactionRealm realm, LifecycleTransaction tra

try
{
// enforce contract that caller should mark sstables compacting
Set<SSTableReader> compacting = transaction.getCompacting();
for (SSTableReader sstable : transaction.originals())
assert compacting.contains(sstable) : sstable.getFilename() + " is not correctly marked compacting";
if (!SKIP_COMPACTING_STATE_CHECKING && !transaction.isOffline())
{
// enforce contract that caller should mark sstables compacting
var compacting = realm.getCompactingSSTables();
for (SSTableReader sstable : transaction.originals())
assert compacting.contains(sstable) : sstable.getFilename() + " is not correctly marked compacting";
}

validateSSTables(transaction.originals());
}
@@ -120,18 +123,18 @@ private void validateSSTables(Set<SSTableReader> sstables)
* Executes the task after setting a new observer, normally the observer is the
* compaction manager metrics.
*/
public int execute(TableOperationObserver observer)
public void execute(TableOperationObserver observer)
{
return setOpObserver(observer).execute();
setOpObserver(observer).execute();
}

/** Executes the task */
public int execute()
public void execute()
{
Throwable t = null;
try
{
return executeInternal();
executeInternal();
}
catch (FSDiskFullWriteError e)
{
@@ -151,7 +154,12 @@ public int execute()
}
}

public Throwable cleanup(Throwable err)
public Throwable rejected(Throwable t)
{
return cleanup(t);
}

protected Throwable cleanup(Throwable err)
{
final boolean isSuccess = err == null;
for (CompactionObserver compObserver : compObservers)
@@ -160,22 +168,11 @@ public Throwable cleanup(Throwable err)
return Throwables.perform(err, () -> transaction.close());
}

public abstract CompactionAwareWriter getCompactionAwareWriter(CompactionRealm realm, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables);

@VisibleForTesting
public LifecycleTransaction getTransaction()
{
return transaction;
}

@VisibleForTesting
public OperationType getCompactionType()
protected void executeInternal()
{
return compactionType;
run();
}

protected abstract int executeInternal();

// TODO Eventually these three setters should be passed in to the constructor.

public AbstractCompactionTask setUserDefined(boolean isUserDefined)
@@ -184,6 +181,14 @@ public AbstractCompactionTask setUserDefined(boolean isUserDefined)
return this;
}

/**
* @return The type of compaction this task is performing. Used by CNDB.
*/
public OperationType getCompactionType()
{
return compactionType;
}

public AbstractCompactionTask setCompactionType(OperationType compactionType)
{
this.compactionType = compactionType;
@@ -204,14 +209,18 @@ public void addObserver(CompactionObserver compObserver)
compObservers.add(compObserver);
}

@VisibleForTesting
/**
* @return The compaction observers for this task. Used by CNDB.
*/
public List<CompactionObserver> getCompObservers()
{
return compObservers;
}

@VisibleForTesting
public LifecycleTransaction transaction()
/**
* Return the transaction that this task is working on. Used by CNDB as well as tests.
*/
public ILifecycleTransaction getTransaction()
{
return transaction;
}
@@ -173,7 +173,7 @@ public boolean managesSSTable(CompactionSSTable sstable)

public abstract Collection<TasksSupplier> getBackgroundTaskSuppliers(int gcBefore);

public abstract Collection<AbstractCompactionTask> getMaximalTasks(int gcBefore, boolean splitOutput);
public abstract Collection<AbstractCompactionTask> getMaximalTasks(int gcBefore, boolean splitOutput, int permittedParallelism);

public abstract Collection<AbstractCompactionTask> getUserDefinedTasks(GroupedSSTableContainer<CompactionSSTable> sstables, int gcBefore);

@@ -345,9 +345,12 @@ CompletableFuture<?>[] startCompactionTasks(ColumnFamilyStore cfs, Collection<Ab
if (!compactionTasks.isEmpty())
{
logger.debug("Running compaction tasks: {}", compactionTasks);
return compactionTasks.stream()
.map(task -> startTask(cfs, task))
.toArray(CompletableFuture<?>[]::new);
CompletableFuture<Void>[] arr = new CompletableFuture[compactionTasks.size()];
int index = 0;
for (AbstractCompactionTask task : compactionTasks)
arr[index++] = startTask(cfs, task);

return arr;
}
else
{