Skip to content

Commit

Permalink
CNDB-11339 Make CommitLog.add async
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Oct 20, 2024
1 parent 50331dc commit bcf9e6d
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 70 deletions.
38 changes: 18 additions & 20 deletions src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
Expand All @@ -39,32 +41,29 @@ public CassandraKeyspaceWriteHandler(Keyspace keyspace)

@Override
@SuppressWarnings("resource") // group is closed when CassandraWriteContext is closed
public WriteContext beginWrite(Mutation mutation, WriteOptions writeOptions) throws RequestExecutionException
public CompletableFuture<WriteContext> beginWrite(Mutation mutation, WriteOptions writeOptions) throws RequestExecutionException
{
OpOrder.Group group = null;
try
{
group = Keyspace.writeOrder.start();
OpOrder.Group group = Keyspace.writeOrder.start();

// write the mutation to the commitlog and memtables
CommitLogPosition position = null;
CompletableFuture<CommitLogPosition> position = null;
if (writeOptions.shouldWriteCommitLog(mutation.getKeyspaceName()))
{
position = addToCommitLog(mutation);
} else {
position = CompletableFuture.completedFuture(null);
}
return new CassandraWriteContext(group, position);
}
catch (Throwable t)
{
if (group != null)
{
group.close();
}
throw t;
}
return position.handle((p, error) -> {
if (error != null)
{
group.close();
throw new CompletionException(error);
}
return new CassandraWriteContext(group, p);
});
}

private CommitLogPosition addToCommitLog(Mutation mutation)
private CompletableFuture<CommitLogPosition> addToCommitLog(Mutation mutation)
{
CommitLogPosition position;
// Usually one of these will be true, so first check if that's the case.
Expand All @@ -81,7 +80,7 @@ private CommitLogPosition addToCommitLog(Mutation mutation)
if (!noneSkipCommitlog)
{
if (allSkipCommitlog)
return null;
return CompletableFuture.completedFuture(null);
else
{
Set<TableId> ids = new HashSet<>();
Expand All @@ -97,8 +96,7 @@ private CommitLogPosition addToCommitLog(Mutation mutation)
// or memoize the mutation.getTableIds()->ids map (needs invalidation on schema version change).

Tracing.trace("Appending to commitlog");
position = CommitLog.instance.add(mutation);
return position;
return CommitLog.instance.addAsync(mutation);
}

@SuppressWarnings("resource") // group is closed when CassandraWriteContext is closed
Expand Down
131 changes: 82 additions & 49 deletions src/java/org/apache/cassandra/db/Keyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -590,7 +591,8 @@ private CompletableFuture<?> applyInternal(final Mutation mutation,
if (writeBarrier != null)
return failDueToWriteBarrier(mutation, future);

Lock[] locks = null;
final Collection<TableId> tableIds = mutation.getTableIds();
final Lock[] locks = new Lock[tableIds.size()];

boolean requiresViewUpdate = writeOptions.requiresViewUpdate(viewManager, mutation);

Expand All @@ -599,10 +601,9 @@ private CompletableFuture<?> applyInternal(final Mutation mutation,
mutation.viewLockAcquireStart.compareAndSet(0L, System.currentTimeMillis());

// the order of lock acquisition doesn't matter (from a deadlock perspective) because we only use tryLock()
Collection<TableId> tableIds = mutation.getTableIds();

Iterator<TableId> idIterator = tableIds.iterator();

locks = new Lock[tableIds.size()];
for (int i = 0; i < tableIds.size(); i++)
{
TableId tableId = idIterator.next();
Expand Down Expand Up @@ -683,54 +684,86 @@ else if (isDeferrable)
columnFamilyStores.get(tableId).metric.viewLockAcquireTime.update(acquireTime, MILLISECONDS);
}
}
try (WriteContext ctx = getWriteHandler().beginWrite(mutation, writeOptions))
CompletableFuture<WriteContext> future2 = getWriteHandler().beginWrite(mutation, writeOptions) // this is writing to the commitlog
.whenComplete((ctx, error) -> {
try
{
if (error != null)
{
JVMStabilityInspector.inspectThrowable(error);
logger.error("Failed to write commit log for mutation", error);
if (future != null)
{
future.completeExceptionally(error);
}
return;
}
for (PartitionUpdate upd : mutation.getPartitionUpdates())
{
ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().id);
if (cfs == null)
{
logger.error("Attempting to mutate non-existant table {} ({}.{})", upd.metadata().id, upd.metadata().keyspace, upd.metadata().name);
continue;
}
AtomicLong baseComplete = new AtomicLong(Long.MAX_VALUE);

if (requiresViewUpdate)
{
try
{
Tracing.trace("Creating materialized view mutations from base table replica");
viewManager.forTable(upd.metadata().id).pushViewReplicaUpdates(upd, writeOptions, baseComplete);
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
logger.error(String.format("Unknown exception caught while attempting to update MaterializedView! %s",
upd.metadata().toString()), t);
throw t;
}
}

cfs.getWriteHandler().write(upd, ctx, writeOptions.updateIndexes);

if (requiresViewUpdate)
baseComplete.set(System.currentTimeMillis());
}

if (future != null)
{
future.complete(null);
}
}
catch (Throwable error2)
{
JVMStabilityInspector.inspectThrowable(error2);
logger.error("Failed to apply mutation", error2);
if (future != null)
{
future.completeExceptionally(error2);
}
}
finally
{
try
{
ctx.close();
} finally
{
for (Lock lock : locks)
if (lock != null)
lock.unlock();
}
}
});
if (future == null)
{
for (PartitionUpdate upd : mutation.getPartitionUpdates())
{
ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().id);
if (cfs == null)
{
logger.error("Attempting to mutate non-existant table {} ({}.{})", upd.metadata().id, upd.metadata().keyspace, upd.metadata().name);
continue;
}
AtomicLong baseComplete = new AtomicLong(Long.MAX_VALUE);

if (requiresViewUpdate)
{
try
{
Tracing.trace("Creating materialized view mutations from base table replica");
viewManager.forTable(upd.metadata().id).pushViewReplicaUpdates(upd, writeOptions, baseComplete);
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
logger.error(String.format("Unknown exception caught while attempting to update MaterializedView! %s",
upd.metadata().toString()), t);
throw t;
}
}

cfs.getWriteHandler().write(upd, ctx, writeOptions.updateIndexes);

if (requiresViewUpdate)
baseComplete.set(System.currentTimeMillis());
}

if (future != null) {
future.complete(null);
}
return future;
}
finally
{
if (locks != null)
{
for (Lock lock : locks)
if (lock != null)
lock.unlock();
}
// TODO rework this
future2.join();
return null;
}
return future;
}

