Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Refactor OpenTransaction interface #7411

Open
wants to merge 20 commits into
base: develop
Choose a base branch
from
32 changes: 32 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,41 @@ acceptedBreaks:
justification: "breaks on internal APIs only"
"0.1184.0":
com.palantir.atlasdb:atlasdb-api:
- code: "java.method.addedToInterface"
new: "method void com.palantir.atlasdb.transaction.api.OpenTransaction::close()"
justification: "OpenTransaction has a single internal consumer, and it's current\
\ interface caused confusion in recent refactors"
- code: "java.method.addedToInterface"
new: "method void com.palantir.atlasdb.transaction.api.Transaction::onCommitOrAbort(java.lang.Runnable)"
justification: "similar to onSuccess, already used in other prod codepaths"
- code: "java.method.removed"
old: "method <T, E extends java.lang.Exception> T com.palantir.atlasdb.transaction.api.OpenTransaction::finish(com.palantir.atlasdb.transaction.api.TransactionTask<T,\
\ E>) throws E, com.palantir.atlasdb.transaction.api.TransactionFailedRetriableException"
justification: "OpenTransaction has a single internal consumer, and it's current\
\ interface caused confusion in recent refactors"
- code: "java.method.removed"
old: "method <T, E extends java.lang.Exception> T com.palantir.atlasdb.transaction.api.OpenTransaction::finishWithCallback(com.palantir.atlasdb.transaction.api.TransactionTask<T,\
\ E>, java.lang.Runnable) throws E, com.palantir.atlasdb.transaction.api.TransactionFailedRetriableException"
justification: "OpenTransaction has a single internal consumer, and it's current\
\ interface caused confusion in recent refactors"
- code: "java.method.returnTypeTypeParametersChanged"
old: "method java.util.List<com.palantir.atlasdb.transaction.api.OpenTransaction>\
\ com.palantir.atlasdb.transaction.api.AutoDelegate_TransactionManager::startTransactions(java.util.List<?\
\ extends com.palantir.atlasdb.transaction.api.PreCommitCondition>)"
new: "method java.util.List<? extends com.palantir.atlasdb.transaction.api.OpenTransaction>\
\ com.palantir.atlasdb.transaction.api.AutoDelegate_TransactionManager::startTransactions(java.util.List<?\
\ extends com.palantir.atlasdb.transaction.api.PreCommitCondition>)"
justification: "OpenTransaction has a single internal consumer, and it's current\
\ interface caused confusion in recent refactors"
- code: "java.method.returnTypeTypeParametersChanged"
old: "method java.util.List<com.palantir.atlasdb.transaction.api.OpenTransaction>\
\ com.palantir.atlasdb.transaction.api.TransactionManager::startTransactions(java.util.List<?\
\ extends com.palantir.atlasdb.transaction.api.PreCommitCondition>)"
new: "method java.util.List<? extends com.palantir.atlasdb.transaction.api.OpenTransaction>\
\ com.palantir.atlasdb.transaction.api.TransactionManager::startTransactions(java.util.List<?\
\ extends com.palantir.atlasdb.transaction.api.PreCommitCondition>)"
justification: "OpenTransaction has a single internal consumer, and it's current\
\ interface caused confusion in recent refactors"
"0.770.0":
com.palantir.atlasdb:atlasdb-api:
- code: "java.class.removed"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,14 @@

package com.palantir.atlasdb.transaction.api;

import com.palantir.atlasdb.metrics.Timed;

