diff --git a/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java b/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java index 437ded8b1da5..acc4cd14987b 100644 --- a/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java +++ b/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java @@ -20,6 +20,8 @@ import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.CommitLogPosition; @@ -39,32 +41,29 @@ public CassandraKeyspaceWriteHandler(Keyspace keyspace) @Override @SuppressWarnings("resource") // group is closed when CassandraWriteContext is closed - public WriteContext beginWrite(Mutation mutation, WriteOptions writeOptions) throws RequestExecutionException + public CompletableFuture beginWrite(Mutation mutation, WriteOptions writeOptions) throws RequestExecutionException { - OpOrder.Group group = null; - try - { - group = Keyspace.writeOrder.start(); + OpOrder.Group group = Keyspace.writeOrder.start(); // write the mutation to the commitlog and memtables - CommitLogPosition position = null; + CompletableFuture position = null; if (writeOptions.shouldWriteCommitLog(mutation.getKeyspaceName())) { position = addToCommitLog(mutation); + } else { + position = CompletableFuture.completedFuture(null); } - return new CassandraWriteContext(group, position); - } - catch (Throwable t) - { - if (group != null) - { - group.close(); - } - throw t; - } + return position.handle((p, error) -> { + if (error != null) + { + group.close(); + throw new CompletionException(error); + } + return new CassandraWriteContext(group, p); + }); } - private CommitLogPosition addToCommitLog(Mutation mutation) + private CompletableFuture addToCommitLog(Mutation mutation) { CommitLogPosition position; // Usually one of these will be true, so first check if that's the case. @@ -81,7 +80,7 @@ private CommitLogPosition addToCommitLog(Mutation mutation) if (!noneSkipCommitlog) { if (allSkipCommitlog) - return null; + return CompletableFuture.completedFuture(null); else { Set ids = new HashSet<>(); @@ -97,8 +96,7 @@ private CommitLogPosition addToCommitLog(Mutation mutation) // or memoize the mutation.getTableIds()->ids map (needs invalidation on schema version change). Tracing.trace("Appending to commitlog"); - position = CommitLog.instance.add(mutation); - return position; + return CommitLog.instance.addAsync(mutation); } @SuppressWarnings("resource") // group is closed when CassandraWriteContext is closed diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 88873502036e..e546b7de44dd 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -27,6 +27,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; @@ -590,7 +591,8 @@ private CompletableFuture applyInternal(final Mutation mutation, if (writeBarrier != null) return failDueToWriteBarrier(mutation, future); - Lock[] locks = null; + final Collection tableIds = mutation.getTableIds(); + final Lock[] locks = new Lock[tableIds.size()]; boolean requiresViewUpdate = writeOptions.requiresViewUpdate(viewManager, mutation); @@ -599,10 +601,9 @@ private CompletableFuture applyInternal(final Mutation mutation, mutation.viewLockAcquireStart.compareAndSet(0L, System.currentTimeMillis()); // the order of lock acquisition doesn't matter (from a deadlock perspective) because we only use tryLock() - Collection tableIds = mutation.getTableIds(); + Iterator idIterator = tableIds.iterator(); - locks = new Lock[tableIds.size()]; for (int i = 0; i < tableIds.size(); i++) { TableId tableId = idIterator.next(); @@ -683,54 +684,86 @@ else if (isDeferrable) columnFamilyStores.get(tableId).metric.viewLockAcquireTime.update(acquireTime, MILLISECONDS); } } - try (WriteContext ctx = getWriteHandler().beginWrite(mutation, writeOptions)) + CompletableFuture future2 = getWriteHandler().beginWrite(mutation, writeOptions) // this is writing to the commitlog + .whenComplete((ctx, error) -> { + try + { + if (error != null) + { + JVMStabilityInspector.inspectThrowable(error); + logger.error("Failed to write commit log for mutation", error); + if (future != null) + { + future.completeExceptionally(error); + } + return; + } + for (PartitionUpdate upd : mutation.getPartitionUpdates()) + { + ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().id); + if (cfs == null) + { + logger.error("Attempting to mutate non-existant table {} ({}.{})", upd.metadata().id, upd.metadata().keyspace, upd.metadata().name); + continue; + } + AtomicLong baseComplete = new AtomicLong(Long.MAX_VALUE); + + if (requiresViewUpdate) + { + try + { + Tracing.trace("Creating materialized view mutations from base table replica"); + viewManager.forTable(upd.metadata().id).pushViewReplicaUpdates(upd, writeOptions, baseComplete); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + logger.error(String.format("Unknown exception caught while attempting to update MaterializedView! %s", + upd.metadata().toString()), t); + throw t; + } + } + + cfs.getWriteHandler().write(upd, ctx, writeOptions.updateIndexes); + + if (requiresViewUpdate) + baseComplete.set(System.currentTimeMillis()); + } + + if (future != null) + { + future.complete(null); + } + } + catch (Throwable error2) + { + JVMStabilityInspector.inspectThrowable(error2); + logger.error("Failed to apply mutation", error2); + if (future != null) + { + future.completeExceptionally(error2); + } + } + finally + { + try + { + ctx.close(); + } finally + { + for (Lock lock : locks) + if (lock != null) + lock.unlock(); + } + } + }); + if (future == null) { - for (PartitionUpdate upd : mutation.getPartitionUpdates()) - { - ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().id); - if (cfs == null) - { - logger.error("Attempting to mutate non-existant table {} ({}.{})", upd.metadata().id, upd.metadata().keyspace, upd.metadata().name); - continue; - } - AtomicLong baseComplete = new AtomicLong(Long.MAX_VALUE); - - if (requiresViewUpdate) - { - try - { - Tracing.trace("Creating materialized view mutations from base table replica"); - viewManager.forTable(upd.metadata().id).pushViewReplicaUpdates(upd, writeOptions, baseComplete); - } - catch (Throwable t) - { - JVMStabilityInspector.inspectThrowable(t); - logger.error(String.format("Unknown exception caught while attempting to update MaterializedView! %s", - upd.metadata().toString()), t); - throw t; - } - } - - cfs.getWriteHandler().write(upd, ctx, writeOptions.updateIndexes); - - if (requiresViewUpdate) - baseComplete.set(System.currentTimeMillis()); - } - - if (future != null) { - future.complete(null); - } - return future; - } - finally - { - if (locks != null) - { - for (Lock lock : locks) - if (lock != null) - lock.unlock(); - } + // TODO rework this + future2.join(); + return null; } + return future; } private CompletableFuture failDueToWriteBarrier(Mutation mutation, CompletableFuture future) diff --git a/src/java/org/apache/cassandra/db/KeyspaceWriteHandler.java b/src/java/org/apache/cassandra/db/KeyspaceWriteHandler.java index 81205b1a866f..0327e859a198 100644 --- a/src/java/org/apache/cassandra/db/KeyspaceWriteHandler.java +++ b/src/java/org/apache/cassandra/db/KeyspaceWriteHandler.java @@ -18,12 +18,14 @@ package org.apache.cassandra.db; +import java.util.concurrent.CompletableFuture; + import org.apache.cassandra.exceptions.RequestExecutionException; public interface KeyspaceWriteHandler { // mutation can be null if writeOptions.writeCommitLog is false - WriteContext beginWrite(Mutation mutation, WriteOptions writeOptions) throws RequestExecutionException; + CompletableFuture beginWrite(Mutation mutation, WriteOptions writeOptions) throws RequestExecutionException; WriteContext createContextForIndexing(); WriteContext createContextForRead(); diff --git a/src/java/org/apache/cassandra/db/commitlog/ICommitLog.java b/src/java/org/apache/cassandra/db/commitlog/ICommitLog.java index d6bafba9ad6b..4f9697a3020a 100644 --- a/src/java/org/apache/cassandra/db/commitlog/ICommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/ICommitLog.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; @@ -39,6 +40,17 @@ public interface ICommitLog CommitLogPosition add(Mutation mutation) throws CDCWriteException; + default CompletableFuture addAsync(Mutation mutation) { + try + { + return CompletableFuture.completedFuture(add(mutation)); + } catch (Throwable error) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(error); + return future; + } + } + CommitLogArchiver archiver(); CommitLogPosition getCurrentPosition();