private CompletableFuture<?> failDueToWriteBarrier(Mutation mutation, CompletableFuture<?> future)
Expand Down
4 changes: 3 additions & 1 deletion src/java/org/apache/cassandra/db/KeyspaceWriteHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

package org.apache.cassandra.db;

import java.util.concurrent.CompletableFuture;

import org.apache.cassandra.exceptions.RequestExecutionException;

public interface KeyspaceWriteHandler
{
// mutation can be null if writeOptions.writeCommitLog is false
WriteContext beginWrite(Mutation mutation, WriteOptions writeOptions) throws RequestExecutionException;
CompletableFuture<WriteContext> beginWrite(Mutation mutation, WriteOptions writeOptions) throws RequestExecutionException;

WriteContext createContextForIndexing();
WriteContext createContextForRead();
Expand Down
12 changes: 12 additions & 0 deletions src/java/org/apache/cassandra/db/commitlog/ICommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
Expand All @@ -39,6 +40,17 @@ public interface ICommitLog

CommitLogPosition add(Mutation mutation) throws CDCWriteException;

default CompletableFuture<CommitLogPosition> addAsync(Mutation mutation) {
try
{
return CompletableFuture.completedFuture(add(mutation));
} catch (Throwable error) {
CompletableFuture<CommitLogPosition> future = new CompletableFuture<>();
future.completeExceptionally(error);
return future;
}
}

CommitLogArchiver archiver();

CommitLogPosition getCurrentPosition();
Expand Down

0 comments on commit bcf9e6d

Please sign in to comment.