public interface OpenTransaction extends Transaction {

/**
* Runs a provided task, commits the transaction, and performs cleanup. If no further work needs to be done with the
* transaction, a no-op task can be passed in.
*
* @return value returned by the task
*/
@Timed
<T, E extends Exception> T finish(TransactionTask<T, E> task) throws E, TransactionFailedRetriableException;
import java.io.Closeable;

public interface OpenTransaction extends Transaction, Closeable {
/**
* Like {@link #finish(TransactionTask)}, except runs a callback after the task has finished. This callback will
* not run while the transaction remains in an uncommitted state, but may run afterwards, regardless of whether
* the commit was successful, finished, or aborted.
*
* If the callback is run, it will run before any {@link Transaction#onSuccess(Runnable)} callbacks.
* Aborts the transaction if uncommitted and cleanups transaction state.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"and cleans up"?

* All open transactions <b>must be closed</b>.
jkozlowski marked this conversation as resolved.
Show resolved Hide resolved
* Not closing transactions after they're no longer in use may lead to arbitrary delays elsewhere in the system.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"may lead to memory leaks and arbitrary delays..."

*/
@Timed
<T, E extends Exception> T finishWithCallback(TransactionTask<T, E> task, Runnable callback)
throws E, TransactionFailedRetriableException;
@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,10 @@ enum TransactionType {
/**
* Allow consumers to register callbacks to be run on {@link #commit()} or {@link #abort()},
* after a transaction has committed or aborted.
* <p>
* {@link #onCommitOrAbort(Runnable)} callbacks are added in a stack and run in opposite order they were added to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You already say this later?

Callbacks are run in the reverse order they were added - i.e. the last added callback will be run first.

* the stack. I.e. they're FILO (first-in-last-out).
* <p>
* {@link #onCommitOrAbort(Runnable)} callbacks run before {@link #onSuccess(Runnable)} callbacks.
* <p>
* Callbacks are usually cleanup tasks, e.g. {@link PreCommitCondition#cleanup()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,20 +429,29 @@ <T, C extends PreCommitCondition, E extends Exception> T runTaskWithConditionRea
void registerClosingCallback(Runnable closingCallback);

/**
* This method can be used for direct control over the lifecycle of a batch of transactions. For example, if the
* work done in each given transaction is interactive and cannot be expressed as a {@link TransactionTask} ahead of
* time, this method allows for a long lived transaction object. For any data read or written to the transaction to
* be valid, the transaction must be committed by calling {@link OpenTransaction#finish(TransactionTask)} to
* also perform additional cleanup. Note that this does not clean up the pre commit condition associated with that
* task. The order of transactions returned corresponds with the pre commit conditions passed in, however there are
* This method can be used for direct control over the lifecycle of a batch of transactions.
* For example, if the work done in each given transaction is interactive and cannot be expressed as a
* {@link TransactionTask} ahead of time, this method allows for a long lived transaction object.
* <p>
* For any data read or written to the transaction to be valid, the transaction must be committed explicitly
* by the caller with {@link Transaction#commit()}.
* <p>
* {@link PreCommitCondition#cleanup()} associated with a transaction will be run when the respective transaction
* commits or aborts.
* All pre-commit conditions are also cleaned in case {@link #startTransactions(List)} throws.
* <p>
* Note the caller must call {@link OpenTransaction#close()} after the transaction is committed to perform
* additional cleanup. Failure to do so might incur in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you didn't finish a sentence here?

* <p>
* The order of transactions returned corresponds with the pre commit conditions passed in, however there are
* no guarantees on the ordering of transactions returned with respect to their start timestamp.
*
* @return a batch of transactions with associated immutable timestamp locks
* @deprecated Similar functionality will exist, but this method is likely to change in the future
*/
@Deprecated
@Timed
List<OpenTransaction> startTransactions(List<? extends PreCommitCondition> condition);
List<? extends OpenTransaction> startTransactions(List<? extends PreCommitCondition> condition);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my edification: what do you get out of doing "? extends ", other than the callers now having to do the same thing?


/**
* Frees resources used by this TransactionManager, and invokes any callbacks registered to run on close.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import com.google.common.io.Closer;
import com.palantir.atlasdb.cache.TimestampCache;
import com.palantir.atlasdb.cell.api.DataKeyValueServiceManager;
import com.palantir.atlasdb.cell.api.DdlManager;
Expand Down Expand Up @@ -72,6 +73,7 @@
import com.palantir.timestamp.TimestampManagementService;
import com.palantir.timestamp.TimestampService;
import com.palantir.util.SafeShutdownRunner;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -190,20 +192,30 @@ protected boolean shouldStopRetrying(int numTimesFailed) {
public <T, C extends PreCommitCondition, E extends Exception> T runTaskWithConditionThrowOnConflict(
C condition, ConditionAwareTransactionTask<T, C, E> task) throws E, TransactionFailedRetriableException {
checkOpen();
OpenTransaction openTransaction;
try (OpenTransactionImpl openTransaction =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SO the way this code works is:

  • if you throw as part of startTransactions, the PreCommitCondition will be cleaned up immediately there. Otherwise, the ownership of those moves to OpenTransaction and is therefore cleaned up as part of it's #close implementation.

Meaning, we improve the flow here, without breaking any functionality.

runTimed(() -> Iterables.getOnlyElement(startTransactions(ImmutableList.of(condition))), "setupTask")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For interest, it was very funny to have a look at the implementation of List.of in the JDK, and weirdly, it has List12, which is kinda bizzareo. Guava defo does better here

return openTransaction.execute(txn -> task.execute(txn, condition));
}
}

@Override
public List<OpenTransactionImpl> startTransactions(List<? extends PreCommitCondition> conditions) {
try {
openTransaction = runTimed(
() -> Iterables.getOnlyElement(startTransactions(ImmutableList.of(condition))), "setupTask");
return startTransactionsInternal(conditions);
} catch (Exception e) {
condition.cleanup();
Closer closer = Closer.create(); // N.B. using closer to run all cleanup tasks even if one cleanup throws
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yaaas, much better

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although, I just started reading startTransactionsInternal, and my immediate thought was "shit we forgot to cleanup conditions when that throws exceptions...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if instead of this split thing, we try something like this:

  • StartTransactionsInternal at the beginning creates a Closer onExceptionCloser = Closer.create();

As you acquire resources, you register things with it, so you'd immediately accumulate all of the conditions there.

Then in #startTransactionsImpl, IFF you end up with an exception, you execute the closer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, this is SUCH critical code, that from a reader's perspective, EVERYTIME I see resource acquisition, on the next line I want it being registered with SOMETHING that's responsible for cleaning it up in case of exceptions.

conditions.forEach(condition -> closer.register(condition::cleanup));
try {
closer.close();
} catch (IOException ex) {
e.addSuppressed(ex);
fsamuel-bs marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we instead use your ThreadSafeCloser and add a method #closeAfterException(Throwable) or something that will do the logging and #addSuppressed?

I'm also honestly not that fussed with #addSuppressed, it feels like noise. And specifically, in the internal skiing project, might lead to giant stacktraces (and just lots of logging?), because we use pretty large batches for #startTransactions?

How about we log the individual failures somehow as separate loglines

log.info("Failed to cleanup pre-commit conditions on startTransaction failure", ex);
}
throw e;
}
return openTransaction.finishWithCallback(
transaction -> task.execute(transaction, condition), condition::cleanup);
}

@Override
public List<OpenTransaction> startTransactions(List<? extends PreCommitCondition> conditions) {
private List<OpenTransactionImpl> startTransactionsInternal(List<? extends PreCommitCondition> conditions) {
if (conditions.isEmpty()) {
return ImmutableList.of();
}
Expand All @@ -224,35 +236,63 @@ public List<OpenTransaction> startTransactions(List<? extends PreCommitCondition
recordImmutableTimestamp(immutableTs);
cleaner.punch(responses.get(0).startTimestampAndPartition().timestamp());

List<OpenTransaction> transactions = Streams.zip(
List<OpenTransactionImpl> transactions = Streams.zip(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, something doesn't add up here if you're having to make this available and return the impl

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about, instead of having the #execute method on OpenTransactionImpl, just inline it here? Like you're really not gaining anything from this, but now EVERYTHING is polluted with these weird generics.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct me if there's a deeper reason

responses.stream(), conditions.stream(), (response, condition) -> {
LockToken immutableTsLock =
response.immutableTimestamp().getLock();
LongSupplier startTimestampSupplier = response.startTimestampAndPartition()::timestamp;

ExpectationsAwareTransaction transaction = createTransaction(
immutableTs, startTimestampSupplier, immutableTsLock, condition);
transaction.onSuccess(
() -> lockWatchManager.onTransactionCommit(transaction.getTimestamp()));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, deep breaths before reviewing this...

transaction.onCommitOrAbort(transaction::reportExpectationsCollectedData);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one: should this just be an internal transaction thing in SnapshotTransaction? Why is this even registered here, as opposed to much deeper?

This definitely feels silly, it shouldn't even be on the interface.

transaction.onCommitOrAbort(condition::cleanup);
transaction.onCommitOrAbort(
() -> lockWatchManager.requestTransactionStateRemovalFromCache(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternative to this VERY CAREFUL ORDERING, is to actually delegate this to LockWatchManager honestly? It feels wrong that such CRITICAL piece of functionality has high code-distance from LockWatchManager itself. This is probably a pre-req for this PR merging?

I'd support something like maybe:

  • StartIdentifiedAtlasDbTransactionResponse has some sort of Consumer, to which you push the outcome of the transaction and it runs the lockWatchManager code.

response.startTimestampAndPartition()
.timestamp()));
// N.B. run this before clearing the state from the cache with
// requestTransactionStateRemovalFromCache
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was quite nuanced:

  • lockWatchManager.onTransactionCommit needs to run before lockWatchManager.requestTransactionStateRemovalFromCache
  • onSuccess ran before the finally block before, and therefore it worked
  • now we need to add to onCommitOrAbort in the right order, otherwise tests failed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also thought of writing

                                transaction.onCommitOrAbort(() -> {
                                    if (!transaction.isAborted()) {
                                        // in onCommitOrAbort + not aborted == committed
                                        lockWatchManager.onTransactionCommit(transaction.getTimestamp());
                                    }
                                    lockWatchManager.requestTransactionStateRemovalFromCache(....)
                                });

but decided not do this because I like guaranteeing both are run in case one fails.

Copy link
Contributor Author

@fsamuel-bs fsamuel-bs Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also thought of adding txn.isCommitted() as a method, but not doing that for now - could pick do in a flup pr if we decide to go down that route.

Edit: decided to go down this route and added txn.isDefinitelyCommitted()

transaction.onCommitOrAbort(() -> {
if (!transaction.isAborted()) {
// in onCommitOrAbort + not aborted == committed
lockWatchManager.onTransactionCommit(transaction.getTimestamp());
}
});
return new OpenTransactionImpl(transaction, immutableTsLock);
})
.collect(Collectors.toList());
openTransactionCounter.inc(transactions.size());
return transactions;
} catch (Throwable t) {
responses.forEach(response -> lockWatchManager.requestTransactionStateRemovalFromCache(
response.startTimestampAndPartition().timestamp()));
timelockService.tryUnlock(responses.stream()
// In case of failure, we need to remove the transaction from our local lock-watch cache and unlocking
// immutable timestamp.
// Note that in case we don't throw, the immutable timestamp is registered to be unlocked in the
// SnapshotTransaction constructor. But in case of failure, we need to manually unlock it here.

Closer closer = Closer.create(); // N.B. using closer to run all cleanup tasks even if one cleanup throws
responses.forEach(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for reviewers: Closer goes in stack order, so we're technically changing the order here. It shouldn't matter, these are unrelated.

response -> closer.register(() -> lockWatchManager.requestTransactionStateRemovalFromCache(
response.startTimestampAndPartition().timestamp())));
closer.register(() -> timelockService.tryUnlock(responses.stream()
.map(response -> response.immutableTimestamp().getLock())
.collect(Collectors.toSet()));
.collect(Collectors.toSet())));

try {
closer.close();
} catch (IOException e) {
t.addSuppressed(e);
fsamuel-bs marked this conversation as resolved.
Show resolved Hide resolved
log.info("Failed to cleanup startTransaction resources on startTransaction failure", t);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you're doing this again which makes me think my original comments are valuable.

}
throw Throwables.rewrapAndThrowUncheckedException(t);
}
}

private final class OpenTransactionImpl extends ForwardingTransaction implements OpenTransaction {
final class OpenTransactionImpl extends ForwardingTransaction implements OpenTransaction {

private final ExpectationsAwareTransaction delegate;
private final LockToken immutableTsLock;
private boolean hasClosed = false;

private OpenTransactionImpl(ExpectationsAwareTransaction delegate, LockToken immutableTsLock) {
this.delegate = delegate;
Expand All @@ -265,33 +305,29 @@ public ExpectationsAwareTransaction delegate() {
}

@Override
public <T, E extends Exception> T finish(TransactionTask<T, E> task)
throws E, TransactionFailedRetriableException {
return finishWithCallback(task, () -> {});
}

@Override
public <T, E extends Exception> T finishWithCallback(TransactionTask<T, E> task, Runnable callback)
throws E, TransactionFailedRetriableException {
Timer postTaskTimer = getTimer("finishTask");
Timer.Context postTaskContext;

TransactionTask<T, E> wrappedTask = wrapTaskIfNecessary(task, immutableTsLock);
public synchronized void close() {
if (hasClosed) {
// Some operations in close are not idempotent, e.g. openTransactionCounter.dec().
jkozlowski marked this conversation as resolved.
Show resolved Hide resolved
// Let's guarantee we run close() only once, and no-op in following runs to respect Closeable interface.
return;
}

ExpectationsAwareTransaction txn = delegate;
T result;
try {
txn.onCommitOrAbort(txn::reportExpectationsCollectedData);
txn.onCommitOrAbort(callback);
result = runTaskThrowOnConflict(wrappedTask, txn);
if (txn.isUncommitted()) {
txn.abort();
}
} finally {
lockWatchManager.requestTransactionStateRemovalFromCache(getTimestamp());
postTaskContext = postTaskTimer.time();
openTransactionCounter.dec();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if this was ALSO moved somewhere higher? to somewhere around the place where we increment this value?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would be the most correct thing

}
scrubForAggressiveHardDelete(extractSnapshotTransaction(txn));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, should ScrubForAggressiveHardDelete happen only on the happy path, or always, regardless of exceptions?

postTaskContext.stop();
return result;

hasClosed = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so IF you're making sure this is idempotent and e.g. openTransactionCounter.dec is not called multiple times and that's your intent, let's stick this in the finally?

}

<T, E extends Exception> T execute(TransactionTask<T, E> task) {
TransactionTask<T, E> wrappedTask = wrapTaskIfNecessary(task, immutableTsLock);
return runTimed(() -> runTaskThrowOnConflict(wrappedTask, delegate), "runTaskThrowOnConflict");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ public TransactionToken startTransaction() {
public void commit(TransactionToken token) {
OpenTransaction openTxn = transactions.getIfPresent(token);
if (openTxn != null) {
openTxn.finish((TxTask) transaction -> null);
try (openTxn) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you strictly need the try-with-resources? It doesn't feel like you would?

openTxn.commit();
}
transactions.invalidate(token);
}
}
Expand All @@ -201,10 +203,9 @@ public void commit(TransactionToken token) {
public void abort(TransactionToken token) {
OpenTransaction openTxn = transactions.getIfPresent(token);
if (openTxn != null) {
openTxn.finish((TxTask) transaction -> {
transaction.abort();
return null;
});
try (openTxn) {
openTxn.abort();
}
transactions.invalidate(token);
}
}
Expand Down
Loading