diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 4b3ab287bbe..fefa92e4160 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -449,9 +449,45 @@ acceptedBreaks: justification: "breaks on internal APIs only" "0.1184.0": com.palantir.atlasdb:atlasdb-api: + - code: "java.method.addedToInterface" + new: "method boolean com.palantir.atlasdb.transaction.api.Transaction::isDefinitivelyCommitted()" + justification: "adding a new method to confirm if a transaction successfully\ + \ committed from the atlas perspective" + - code: "java.method.addedToInterface" + new: "method void com.palantir.atlasdb.transaction.api.OpenTransaction::close()" + justification: "OpenTransaction has a single internal consumer, and it's current\ + \ interface caused confusion in recent refactors" - code: "java.method.addedToInterface" new: "method void com.palantir.atlasdb.transaction.api.Transaction::onCommitOrAbort(java.lang.Runnable)" justification: "similar to onSuccess, already used in other prod codepaths" + - code: "java.method.removed" + old: "method T com.palantir.atlasdb.transaction.api.OpenTransaction::finish(com.palantir.atlasdb.transaction.api.TransactionTask) throws E, com.palantir.atlasdb.transaction.api.TransactionFailedRetriableException" + justification: "OpenTransaction has a single internal consumer, and it's current\ + \ interface caused confusion in recent refactors" + - code: "java.method.removed" + old: "method T com.palantir.atlasdb.transaction.api.OpenTransaction::finishWithCallback(com.palantir.atlasdb.transaction.api.TransactionTask, java.lang.Runnable) throws E, com.palantir.atlasdb.transaction.api.TransactionFailedRetriableException" + justification: "OpenTransaction has a single internal consumer, and it's current\ + \ interface caused confusion in recent refactors" + - code: "java.method.returnTypeTypeParametersChanged" + old: "method java.util.List\ + \ com.palantir.atlasdb.transaction.api.AutoDelegate_TransactionManager::startTransactions(java.util.List)" + new: "method java.util.List\ + \ com.palantir.atlasdb.transaction.api.AutoDelegate_TransactionManager::startTransactions(java.util.List)" + justification: "OpenTransaction has a single internal consumer, and it's current\ + \ interface caused confusion in recent refactors" + - code: "java.method.returnTypeTypeParametersChanged" + old: "method java.util.List\ + \ com.palantir.atlasdb.transaction.api.TransactionManager::startTransactions(java.util.List)" + new: "method java.util.List\ + \ com.palantir.atlasdb.transaction.api.TransactionManager::startTransactions(java.util.List)" + justification: "OpenTransaction has a single internal consumer, and it's current\ + \ interface caused confusion in recent refactors" "0.770.0": com.palantir.atlasdb:atlasdb-api: - code: "java.class.removed" diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/OpenTransaction.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/OpenTransaction.java index a477f5e5a95..1dade2faf3c 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/OpenTransaction.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/OpenTransaction.java @@ -16,27 +16,14 @@ package com.palantir.atlasdb.transaction.api; -import com.palantir.atlasdb.metrics.Timed; - -public interface OpenTransaction extends Transaction { - - /** - * Runs a provided task, commits the transaction, and performs cleanup. If no further work needs to be done with the - * transaction, a no-op task can be passed in. - * - * @return value returned by the task - */ - @Timed - T finish(TransactionTask task) throws E, TransactionFailedRetriableException; +import java.io.Closeable; +public interface OpenTransaction extends Transaction, Closeable { /** - * Like {@link #finish(TransactionTask)}, except runs a callback after the task has finished. This callback will - * not run while the transaction remains in an uncommitted state, but may run afterwards, regardless of whether - * the commit was successful, finished, or aborted. - * - * If the callback is run, it will run before any {@link Transaction#onSuccess(Runnable)} callbacks. + * Aborts the transaction if uncommitted and cleanups transaction state. + * All open transactions must be closed. + * Not closing transactions after they're no longer in use may lead to arbitrary delays elsewhere in the system. */ - @Timed - T finishWithCallback(TransactionTask task, Runnable callback) - throws E, TransactionFailedRetriableException; + @Override + void close(); } diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/Transaction.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/Transaction.java index 27328b58b2c..1933732849c 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/Transaction.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/Transaction.java @@ -406,7 +406,7 @@ enum TransactionType { boolean isAborted(); /** - * Gets whether the transaction has not been committed. + * Gets whether the transaction has not been committed nor aborted. * * @return true if neither commit() or abort() have been called, * otherwise false @@ -414,6 +414,16 @@ enum TransactionType { @Idempotent boolean isUncommitted(); + /** + * Gets whether the transaction been committed. Note that it's possible for the underlying KVS to have committed + * the transaction but {@link #isDefinitivelyCommitted()} to return false (e.g. when the KVS call timed out + * client-side but succeeded server-side). + * + * @return true commit() has been called and did not throw, otherwise false. + */ + @Idempotent + boolean isDefinitivelyCommitted(); + /** * Gets the timestamp the current transaction is running at. */ @@ -453,6 +463,10 @@ enum TransactionType { /** * Allow consumers to register callbacks to be run on {@link #commit()} or {@link #abort()}, * after a transaction has committed or aborted. + *

+ * {@link #onCommitOrAbort(Runnable)} callbacks are added in a stack and run in opposite order they were added to + * the stack. I.e. they're FILO (first-in-last-out). + *

* {@link #onCommitOrAbort(Runnable)} callbacks run before {@link #onSuccess(Runnable)} callbacks. *

* Callbacks are usually cleanup tasks, e.g. {@link PreCommitCondition#cleanup()}. diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/TransactionManager.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/TransactionManager.java index cfdaf54caec..6f0ff589c98 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/TransactionManager.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/TransactionManager.java @@ -429,12 +429,21 @@ T runTaskWithConditionRea void registerClosingCallback(Runnable closingCallback); /** - * This method can be used for direct control over the lifecycle of a batch of transactions. For example, if the - * work done in each given transaction is interactive and cannot be expressed as a {@link TransactionTask} ahead of - * time, this method allows for a long lived transaction object. For any data read or written to the transaction to - * be valid, the transaction must be committed by calling {@link OpenTransaction#finish(TransactionTask)} to - * also perform additional cleanup. Note that this does not clean up the pre commit condition associated with that - * task. The order of transactions returned corresponds with the pre commit conditions passed in, however there are + * This method can be used for direct control over the lifecycle of a batch of transactions. + * For example, if the work done in each given transaction is interactive and cannot be expressed as a + * {@link TransactionTask} ahead of time, this method allows for a long lived transaction object. + *

+ * For any data read or written to the transaction to be valid, the transaction must be committed explicitly + * by the caller with {@link Transaction#commit()}. + *

+ * {@link PreCommitCondition#cleanup()} associated with a transaction will be run when the respective transaction + * commits or aborts. + * All pre-commit conditions are also cleaned in case {@link #startTransactions(List)} throws. + *

+ * Note the caller must call {@link OpenTransaction#close()} after the transaction is committed to perform + * additional cleanup. Failure to do so might incur in + *

+ * The order of transactions returned corresponds with the pre commit conditions passed in, however there are * no guarantees on the ordering of transactions returned with respect to their start timestamp. * * @return a batch of transactions with associated immutable timestamp locks @@ -442,7 +451,7 @@ T runTaskWithConditionRea */ @Deprecated @Timed - List startTransactions(List condition); + List startTransactions(List condition); /** * Frees resources used by this TransactionManager, and invokes any callbacks registered to run on close. diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/transaction/impl/ForwardingTransaction.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/transaction/impl/ForwardingTransaction.java index 63cf711ba58..621cf8fdc54 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/transaction/impl/ForwardingTransaction.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/transaction/impl/ForwardingTransaction.java @@ -179,6 +179,11 @@ public boolean isUncommitted() { return delegate().isUncommitted(); } + @Override + public boolean isDefinitivelyCommitted() { + return delegate().isDefinitivelyCommitted(); + } + @Override public long getTimestamp() { return delegate().getTimestamp(); diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java index 0fab5ee5360..8f71e3943ec 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java @@ -1831,6 +1831,11 @@ public boolean isUncommitted() { return state.get() == State.UNCOMMITTED; } + @Override + public boolean isDefinitivelyCommitted() { + return state.get() == State.COMMITTED; + } + private void ensureUncommitted() { if (!isUncommitted()) { throw new CommittedTransactionException(); @@ -1848,16 +1853,6 @@ private boolean isStillRunning() { return stateNow == State.UNCOMMITTED || stateNow == State.COMMITTING; } - /** - * Returns true iff the transaction is known to have successfully committed. - *

- * Be careful when using this method! A transaction that the client thinks has failed could actually have - * committed as far as the key-value service is concerned. - */ - private boolean isDefinitivelyCommitted() { - return state.get() == State.COMMITTED; - } - /////////////////////////////////////////////////////////////////////////// /// Committing /////////////////////////////////////////////////////////////////////////// diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java index 1d58f12d1f4..de760d1113f 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java @@ -22,6 +22,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Streams; +import com.google.common.io.Closer; import com.palantir.atlasdb.cache.TimestampCache; import com.palantir.atlasdb.cell.api.DataKeyValueServiceManager; import com.palantir.atlasdb.cell.api.DdlManager; @@ -72,6 +73,7 @@ import com.palantir.timestamp.TimestampManagementService; import com.palantir.timestamp.TimestampService; import com.palantir.util.SafeShutdownRunner; +import java.io.IOException; import java.time.Duration; import java.util.List; import java.util.Optional; @@ -190,20 +192,30 @@ protected boolean shouldStopRetrying(int numTimesFailed) { public T runTaskWithConditionThrowOnConflict( C condition, ConditionAwareTransactionTask task) throws E, TransactionFailedRetriableException { checkOpen(); - OpenTransaction openTransaction; + try (OpenTransactionImpl openTransaction = + runTimed(() -> Iterables.getOnlyElement(startTransactions(ImmutableList.of(condition))), "setupTask")) { + return openTransaction.execute(txn -> task.execute(txn, condition)); + } + } + + @Override + public List startTransactions(List conditions) { try { - openTransaction = runTimed( - () -> Iterables.getOnlyElement(startTransactions(ImmutableList.of(condition))), "setupTask"); + return startTransactionsInternal(conditions); } catch (Exception e) { - condition.cleanup(); + Closer closer = Closer.create(); // N.B. using closer to run all cleanup tasks even if one cleanup throws + conditions.forEach(condition -> closer.register(condition::cleanup)); + try { + closer.close(); + } catch (IOException ex) { + e.addSuppressed(ex); + log.info("Failed to cleanup pre-commit conditions on startTransaction failure", ex); + } throw e; } - return openTransaction.finishWithCallback( - transaction -> task.execute(transaction, condition), condition::cleanup); } - @Override - public List startTransactions(List conditions) { + private List startTransactionsInternal(List conditions) { if (conditions.isEmpty()) { return ImmutableList.of(); } @@ -224,7 +236,7 @@ public List startTransactions(List transactions = Streams.zip( + List transactions = Streams.zip( responses.stream(), conditions.stream(), (response, condition) -> { LockToken immutableTsLock = response.immutableTimestamp().getLock(); @@ -232,27 +244,55 @@ public List startTransactions(List lockWatchManager.onTransactionCommit(transaction.getTimestamp())); + + transaction.onCommitOrAbort(transaction::reportExpectationsCollectedData); + transaction.onCommitOrAbort(condition::cleanup); + transaction.onCommitOrAbort( + () -> lockWatchManager.requestTransactionStateRemovalFromCache( + response.startTimestampAndPartition() + .timestamp())); + // N.B. register `onTransactionCommit` after `requestTransactionStateRemovalFromCache` + // `onCommitOrAbort` is FILO, and we need to run `onTransactionCommit` before + // `requestTransactionStateRemovalFromCache` + transaction.onCommitOrAbort(() -> { + if (transaction.isDefinitivelyCommitted()) { + lockWatchManager.onTransactionCommit(transaction.getTimestamp()); + } + }); return new OpenTransactionImpl(transaction, immutableTsLock); }) .collect(Collectors.toList()); openTransactionCounter.inc(transactions.size()); return transactions; } catch (Throwable t) { - responses.forEach(response -> lockWatchManager.requestTransactionStateRemovalFromCache( - response.startTimestampAndPartition().timestamp())); - timelockService.tryUnlock(responses.stream() + // In case of failure, we need to remove the transaction from our local lock-watch cache and unlocking + // immutable timestamp. + // Note that in case we don't throw, the immutable timestamp is registered to be unlocked in the + // SnapshotTransaction constructor. But in case of failure, we need to manually unlock it here. + + Closer closer = Closer.create(); // N.B. using closer to run all cleanup tasks even if one cleanup throws + responses.forEach( + response -> closer.register(() -> lockWatchManager.requestTransactionStateRemovalFromCache( + response.startTimestampAndPartition().timestamp()))); + closer.register(() -> timelockService.tryUnlock(responses.stream() .map(response -> response.immutableTimestamp().getLock()) - .collect(Collectors.toSet())); + .collect(Collectors.toSet()))); + + try { + closer.close(); + } catch (IOException e) { + t.addSuppressed(e); + log.info("Failed to cleanup startTransaction resources on startTransaction failure", t); + } throw Throwables.rewrapAndThrowUncheckedException(t); } } - private final class OpenTransactionImpl extends ForwardingTransaction implements OpenTransaction { + final class OpenTransactionImpl extends ForwardingTransaction implements OpenTransaction { private final ExpectationsAwareTransaction delegate; private final LockToken immutableTsLock; + private boolean hasClosed = false; private OpenTransactionImpl(ExpectationsAwareTransaction delegate, LockToken immutableTsLock) { this.delegate = delegate; @@ -265,33 +305,29 @@ public ExpectationsAwareTransaction delegate() { } @Override - public T finish(TransactionTask task) - throws E, TransactionFailedRetriableException { - return finishWithCallback(task, () -> {}); - } - - @Override - public T finishWithCallback(TransactionTask task, Runnable callback) - throws E, TransactionFailedRetriableException { - Timer postTaskTimer = getTimer("finishTask"); - Timer.Context postTaskContext; - - TransactionTask wrappedTask = wrapTaskIfNecessary(task, immutableTsLock); + public synchronized void close() { + if (hasClosed) { + // Some operations in close are not idempotent, e.g. openTransactionCounter.dec(). + // Let's guarantee we run close() only once, and no-op in following runs to respect Closeable interface. + return; + } ExpectationsAwareTransaction txn = delegate; - T result; try { - txn.onCommitOrAbort(txn::reportExpectationsCollectedData); - txn.onCommitOrAbort(callback); - result = runTaskThrowOnConflict(wrappedTask, txn); + if (txn.isUncommitted()) { + txn.abort(); + } } finally { - lockWatchManager.requestTransactionStateRemovalFromCache(getTimestamp()); - postTaskContext = postTaskTimer.time(); openTransactionCounter.dec(); } scrubForAggressiveHardDelete(extractSnapshotTransaction(txn)); - postTaskContext.stop(); - return result; + + hasClosed = true; + } + + T execute(TransactionTask task) { + TransactionTask wrappedTask = wrapTaskIfNecessary(task, immutableTsLock); + return runTimed(() -> runTaskThrowOnConflict(wrappedTask, delegate), "runTaskThrowOnConflict"); } } diff --git a/atlasdb-service/src/main/java/com/palantir/atlasdb/impl/AtlasDbServiceImpl.java b/atlasdb-service/src/main/java/com/palantir/atlasdb/impl/AtlasDbServiceImpl.java index 69027ac0c58..81266350361 100644 --- a/atlasdb-service/src/main/java/com/palantir/atlasdb/impl/AtlasDbServiceImpl.java +++ b/atlasdb-service/src/main/java/com/palantir/atlasdb/impl/AtlasDbServiceImpl.java @@ -192,7 +192,9 @@ public TransactionToken startTransaction() { public void commit(TransactionToken token) { OpenTransaction openTxn = transactions.getIfPresent(token); if (openTxn != null) { - openTxn.finish((TxTask) transaction -> null); + try (openTxn) { + openTxn.commit(); + } transactions.invalidate(token); } } @@ -201,10 +203,9 @@ public void commit(TransactionToken token) { public void abort(TransactionToken token) { OpenTransaction openTxn = transactions.getIfPresent(token); if (openTxn != null) { - openTxn.finish((TxTask) transaction -> { - transaction.abort(); - return null; - }); + try (openTxn) { + openTxn.abort(); + } transactions.invalidate(token); } } diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManagerTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManagerTest.java index ac0ef3a0193..d1991c9e41e 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManagerTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManagerTest.java @@ -73,8 +73,8 @@ public class SnapshotTransactionManagerTest { private static final String SETUP_TASK_METRIC_NAME = SnapshotTransactionManager.class.getCanonicalName() + ".setupTask"; - private static final String FINISH_TASK_METRIC_NAME = - SnapshotTransactionManager.class.getCanonicalName() + ".finishTask"; + private static final String RUN_TASK_METRIC_NAME = + SnapshotTransactionManager.class.getCanonicalName() + ".runTaskThrowOnConflict"; private final CloseableLockService closeableLockService = mock(CloseableLockService.class); private final Cleaner cleaner = mock(Cleaner.class); @@ -250,14 +250,14 @@ public void registersMetrics() throws InterruptedException { TaggedMetricRegistry registry = snapshotTransactionManager.metricsManager.getTaggedRegistry(); assertThat(registry.getMetrics().keySet().stream().map(MetricName::safeName)) .contains(SETUP_TASK_METRIC_NAME) - .contains(FINISH_TASK_METRIC_NAME); + .contains(RUN_TASK_METRIC_NAME); assertThat(registry.timer(MetricName.builder() .safeName(SETUP_TASK_METRIC_NAME) .build()) .getCount()) .isGreaterThanOrEqualTo(1); assertThat(registry.timer(MetricName.builder() - .safeName(FINISH_TASK_METRIC_NAME) + .safeName(RUN_TASK_METRIC_NAME) .build()) .getCount()) .isGreaterThanOrEqualTo(1); @@ -290,7 +290,7 @@ public void doesNotCallStartTransactionForReadOnlyTransactionsIfFlagIsNotSet() { public void startEmptyBatchOfTransactionsDoesNotCallTimelockService() { TimelockService timelockService = spy(inMemoryTimelockClassExtension.getLegacyTimelockService()); SnapshotTransactionManager transactionManager = createSnapshotTransactionManager(timelockService, false); - List transactions = transactionManager.startTransactions(ImmutableList.of()); + List transactions = transactionManager.startTransactions(ImmutableList.of()); assertThat(transactions).isEmpty(); verify(timelockService, never()).startIdentifiedAtlasDbTransactionBatch(anyInt()); diff --git a/changelog/@unreleased/pr-7411.v2.yml b/changelog/@unreleased/pr-7411.v2.yml new file mode 100644 index 00000000000..12169c2a353 --- /dev/null +++ b/changelog/@unreleased/pr-7411.v2.yml @@ -0,0 +1,9 @@ +type: improvement +improvement: + description: |- + Refactor `OpenTransaction` interface. + + We now put the burden of committing or aborting the transaction explicitly in the hands of the consumer, and modify the contract to require only the consumer to call `close()`. + We then abort the transaction on `close()` if uncommitted. + links: + - https://github.com/palantir/atlasdb/pull/7411 diff --git a/timelock-server/src/integTest/java/com/palantir/atlasdb/timelock/LockWatchEventIntegrationTest.java b/timelock-server/src/integTest/java/com/palantir/atlasdb/timelock/LockWatchEventIntegrationTest.java index 52fd94111d7..f83ef8764dd 100644 --- a/timelock-server/src/integTest/java/com/palantir/atlasdb/timelock/LockWatchEventIntegrationTest.java +++ b/timelock-server/src/integTest/java/com/palantir/atlasdb/timelock/LockWatchEventIntegrationTest.java @@ -189,8 +189,8 @@ public void multipleTransactionVersionsReturnsSnapshotAndOnlyRelevantRecentEvent assertThat(unlockedDescriptors(update.events())).containsExactlyInAnyOrderElementsOf(getDescriptors(CELL_3)); assertThat(watchDescriptors(update.events())).isEmpty(); - secondTxn.finish(_unused -> null); - fifthTxn.finish(_unused -> null); + secondTxn.close(); + fifthTxn.close(); cleanup.run(); }