tasks ordered processing (draft) #252

Closed
wants to merge 1 commit into from

Conversation

amseager
Contributor

Plan to add the functionality described in #198

For tasks that need to be ordered, the main idea is to use a separate TransactionOutbox instance with shorter next-attempt and flush-job intervals, and optionally with submitNow disabled (so tasks are sent only by a "flush" job, which now has a new flushOrdered method).
Currently, tasks are ordered by the new createTime property within each group.
The intended usage is to set a value via groupId() on the transactionOutbox.with() builder.
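
To make the createTime ordering concrete, here is a minimal in-memory sketch of "pick the oldest unprocessed task in each group". OutboxRow and OrderedSelection are hypothetical names used only for illustration; they are not types from this PR, and the real selection would presumably be done in SQL by the persistor. Records need Java 16 or later.

```java
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for an outbox row; not the library's TransactionOutboxEntry.
record OutboxRow(String id, String groupId, Instant createTime, boolean processed) {}

final class OrderedSelection {
  // Returns, for each group, the oldest unprocessed row.
  // Rows without a groupId are skipped (they belong to the unordered flush).
  static Map<String, OutboxRow> firstPerGroup(List<OutboxRow> rows) {
    Map<String, OutboxRow> first = new TreeMap<>();
    for (OutboxRow row : rows) {
      if (row.groupId() == null || row.processed()) {
        continue;
      }
      // Keep whichever row has the earlier createTime; on a tie the first one seen wins.
      first.merge(
          row.groupId(),
          row,
          (a, b) -> a.createTime().compareTo(b.createTime()) <= 0 ? a : b);
    }
    return first;
  }
}
```

Because only one row per group is returned per call, the next task in a group becomes eligible only after the previous one is marked processed, which is why short flush intervals matter for this approach.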

Would like to get some comments about the approach if you don't mind.
Ofc I'll add docs/tests for it later (decided not to do it until a superficial review).

@amseager
Contributor Author

There are some open questions from the original issue that are not covered atm:

  • ordering by any other property (like in priority queue) - don't know if it's really needed tbh
  • selecting all tasks from a group in a single job iteration but processing them in order - could be designed and made in the future, but currently, we can just use short job intervals to achieve almost the same

@amseager amseager marked this pull request as draft February 18, 2022 14:10
@amseager amseager marked this pull request as ready for review February 23, 2022 13:04
@badgerwithagun
Member

Is this awaiting review @amseager? It still says "draft" in the title :)

@amseager
Contributor Author

@badgerwithagun sorry, probably pressed "ready to review" accidentally

At the same time, could you please make a quick review of the chosen approach? The production code is more or less ready atm, but I'm still not sure if it's ok conceptually

@amseager amseager marked this pull request as draft February 25, 2022 18:52
new Migration(
9,
"Add createTime column to outbox",
"ALTER TABLE TXNO_OUTBOX ADD COLUMN createTime TIMESTAMP(6) NULL AFTER invocation",
Member

Timestamps aren't guaranteed 100% reliable for ordering; it's theoretically possible to write two records at exactly the same time.

I guess the question is whether that's of particular concern. We are watching this PR with interest here, since we may have a use for it, but our use case actually doesn't care about ordering at all; we're looking to use it to throttle processing to one task per group id at a time as a way of "rationing" our batch processing capability between customers.

Contributor Author

I don't need to have that "super-precise" reliability in my project either, but I think it may be useful for others.

I'd probably go with a single sequence (or auto-incremented column) for a counter but use it only in tandem with the createTime field in order to mitigate a potential "max-value to min-value" problem almost completely.
I need to brush up on MySQL 5 capabilities for all of that, though.
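
A rough sketch of the "counter in tandem with createTime" idea, under the assumption that the counter is used purely as a tie-breaker: rows are ordered by createTime first, and the sequence value only decides between rows with identical timestamps. SequencedRow, seq and TaskOrdering are hypothetical names, not part of this PR.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical row shape: "seq" stands for a database sequence or auto-increment value.
record SequencedRow(String id, Instant createTime, long seq) {}

final class TaskOrdering {
  // createTime decides the order; seq only breaks ties between identical timestamps,
  // so a counter wrap-around can only affect rows that share the same timestamp.
  static final Comparator<SequencedRow> BY_TIME_THEN_SEQ =
      Comparator.comparing(SequencedRow::createTime)
          .thenComparingLong(SequencedRow::seq);

  // Returns a sorted copy and leaves the input list untouched.
  static List<SequencedRow> sorted(List<SequencedRow> rows) {
    List<SequencedRow> copy = new ArrayList<>(rows);
    copy.sort(BY_TIME_THEN_SEQ);
    return copy;
  }
}
```

In SQL terms this would correspond to something like ORDER BY createTime, seq, with the exact column name being an assumption.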

Member

I've used a special sequence table for this in the past, to mimic the flexibility of Oracle sequences.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amseager With the potential "max-value to min-value" problem, are you referring to a sequence wrapping around from its maximum value to its minimum value?

"ALTER TABLE TXNO_OUTBOX ADD COLUMN createTime TIMESTAMP(6)")),
new Migration(
10,
"Add groupId column to outbox",
Member

I wonder whether partitionId or topicId might be more obvious terms for what this is doing?

topicId probably being my favourite.

Contributor Author

Sure, no problem, although these two names mostly fit Kafka/MQs. There's a 99.9% chance that one of those is used with the outbox pattern, but it's theoretically possible to do something else instead of sending messages (e.g. REST calls).

Member

Yeah, I don't mean to imply that groupIds are an SQS queue or Kafka topic, only that they are an analog of these, so there's probably no point adding another term to the mix.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Message group" is the term AWS SNS/SQS FIFO uses for the boundary within which ordering is ensured (within a queue or topic), so the "group" term makes sense at least for those users. For Kafka users, "partition" would be the equivalent (once again within a topic). So either group or partition seems a reasonable choice.

try (PreparedStatement stmt =
tx.connection()
.prepareStatement(
dialect.isSupportsWindowFunctions()
Member

Strictly, supports window functions and CTEs. Worth having two flags here?

Member

Or just ditch the CTE and replace it with a subquery...

Contributor Author

Probably yes. It just so happens that the 3 RDBMSs used here that support window functions also support CTEs (I think the latter were added to the SQL standard before the former).
About a subquery: IIRC I had some problems with that here (hence the move to a CTE), but I'll try it again.
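
For reference, a sketch of what the two query shapes could look like if the CTE were replaced by a derived table. The table name and the groupId, createTime and processed columns come from the migrations and discussion in this review; the id column, the boolean literal syntax and the FirstPerGroupSql class are assumptions, and neither query has been run against any of the supported databases.

```java
// Sketch only: builds a "first unprocessed task per group" query in one of two shapes.
final class FirstPerGroupSql {
  static String build(boolean supportsWindowFunctions) {
    if (supportsWindowFunctions) {
      // Derived table instead of a CTE, so only window-function support is required.
      return "SELECT id FROM ("
          + " SELECT id, ROW_NUMBER() OVER (PARTITION BY groupId ORDER BY createTime) AS rn"
          + " FROM TXNO_OUTBOX WHERE processed = false AND groupId IS NOT NULL"
          + ") t WHERE rn = 1";
    }
    // Fallback: correlated subquery that picks the minimum createTime in each group.
    // Unlike ROW_NUMBER(), this can return several rows per group when timestamps tie.
    return "SELECT o.id FROM TXNO_OUTBOX o"
        + " WHERE o.processed = false AND o.groupId IS NOT NULL"
        + " AND o.createTime = ("
        + " SELECT MIN(i.createTime) FROM TXNO_OUTBOX i"
        + " WHERE i.groupId = o.groupId AND i.processed = false)";
  }
}
```

With this shape a single "supports window functions" flag would be enough, at the cost of the fallback being sensitive to timestamp ties.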

tx.connection()
.prepareStatement(
dialect.isSupportsWindowFunctions()
? "WITH t AS"
Member

Have you done an EXPLAIN on these queries on the different DBs and determined if we have suitable indexes?

I have a feeling that not having groupId in any index is going to make this pretty slow.

Contributor Author

I kind of disagree about groupId, because here we fetch all groups at once, so the cardinality of this column is effectively 50/50 (filled or not), and I'm not sure an RDBMS would ever use that index, even after collecting proper statistics.

At the same time, I think we can add an index on processed (it's technically 50/50 too, but obviously we'll always have far fewer unprocessed rows than processed ones when selecting, and hints/statistics can help here too).
We actually already have a composite index on processed, blacklisted, nextAttemptTime, which should also work for a single processed column in a WHERE clause (at least in PG; I need to check it in the other supported DBs).

@@ -88,6 +88,8 @@ static TransactionOutboxBuilder builder() {
@SuppressWarnings("UnusedReturnValue")
Member

Javadoc needs updating to reflect that grouped records are ignored and these should be handled by flushOrdered

Contributor Author

Yeah, they will be added (I've already written some of them but haven't pushed yet)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amseager Would you be able to push those changes?

@@ -88,6 +88,8 @@ static TransactionOutboxBuilder builder() {
@SuppressWarnings("UnusedReturnValue")
boolean flush();

boolean flushOrdered();
Member

flushTopics() (if you agree with me on the naming)

Member

Also needs Javadoc

* Defaults to true.
* @return Builder.
*/
public TransactionOutboxBuilder submitImmediately(boolean submitImmediately) {
Member

At the moment, you can do groupId(...) without submitImmediately(...) and it looks like the behaviour is unexpected; because the task is submitted immediately, it ignores ordering.

Can we set submitImmediately to false automatically and refuse to allow submitImmediately(true) in combination with a groupId?

Better still: is this option required at all? I can't see that it adds anything other than a delay, if you're not using a groupId. So why not just remove it?

Contributor Author

right, added it by analogy to other settings initially but now I see that it's mostly useless

@markushc markushc Aug 23, 2023

The submit immediately option might not be required to implement ordered processing. However, it could be useful if you want to separate transaction-processing compute from task-processing compute, i.e. only process tasks using flush from a separate JVM. This would also make the transaction-outbox project more suitable for use with serverless compute like AWS Lambda functions, where tasks cannot be processed in background threads.

log.debug("Flushing stale tasks");
var batch =
transactionManager.inTransactionReturns(
transaction -> {
List<TransactionOutboxEntry> result = new ArrayList<>(flushBatchSize);
uncheckedly(() -> persistor.selectBatch(transaction, flushBatchSize, now))
.forEach(
uncheckedly(() -> isOrdered ?
Member

@badgerwithagun badgerwithagun Feb 27, 2022

The tasks will run in parallel at the moment, even if you submit them in order.

I think I'd use CompletableFuture chaining to put each task on the end of a future chain for the groupId. You just need to hold onto the last CompletableFuture you submitted for each groupId, until the task completes:

ConcurrentMap<String, CompletableFuture<Void>> tasks;

Each task can remove itself from the map when it completes, so it doesn't grow.

Contributor Author

@amseager amseager Feb 28, 2022

I didn't quite get the concurrency problem tbh, could you elaborate pls?
Here, only the 1st task per group is selected in a single flush iteration, and it seems it should be impossible to select the 2nd one before the 1st completes because of the processed condition (but maybe I'm missing something).
So I thought the problem could occur only in a multi-instance environment, which could be solved the same way as in the existing (batched) select (with locks etc.)

About future chains - is it for a solution when we select all tasks of each group at one call instead? I'm going to experiment with this approach.

Member

Yes, so the idea is that you select all tasks for all groups, but queue up each group separately, something like this:

// One future chain per groupId; each new task is appended to the tail of its group's chain.
ConcurrentMap<String, CompletableFuture<Void>> tasks = new ConcurrentHashMap<>();
...
persistor.selectBatchOrdered(transaction, flushBatchSize, now)
  .forEach(entry ->
    tasks.compute(
      entry.groupId(),
      (groupId, future) -> future == null
        ? CompletableFuture.runAsync(() -> processNow(entry), executor)
        : future.thenRunAsync(() -> processNow(entry), executor)));

This ignores the presence of Submitter, which complicates things slightly, but it demonstrates my thinking.

The result of this is a chain of CompletableFutures for each group, resulting in each task in a group running after the last task in the group completes.
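
The chaining idea, including the "each task removes itself from the map when it completes" detail mentioned earlier in this thread, can be sketched as a small self-contained class. GroupSerializer, submit and activeGroups are hypothetical names for illustration only; this is not code from the PR and it leaves out Submitter, retries and persistence entirely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

// Runs tasks of the same group one after another, while different groups run in parallel.
final class GroupSerializer {
  // Holds the most recently submitted future (the "tail") for each active group.
  private final ConcurrentMap<String, CompletableFuture<Void>> tails =
      new ConcurrentHashMap<>();
  private final Executor executor;

  // Assumes an executor that runs tasks on other threads, not on the caller's thread.
  GroupSerializer(Executor executor) {
    this.executor = executor;
  }

  CompletableFuture<Void> submit(String groupId, Runnable task) {
    // compute() is atomic per key, so appending to the chain cannot race with removal.
    CompletableFuture<Void> next =
        tails.compute(
            groupId,
            (id, tail) ->
                tail == null
                    ? CompletableFuture.runAsync(task, executor)
                    : tail.thenRunAsync(task, executor));
    // Two-argument remove: drop the entry only if this future is still the tail,
    // so the map does not grow but a longer chain is never cut off.
    return next.whenComplete((result, error) -> tails.remove(groupId, next));
  }

  int activeGroups() {
    return tails.size();
  }
}
```

One design consequence worth noting: with thenRunAsync, if a task fails, the tasks already chained behind it complete exceptionally without running. That preserves ordering, but something else (for example the next flush) would have to pick those tasks up again.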

@@ -111,23 +114,34 @@ public ParameterizedScheduleBuilder with() {
@SuppressWarnings("UnusedReturnValue")
@Override
public boolean flush() {
return flush(false);
Member

Why not just do both flush types in the one call to flush()?

We just want to clear out all work. Why complicate it?

Contributor Author

The idea was to use separate TransactionOutbox instances for flushing ordered and non-ordered tasks, because the scheduler params, like the delay between iterations (and the flush interval itself), are planned to be much shorter in the 1st case than in the 2nd. What do you think about this?

Member

Interesting. I'll ponder.

Member

How about:

  • flush() - does everything
  • flushUnordered()
  • flushAllTopics()
  • flushTopics(String...)

At least this way flush() continues to do everything, so if someone doesn't change their existing scheduled jobs, everything continues to work. If someone wants more granularity, they have it.
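
As a rough illustration of that proposal, the four methods could hang together like this, with flush() as a default method that delegates to the two finer-grained calls. TopicAwareFlusher is a hypothetical interface, not the library's TransactionOutbox, and the meaning of the boolean result (true if any work was flushed) is an assumption.

```java
// Hypothetical interface mirroring the four proposed methods.
interface TopicAwareFlusher {
  // Flushes tasks that have no topic/group.
  boolean flushUnordered();

  // Flushes ordered tasks for every topic.
  boolean flushAllTopics();

  // Flushes ordered tasks for the named topics only.
  boolean flushTopics(String... topics);

  // Does everything, so existing scheduled jobs that call flush() keep working.
  // The non-short-circuit | makes sure both flushes run even if the first returns true.
  default boolean flush() {
    return flushUnordered() | flushAllTopics();
  }
}
```

Callers that want different schedules for ordered and unordered work could then call flushUnordered() and flushAllTopics() from separate jobs instead of flush().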

listener.scheduled(entry);
submitNow(entry);
});
if (submitImmediately) {
Member

if (groupId != null)?

Contributor Author

discussed above, I'll remove submitImmediately and replace it with a groupId != null check

Member

@badgerwithagun badgerwithagun left a comment

.

@amseager
Contributor Author

Many thanks for the review, I'll come back later after fixing all of that

@markushc

@amseager Do you still have local changes that you want to push? Would be great to get this pull request finished and merged.

@badgerwithagun
Member

Fixed by #588

3 participants