-
Notifications
You must be signed in to change notification settings - Fork 15
Refactor OpenTransaction
interface
#7411
base: develop
Are you sure you want to change the base?
Changes from 16 commits
2fd9a42
8b063af
db07e66
9be7eff
85f78cb
f306d38
3160aba
187843a
6406cc8
498a416
2200b2f
0e2508f
5561a1a
22fafcb
9f73895
21006b7
c34afc1
8d5df25
d8f3276
648461b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,27 +16,14 @@ | |
|
||
package com.palantir.atlasdb.transaction.api; | ||
|
||
import com.palantir.atlasdb.metrics.Timed; | ||
|
||
public interface OpenTransaction extends Transaction { | ||
|
||
/** | ||
* Runs a provided task, commits the transaction, and performs cleanup. If no further work needs to be done with the | ||
* transaction, a no-op task can be passed in. | ||
* | ||
* @return value returned by the task | ||
*/ | ||
@Timed | ||
<T, E extends Exception> T finish(TransactionTask<T, E> task) throws E, TransactionFailedRetriableException; | ||
import java.io.Closeable; | ||
|
||
public interface OpenTransaction extends Transaction, Closeable { | ||
/** | ||
* Like {@link #finish(TransactionTask)}, except runs a callback after the task has finished. This callback will | ||
* not run while the transaction remains in an uncommitted state, but may run afterwards, regardless of whether | ||
* the commit was successful, finished, or aborted. | ||
* | ||
* If the callback is run, it will run before any {@link Transaction#onSuccess(Runnable)} callbacks. | ||
* Aborts the transaction if uncommitted and cleanups transaction state. | ||
* All open transactions <b>must be closed</b>. | ||
jkozlowski marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* Not closing transactions after they're no longer in use may lead to arbitrary delays elsewhere in the system. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "may lead to memory leaks and arbitrary delays..." |
||
*/ | ||
@Timed | ||
<T, E extends Exception> T finishWithCallback(TransactionTask<T, E> task, Runnable callback) | ||
throws E, TransactionFailedRetriableException; | ||
@Override | ||
void close(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -453,6 +453,10 @@ enum TransactionType { | |
/** | ||
* Allow consumers to register callbacks to be run on {@link #commit()} or {@link #abort()}, | ||
* after a transaction has committed or aborted. | ||
* <p> | ||
* {@link #onCommitOrAbort(Runnable)} callbacks are added in a stack and run in opposite order they were added to | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You already say this later?
|
||
* the stack. I.e. they're FILO (first-in-last-out). | ||
* <p> | ||
* {@link #onCommitOrAbort(Runnable)} callbacks run before {@link #onSuccess(Runnable)} callbacks. | ||
* <p> | ||
* Callbacks are usually cleanup tasks, e.g. {@link PreCommitCondition#cleanup()}. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -429,20 +429,29 @@ <T, C extends PreCommitCondition, E extends Exception> T runTaskWithConditionRea | |
void registerClosingCallback(Runnable closingCallback); | ||
|
||
/** | ||
* This method can be used for direct control over the lifecycle of a batch of transactions. For example, if the | ||
* work done in each given transaction is interactive and cannot be expressed as a {@link TransactionTask} ahead of | ||
* time, this method allows for a long lived transaction object. For any data read or written to the transaction to | ||
* be valid, the transaction must be committed by calling {@link OpenTransaction#finish(TransactionTask)} to | ||
* also perform additional cleanup. Note that this does not clean up the pre commit condition associated with that | ||
* task. The order of transactions returned corresponds with the pre commit conditions passed in, however there are | ||
* This method can be used for direct control over the lifecycle of a batch of transactions. | ||
* For example, if the work done in each given transaction is interactive and cannot be expressed as a | ||
* {@link TransactionTask} ahead of time, this method allows for a long lived transaction object. | ||
* <p> | ||
* For any data read or written to the transaction to be valid, the transaction must be committed explicitly | ||
* by the caller with {@link Transaction#commit()}. | ||
* <p> | ||
* {@link PreCommitCondition#cleanup()} associated with a transaction will be run when the respective transaction | ||
* commits or aborts. | ||
* All pre-commit conditions are also cleaned in case {@link #startTransactions(List)} throws. | ||
* <p> | ||
* Note the caller must call {@link OpenTransaction#close()} after the transaction is committed to perform | ||
* additional cleanup. Failure to do so might incur in | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you didn't finish a sentence here? |
||
* <p> | ||
* The order of transactions returned corresponds with the pre commit conditions passed in, however there are | ||
* no guarantees on the ordering of transactions returned with respect to their start timestamp. | ||
* | ||
* @return a batch of transactions with associated immutable timestamp locks | ||
* @deprecated Similar functionality will exist, but this method is likely to change in the future | ||
*/ | ||
@Deprecated | ||
@Timed | ||
List<OpenTransaction> startTransactions(List<? extends PreCommitCondition> condition); | ||
List<? extends OpenTransaction> startTransactions(List<? extends PreCommitCondition> condition); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For my edification: what do you get out of doing "? extends ", other than the callers now having to do the same thing? |
||
|
||
/** | ||
* Frees resources used by this TransactionManager, and invokes any callbacks registered to run on close. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
import com.google.common.collect.Iterables; | ||
import com.google.common.collect.Lists; | ||
import com.google.common.collect.Streams; | ||
import com.google.common.io.Closer; | ||
import com.palantir.atlasdb.cache.TimestampCache; | ||
import com.palantir.atlasdb.cell.api.DataKeyValueServiceManager; | ||
import com.palantir.atlasdb.cell.api.DdlManager; | ||
|
@@ -72,6 +73,7 @@ | |
import com.palantir.timestamp.TimestampManagementService; | ||
import com.palantir.timestamp.TimestampService; | ||
import com.palantir.util.SafeShutdownRunner; | ||
import java.io.IOException; | ||
import java.time.Duration; | ||
import java.util.List; | ||
import java.util.Optional; | ||
|
@@ -190,20 +192,30 @@ protected boolean shouldStopRetrying(int numTimesFailed) { | |
public <T, C extends PreCommitCondition, E extends Exception> T runTaskWithConditionThrowOnConflict( | ||
C condition, ConditionAwareTransactionTask<T, C, E> task) throws E, TransactionFailedRetriableException { | ||
checkOpen(); | ||
OpenTransaction openTransaction; | ||
try (OpenTransactionImpl openTransaction = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SO the way this code works is:
Meaning, we improve the flow here, without breaking any functionality. |
||
runTimed(() -> Iterables.getOnlyElement(startTransactions(ImmutableList.of(condition))), "setupTask")) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For interest, it was very funny to have a look at the implementation of List.of in the JDK, and weirdly, it has List12, which is kinda bizzareo. Guava defo does better here |
||
return openTransaction.execute(txn -> task.execute(txn, condition)); | ||
} | ||
} | ||
|
||
@Override | ||
public List<OpenTransactionImpl> startTransactions(List<? extends PreCommitCondition> conditions) { | ||
try { | ||
openTransaction = runTimed( | ||
() -> Iterables.getOnlyElement(startTransactions(ImmutableList.of(condition))), "setupTask"); | ||
return startTransactionsInternal(conditions); | ||
} catch (Exception e) { | ||
condition.cleanup(); | ||
Closer closer = Closer.create(); // N.B. using closer to run all cleanup tasks even if one cleanup throws | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yaaas, much better There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Although, I just started reading startTransactionsInternal, and my immediate thought was "shit we forgot to cleanup conditions when that throws exceptions... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if instead of this split thing, we try something like this:
As you acquire resources, you register things with it, so you'd immediately accumulate all of the conditions there. Then in #startTransactionsImpl, IFF you end up with an exception, you execute the closer. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Basically, this is SUCH critical code, that from a reader's perspective, EVERYTIME I see resource acquisition, on the next line I want it being registered with SOMETHING that's responsible for cleaning it up in case of exceptions. |
||
conditions.forEach(condition -> closer.register(condition::cleanup)); | ||
try { | ||
closer.close(); | ||
} catch (IOException ex) { | ||
e.addSuppressed(ex); | ||
fsamuel-bs marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we instead use your ThreadSafeCloser and add a method #closeAfterException(Throwable) or something that will do the logging and #addSuppressed? I'm also honestly not that fussed with #addSuppressed, it feels like noise. And specifically, in the internal skiing project, might lead to giant stacktraces (and just lots of logging?), because we use pretty large batches for #startTransactions? How about we log the individual failures somehow as separate loglines |
||
log.info("Failed to cleanup pre-commit conditions on startTransaction failure", ex); | ||
} | ||
throw e; | ||
} | ||
return openTransaction.finishWithCallback( | ||
transaction -> task.execute(transaction, condition), condition::cleanup); | ||
} | ||
|
||
@Override | ||
public List<OpenTransaction> startTransactions(List<? extends PreCommitCondition> conditions) { | ||
private List<OpenTransactionImpl> startTransactionsInternal(List<? extends PreCommitCondition> conditions) { | ||
if (conditions.isEmpty()) { | ||
return ImmutableList.of(); | ||
} | ||
|
@@ -224,35 +236,63 @@ public List<OpenTransaction> startTransactions(List<? extends PreCommitCondition | |
recordImmutableTimestamp(immutableTs); | ||
cleaner.punch(responses.get(0).startTimestampAndPartition().timestamp()); | ||
|
||
List<OpenTransaction> transactions = Streams.zip( | ||
List<OpenTransactionImpl> transactions = Streams.zip( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, something doesn't add up here if you're having to make this available and return the impl There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about, instead of having the #execute method on OpenTransactionImpl, just inline it here? Like you're really not gaining anything from this, but now EVERYTHING is polluted with these weird generics. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct me if there's a deeper reason |
||
responses.stream(), conditions.stream(), (response, condition) -> { | ||
LockToken immutableTsLock = | ||
response.immutableTimestamp().getLock(); | ||
LongSupplier startTimestampSupplier = response.startTimestampAndPartition()::timestamp; | ||
|
||
ExpectationsAwareTransaction transaction = createTransaction( | ||
immutableTs, startTimestampSupplier, immutableTsLock, condition); | ||
transaction.onSuccess( | ||
() -> lockWatchManager.onTransactionCommit(transaction.getTimestamp())); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, deep breaths before reviewing this... |
||
transaction.onCommitOrAbort(transaction::reportExpectationsCollectedData); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one: should this just be an internal transaction thing in SnapshotTransaction? Why is this even registered here, as opposed to much deeper? This definitely feels silly, it shouldn't even be on the interface. |
||
transaction.onCommitOrAbort(condition::cleanup); | ||
transaction.onCommitOrAbort( | ||
() -> lockWatchManager.requestTransactionStateRemovalFromCache( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alternative to this VERY CAREFUL ORDERING, is to actually delegate this to LockWatchManager honestly? It feels wrong that such CRITICAL piece of functionality has high code-distance from LockWatchManager itself. This is probably a pre-req for this PR merging? I'd support something like maybe:
|
||
response.startTimestampAndPartition() | ||
.timestamp())); | ||
// N.B. run this before clearing the state from the cache with | ||
// requestTransactionStateRemovalFromCache | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was quite nuanced:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also thought of writing
but decided not do this because I like guaranteeing both are run in case one fails. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also thought of adding Edit: decided to go down this route and added |
||
transaction.onCommitOrAbort(() -> { | ||
if (!transaction.isAborted()) { | ||
// in onCommitOrAbort + not aborted == committed | ||
lockWatchManager.onTransactionCommit(transaction.getTimestamp()); | ||
} | ||
}); | ||
return new OpenTransactionImpl(transaction, immutableTsLock); | ||
}) | ||
.collect(Collectors.toList()); | ||
openTransactionCounter.inc(transactions.size()); | ||
return transactions; | ||
} catch (Throwable t) { | ||
responses.forEach(response -> lockWatchManager.requestTransactionStateRemovalFromCache( | ||
response.startTimestampAndPartition().timestamp())); | ||
timelockService.tryUnlock(responses.stream() | ||
// In case of failure, we need to remove the transaction from our local lock-watch cache and unlocking | ||
// immutable timestamp. | ||
// Note that in case we don't throw, the immutable timestamp is registered to be unlocked in the | ||
// SnapshotTransaction constructor. But in case of failure, we need to manually unlock it here. | ||
|
||
Closer closer = Closer.create(); // N.B. using closer to run all cleanup tasks even if one cleanup throws | ||
responses.forEach( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note for reviewers: Closer goes in stack order, so we're technically changing the order here. It shouldn't matter, these are unrelated. |
||
response -> closer.register(() -> lockWatchManager.requestTransactionStateRemovalFromCache( | ||
response.startTimestampAndPartition().timestamp()))); | ||
closer.register(() -> timelockService.tryUnlock(responses.stream() | ||
.map(response -> response.immutableTimestamp().getLock()) | ||
.collect(Collectors.toSet())); | ||
.collect(Collectors.toSet()))); | ||
|
||
try { | ||
closer.close(); | ||
} catch (IOException e) { | ||
t.addSuppressed(e); | ||
fsamuel-bs marked this conversation as resolved.
Show resolved
Hide resolved
|
||
log.info("Failed to cleanup startTransaction resources on startTransaction failure", t); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, you're doing this again which makes me think my original comments are valuable. |
||
} | ||
throw Throwables.rewrapAndThrowUncheckedException(t); | ||
} | ||
} | ||
|
||
private final class OpenTransactionImpl extends ForwardingTransaction implements OpenTransaction { | ||
final class OpenTransactionImpl extends ForwardingTransaction implements OpenTransaction { | ||
|
||
private final ExpectationsAwareTransaction delegate; | ||
private final LockToken immutableTsLock; | ||
private boolean hasClosed = false; | ||
|
||
private OpenTransactionImpl(ExpectationsAwareTransaction delegate, LockToken immutableTsLock) { | ||
this.delegate = delegate; | ||
|
@@ -265,33 +305,29 @@ public ExpectationsAwareTransaction delegate() { | |
} | ||
|
||
@Override | ||
public <T, E extends Exception> T finish(TransactionTask<T, E> task) | ||
throws E, TransactionFailedRetriableException { | ||
return finishWithCallback(task, () -> {}); | ||
} | ||
|
||
@Override | ||
public <T, E extends Exception> T finishWithCallback(TransactionTask<T, E> task, Runnable callback) | ||
throws E, TransactionFailedRetriableException { | ||
Timer postTaskTimer = getTimer("finishTask"); | ||
Timer.Context postTaskContext; | ||
|
||
TransactionTask<T, E> wrappedTask = wrapTaskIfNecessary(task, immutableTsLock); | ||
public synchronized void close() { | ||
if (hasClosed) { | ||
// Some operations in close are not idempotent, e.g. openTransactionCounter.dec(). | ||
jkozlowski marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Let's guarantee we run close() only once, and no-op in following runs to respect Closeable interface. | ||
return; | ||
} | ||
|
||
ExpectationsAwareTransaction txn = delegate; | ||
T result; | ||
try { | ||
txn.onCommitOrAbort(txn::reportExpectationsCollectedData); | ||
txn.onCommitOrAbort(callback); | ||
result = runTaskThrowOnConflict(wrappedTask, txn); | ||
if (txn.isUncommitted()) { | ||
txn.abort(); | ||
} | ||
} finally { | ||
lockWatchManager.requestTransactionStateRemovalFromCache(getTimestamp()); | ||
postTaskContext = postTaskTimer.time(); | ||
openTransactionCounter.dec(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if this was ALSO moved somewhere higher? to somewhere around the place where we increment this value? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that would be the most correct thing |
||
} | ||
scrubForAggressiveHardDelete(extractSnapshotTransaction(txn)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similarly, should ScrubForAggressiveHardDelete happen only on the happy path, or always, regardless of exceptions? |
||
postTaskContext.stop(); | ||
return result; | ||
|
||
hasClosed = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so IF you're making sure this is idempotent and e.g. openTransactionCounter.dec is not called multiple times and that's your intent, let's stick this in the finally? |
||
} | ||
|
||
<T, E extends Exception> T execute(TransactionTask<T, E> task) { | ||
TransactionTask<T, E> wrappedTask = wrapTaskIfNecessary(task, immutableTsLock); | ||
return runTimed(() -> runTaskThrowOnConflict(wrappedTask, delegate), "runTaskThrowOnConflict"); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -192,7 +192,9 @@ public TransactionToken startTransaction() { | |
public void commit(TransactionToken token) { | ||
OpenTransaction openTxn = transactions.getIfPresent(token); | ||
if (openTxn != null) { | ||
openTxn.finish((TxTask) transaction -> null); | ||
try (openTxn) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you strictly need the try-with-resources? It doesn't feel like you would? |
||
openTxn.commit(); | ||
} | ||
transactions.invalidate(token); | ||
} | ||
} | ||
|
@@ -201,10 +203,9 @@ public void commit(TransactionToken token) { | |
public void abort(TransactionToken token) { | ||
OpenTransaction openTxn = transactions.getIfPresent(token); | ||
if (openTxn != null) { | ||
openTxn.finish((TxTask) transaction -> { | ||
transaction.abort(); | ||
return null; | ||
}); | ||
try (openTxn) { | ||
openTxn.abort(); | ||
} | ||
transactions.invalidate(token); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"and cleans up"?