-
Notifications
You must be signed in to change notification settings - Fork 40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
tasks ordered processing (draft) #252
Conversation
There are some open questions from the original issue that are not covered atm:
|
Is this awaiting review @amseager? It still says "draft" in the title :) |
@badgerwithagun sorry, probably pressed "ready to review" accidentally At the same time, could you please make a quick review of the chosen approach? The production code is more or less ready atm, but I'm still not sure if it's ok conceptually |
new Migration( | ||
9, | ||
"Add createTime column to outbox", | ||
"ALTER TABLE TXNO_OUTBOX ADD COLUMN createTime TIMESTAMP(6) NULL AFTER invocation", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Timestamps aren't guaranteed 100% reliable for ordering; it's theoretically possible to write two records at exactly the same time.
I guess the question is whether that's of particular concern. We are watching this PR with interest here, since we may have a use for it, but our use case actually doesn't care about ordering at all; we're looking to use it to throttle processing to one task per group id at a time as a way of "rationing" our batch processing capability between customers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't need to have that "super-precise" reliability in my project either, but I think it may be useful for others.
I'd probably go with a single sequence (or auto-incremented column) for a counter but use it only in tandem with the createTime
field in order to mitigate a potential "max-value to min-value" problem almost completely.
Need to brush up MySQL 5 capabilities for all of that though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've used a special sequence table for this in the past, to mimic the flexibility of oracle sequences.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@amseager With the potential "max-value to min-value" problem, are you referring to a sequence wrapping around from its maximum value to its minimum value?
"ALTER TABLE TXNO_OUTBOX ADD COLUMN createTime TIMESTAMP(6)")), | ||
new Migration( | ||
10, | ||
"Add groupId column to outbox", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether partitionId
or topicId
might be more obvious terms for what this is doing?
topicId
probably being my favourite.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, no problem, although these 2 names are fit for kafka/MQs mostly - 99.9% chance that some of that is used with the outbox pattern, but it's theoretically possible to do smth else instead of sending messages (e.g. rest calls)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I don't mean to imply that groupId
s are an SQS queue or Kafka topic, only that they are an analog of these, so there's probably no point adding another term to the mix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Message groups is the term AWS SNS/SQS FIFOs use for the boundary within which ordering is ensured (within a queue or topic), so the “group” term makes sense at least for those users. For Kafka users “partition” would be the equivalent (once again within a topic). So both group or partition seem to be reasonable choices.
try (PreparedStatement stmt = | ||
tx.connection() | ||
.prepareStatement( | ||
dialect.isSupportsWindowFunctions() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Strictly, supports window functions and CTEs. Worth having two flags here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or just ditch the CTE and replace it with a subquery...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably yes, it's just happened that the 3 RDBMS which are used here and support WF also support CTE (I think the latter appeared to the SQL standard prior to the former)
about a subquery - IIRC I had some problems with that here (hence moved to CTE) but I'll try to do it again
tx.connection() | ||
.prepareStatement( | ||
dialect.isSupportsWindowFunctions() | ||
? "WITH t AS" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you done an EXPLAIN
on these queries on the different DBs and determined if we have suitable indexes?
I have a feeling not having groupId
anywhere is going to make this pretty slow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I kind of disagree about groupId
because here we fetch all groups at once so the cardinality of this column is effectively 50/50 (filled or not), and I'm not sure will RDBMS ever use that index even after collecting proper statistics.
At the same time, I think we can add an index for processed
(it's technically 50/50 too but obviously we'll always have much fewer unprocessed rows that processed ones while selecting, and also hints/statistics can help here).
We actually have a composite index for processed, blacklisted, nextAttemptTime
already which should work for a single processed
column in a where
clause too (at least in PG, need to check it it other supported DBs)
@@ -88,6 +88,8 @@ static TransactionOutboxBuilder builder() { | |||
@SuppressWarnings("UnusedReturnValue") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Javadoc needs updating to reflect that grouped records are ignored and these should be handled by flushOrdered
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, they will be added (I've already written some of them but haven't pushed yet)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@amseager Would you be able to push those changes?
@@ -88,6 +88,8 @@ static TransactionOutboxBuilder builder() { | |||
@SuppressWarnings("UnusedReturnValue") | |||
boolean flush(); | |||
|
|||
boolean flushOrdered(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flushTopics()
(if you agree with me on the naming)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also needs Javadoc
* Defaults to true. | ||
* @return Builder. | ||
*/ | ||
public TransactionOutboxBuilder submitImmediately(boolean submitImmediately) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the moment, you can do groupId(...)
without submitImmediately(...)
and it looks like the behaviour is unexpected; because the task is submitted immediately, it ignores ordering.
Can we set submitImmediately
to true
automatically and refuse to allow submitImmediately(false)
in combination with a groupId
?
Better still: is this option required at all? I can't see that it adds anything other than a delay, if you're not using a groupId
. So why not just remove it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, added it by analogy to other settings initially but now I see that it's mostly useless
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The submit immediately option might not be required to implement ordered processing. However, it could be useful if you want to separate transaction-processing compute from task-processing compute, i.e. only process tasks using flush
from a separate JVM. This would also make the transaction-outbox project more suitable for use with serverless compute like AWS Lambda functions, where tasks cannot be processed in background threads.
log.debug("Flushing stale tasks"); | ||
var batch = | ||
transactionManager.inTransactionReturns( | ||
transaction -> { | ||
List<TransactionOutboxEntry> result = new ArrayList<>(flushBatchSize); | ||
uncheckedly(() -> persistor.selectBatch(transaction, flushBatchSize, now)) | ||
.forEach( | ||
uncheckedly(() -> isOrdered ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tasks will run in parallel at the moment, even if you submit them in order.
I think I'd use CompletableFuture
chaining to put each task on the end of a future chain for the groupId
. You just need to hold onto the last CompletableFuture
you submitted for each groupId
, until the task completes:
ConcurrentMap<String, CompletableFuture> tasks;
Each task can remove itself from the map when it completes, so it doesn't grow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't get about the concurrency problem tbh, could you elaborate pls?
Here, we have only 1st task per group selected in a single flush
iteration, and it seems it should be impossible to select the 2nd one before completing the 1st because of processed
condition (but probably I don't see smth).
So I thought the problem can occur only in a multi-instance environment which could be solved as it did in the existing (batched) select (with locks etc.)
About future chains - is it for a solution when we select all tasks of each group at one call instead? I'm going to experiment with this approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, so the idea is that you select all tasks for all groups, but queue up each group separately, something like this:
ConcurrentMap<String, CompletableFuture<Void>> tasks = new ConcurrentMap<>();
...
persistor.selectBatchOrdered(transaction, flushBatchSize, now)
.forEach(entry ->
tasks.compute(
entry.groupId(),
(groupId, future) -> future == null
? CompletableFuture.runAsync(() -> processNow(entry), executor)
: future.thenRunAsync(() -> processNow(entry), executor));
This ignores the presence of Submitter
, which complicates things slightly, but it demonstrates my thinking/
The result of this is a chain of CompletableFuture
s for each group, resulting in each task in a group running after the last task in the group completes.
@@ -111,23 +114,34 @@ public ParameterizedScheduleBuilder with() { | |||
@SuppressWarnings("UnusedReturnValue") | |||
@Override | |||
public boolean flush() { | |||
return flush(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just do both flush types in the one call to flush()
?
We just want to clear out all work. Why complicate it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea was to use separate TransactionOutbox
instances for flushing ordered and non-ordered tasks because the scheduler params like a delay between iterations etc. (and flush frequency itself) are planned to be much fewer in the 1st case than in the 2nd. What do you think about this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting. I'll ponder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about:
flush()
- does everythingflushUnordered()
flushAllTopics()
flushTopics(String...)
At least this way flush()
continues to do everything, so if someone doesn't change their existing scheduled jobs, everything continues to work. If someone wants more granularity, they have it.
listener.scheduled(entry); | ||
submitNow(entry); | ||
}); | ||
if (submitImmediately) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (groupId != null)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discussed above, I'll remove submitImmediately
and replace it with a groupId != null
check
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.
Many thanks for the review, I'll come back later after fixing all of that |
@amseager Do you still have local changes that you want to push? Would be great to get this pull request finished and merged. |
Fixed by #588 |
Plan to add the functionality described in #198
For tasks that need to be ordered, the main idea is to use a separate
TransactionOutbox
instance with a shorter next attempt and job flush intervals, and optionally with disabledsubmitNow
functionality (so tasks are being sent only by a "flush"-job which now has a newflushOrdered
method).Currently, ordering is happened by the new
createTime
property in each group.It's intended to set some value in
groupId()
oftransactionOutbox.with()
builder.Would like to get some comments about the approach if you don't mind.
Ofc I'll add docs/tests for it later (decided not to do it until a superficial review).