-
Notifications
You must be signed in to change notification settings - Fork 40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
tasks ordered processing (draft) #252
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,7 +72,18 @@ class DefaultMigrationManager { | |
8, | ||
"Update length of invocation column on outbox for MySQL dialects only.", | ||
"ALTER TABLE TXNO_OUTBOX MODIFY COLUMN invocation MEDIUMTEXT", | ||
Map.of(Dialect.POSTGRESQL_9, "", Dialect.H2, ""))); | ||
Map.of(Dialect.POSTGRESQL_9, "", Dialect.H2, "")), | ||
new Migration( | ||
9, | ||
"Add createTime column to outbox", | ||
"ALTER TABLE TXNO_OUTBOX ADD COLUMN createTime TIMESTAMP(6) NULL AFTER invocation", | ||
Map.of( | ||
Dialect.POSTGRESQL_9, | ||
"ALTER TABLE TXNO_OUTBOX ADD COLUMN createTime TIMESTAMP(6)")), | ||
new Migration( | ||
10, | ||
"Add groupId column to outbox", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder whether
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure, no problem, although these 2 names are fit for kafka/MQs mostly - 99.9% chance that some of that is used with the outbox pattern, but it's theoretically possible to do smth else instead of sending messages (e.g. rest calls) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I don't mean to imply that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Message groups is the term AWS SNS/SQS FIFOs use for the boundary within which ordering is ensured (within a queue or topic), so the “group” term makes sense at least for those users. For Kafka users “partition” would be the equivalent (once again within a topic). So both group or partition seem to be reasonable choices. |
||
"ALTER TABLE TXNO_OUTBOX ADD COLUMN groupId VARCHAR(250)")); | ||
|
||
static void migrate(TransactionManager transactionManager, @NotNull Dialect dialect) { | ||
transactionManager.inTransaction( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,7 +37,8 @@ | |
public class DefaultPersistor implements Persistor { | ||
|
||
private static final String ALL_FIELDS = | ||
"id, uniqueRequestId, invocation, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version"; | ||
"id, uniqueRequestId, invocation, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version, " | ||
+ "createTime, groupId"; | ||
|
||
/** | ||
* @param writeLockTimeoutSeconds How many seconds to wait before timing out on obtaining a write | ||
|
@@ -93,7 +94,7 @@ public void migrate(TransactionManager transactionManager) { | |
public void save(Transaction tx, TransactionOutboxEntry entry) | ||
throws SQLException, AlreadyScheduledException { | ||
var insertSql = | ||
"INSERT INTO " + tableName + " (" + ALL_FIELDS + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; | ||
"INSERT INTO " + tableName + " (" + ALL_FIELDS + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; | ||
var writer = new StringWriter(); | ||
serializer.serializeInvocation(entry.getInvocation(), writer); | ||
if (entry.getUniqueRequestId() == null) { | ||
|
@@ -133,6 +134,9 @@ private void setupInsert( | |
stmt.setBoolean(7, entry.isBlocked()); | ||
stmt.setBoolean(8, entry.isProcessed()); | ||
stmt.setInt(9, entry.getVersion()); | ||
stmt.setTimestamp( | ||
10, entry.getCreateTime() == null ? null : Timestamp.from(entry.getCreateTime())); | ||
stmt.setString(11, entry.getGroupId()); | ||
} | ||
|
||
@Override | ||
|
@@ -241,14 +245,53 @@ public List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, I | |
+ ALL_FIELDS | ||
+ " FROM " | ||
+ tableName | ||
+ " WHERE nextAttemptTime < ? AND blocked = false AND processed = false LIMIT ?" | ||
+ " WHERE nextAttemptTime < ? AND blocked = false AND processed = false and groupid is null LIMIT ?" | ||
+ forUpdate)) { | ||
stmt.setTimestamp(1, Timestamp.from(now)); | ||
stmt.setInt(2, batchSize); | ||
return gatherResults(batchSize, stmt); | ||
} | ||
} | ||
|
||
@Override | ||
public List<TransactionOutboxEntry> selectBatchOrdered( | ||
final Transaction tx, final int batchSize, final Instant now) throws Exception { | ||
String forUpdate = dialect.isSupportsSkipLock() ? " FOR UPDATE SKIP LOCKED" : ""; | ||
try (PreparedStatement stmt = | ||
tx.connection() | ||
.prepareStatement( | ||
dialect.isSupportsWindowFunctions() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Strictly, supports window functions and CTEs. Worth having two flags here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or just ditch the CTE and replace it with a subquery... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. probably yes, it's just happened that the 3 RDBMS which are used here and support WF also support CTE (I think the latter appeared to the SQL standard prior to the former) |
||
? "WITH t AS" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have you done an I have a feeling not having There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I kind of disagree about At the same time, I think we can add an index for |
||
+ " (" | ||
+ " SELECT rank() over (PARTITION BY groupid ORDER BY createtime) AS rn, " + ALL_FIELDS | ||
+ " FROM " + tableName | ||
+ " WHERE processed = false" | ||
+ " ) " | ||
+ " SELECT " + ALL_FIELDS | ||
+ " FROM t " | ||
+ " WHERE rn = 1 AND nextAttemptTime < ? AND blocked = false AND groupid IS NOT null LIMIT ?" | ||
+ forUpdate | ||
: | ||
"SELECT " | ||
+ ALL_FIELDS | ||
+ " FROM" | ||
+ " (" | ||
+ " SELECT groupid AS group_id, MIN(createtime) AS min_create_time" | ||
+ " FROM " + tableName | ||
+ " WHERE processed = false" | ||
+ " GROUP BY group_id" | ||
+ " ) AS t" | ||
+ " JOIN " + tableName + " t1" | ||
+ " ON t1.groupid = t.group_id AND t1.createtime = t.min_create_time" | ||
+ " WHERE nextAttemptTime < ? AND blocked = false AND groupid IS NOT null LIMIT ?" | ||
+ forUpdate | ||
)) { | ||
stmt.setTimestamp(1, Timestamp.from(now)); | ||
stmt.setInt(2, batchSize); | ||
return gatherResults(batchSize, stmt); | ||
} | ||
} | ||
|
||
@Override | ||
public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now) | ||
throws Exception { | ||
|
@@ -289,6 +332,11 @@ private TransactionOutboxEntry map(ResultSet rs) throws SQLException, IOExceptio | |
.blocked(rs.getBoolean("blocked")) | ||
.processed(rs.getBoolean("processed")) | ||
.version(rs.getInt("version")) | ||
.createTime( | ||
rs.getTimestamp("createTime") == null | ||
? null | ||
: rs.getTimestamp("createTime").toInstant()) | ||
.groupId(rs.getString("groupId")) | ||
.build(); | ||
log.debug("Found {}", entry); | ||
return entry; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -88,6 +88,8 @@ static TransactionOutboxBuilder builder() { | |
@SuppressWarnings("UnusedReturnValue") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Javadoc needs updating to reflect that grouped records are ignored and these should be handled by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, they will be added (I've already written some of them but haven't pushed yet) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @amseager Would you be able to push those changes? |
||
boolean flush(); | ||
|
||
boolean flushOrdered(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also needs Javadoc |
||
|
||
/** | ||
* Unblocks a blocked entry and resets the attempt count so that it will be retried again. | ||
* Requires an active transaction and a transaction manager that supports thread local context. | ||
|
@@ -138,6 +140,7 @@ abstract class TransactionOutboxBuilder { | |
protected Boolean serializeMdc; | ||
protected Duration retentionThreshold; | ||
protected Boolean initializeImmediately; | ||
protected Boolean submitImmediately; | ||
|
||
protected TransactionOutboxBuilder() {} | ||
|
||
|
@@ -284,6 +287,16 @@ public TransactionOutboxBuilder initializeImmediately(boolean initializeImmediat | |
return this; | ||
} | ||
|
||
/** | ||
* @param submitImmediately If true, a task will be submitted immediately after scheduling. | ||
* Defaults to true. | ||
* @return Builder. | ||
*/ | ||
public TransactionOutboxBuilder submitImmediately(boolean submitImmediately) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At the moment, you can do Can we set Better still: is this option required at all? I can't see that it adds anything other than a delay, if you're not using a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right, added it by analogy to other settings initially but now I see that it's mostly useless There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The submit immediately option might not be required to implement ordered processing. However, it could be useful if you want to separate transaction-processing compute from task-processing compute, i.e. only process tasks using |
||
this.submitImmediately = submitImmediately; | ||
return this; | ||
} | ||
|
||
/** | ||
* Creates and initialises the {@link TransactionOutbox}. | ||
* | ||
|
@@ -310,6 +323,10 @@ interface ParameterizedScheduleBuilder { | |
*/ | ||
ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId); | ||
|
||
default ParameterizedScheduleBuilder groupId(String groupId) { | ||
return this; | ||
} | ||
|
||
/** | ||
* Equivalent to {@link TransactionOutbox#schedule(Class)}, but applying additional parameters | ||
* to the request as configured using {@link TransactionOutbox#with()}. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,6 +46,7 @@ class TransactionOutboxImpl implements TransactionOutbox { | |
private final boolean serializeMdc; | ||
private final Validator validator; | ||
@NotNull private final Duration retentionThreshold; | ||
@NotNull private final boolean submitImmediately; | ||
private final AtomicBoolean initialized = new AtomicBoolean(); | ||
private final ProxyFactory proxyFactory = new ProxyFactory(); | ||
|
||
|
@@ -62,7 +63,8 @@ private TransactionOutboxImpl( | |
Level logLevelTemporaryFailure, | ||
Boolean serializeMdc, | ||
Duration retentionThreshold, | ||
Boolean initializeImmediately) { | ||
Boolean initializeImmediately, | ||
Boolean submitImmediately) { | ||
this.transactionManager = transactionManager; | ||
this.instantiator = Utils.firstNonNull(instantiator, Instantiator::usingReflection); | ||
this.persistor = persistor; | ||
|
@@ -76,6 +78,7 @@ private TransactionOutboxImpl( | |
this.validator = new Validator(this.clockProvider); | ||
this.serializeMdc = serializeMdc == null || serializeMdc; | ||
this.retentionThreshold = retentionThreshold == null ? Duration.ofDays(7) : retentionThreshold; | ||
this.submitImmediately = submitImmediately == null || submitImmediately; | ||
this.validator.validate(this); | ||
if (initializeImmediately == null || initializeImmediately) { | ||
initialize(); | ||
|
@@ -100,7 +103,7 @@ public void initialize() { | |
|
||
@Override | ||
public <T> T schedule(Class<T> clazz) { | ||
return schedule(clazz, null); | ||
return schedule(clazz, null, null); | ||
} | ||
|
||
@Override | ||
|
@@ -111,23 +114,34 @@ public ParameterizedScheduleBuilder with() { | |
@SuppressWarnings("UnusedReturnValue") | ||
@Override | ||
public boolean flush() { | ||
return flush(false); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not just do both flush types in the one call to We just want to clear out all work. Why complicate it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea was to use separate There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interesting. I'll ponder. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about:
At least this way |
||
} | ||
|
||
@Override | ||
public boolean flushOrdered() { | ||
return flush(true); | ||
} | ||
|
||
private boolean flush(boolean isOrdered) { | ||
if (!initialized.get()) { | ||
throw new IllegalStateException("Not initialized"); | ||
} | ||
Instant now = clockProvider.getClock().instant(); | ||
List<TransactionOutboxEntry> batch = flush(now); | ||
List<TransactionOutboxEntry> batch = flush(now, isOrdered); | ||
expireIdempotencyProtection(now); | ||
return !batch.isEmpty(); | ||
} | ||
|
||
private List<TransactionOutboxEntry> flush(Instant now) { | ||
private List<TransactionOutboxEntry> flush(Instant now, boolean isOrdered) { | ||
log.debug("Flushing stale tasks"); | ||
var batch = | ||
transactionManager.inTransactionReturns( | ||
transaction -> { | ||
List<TransactionOutboxEntry> result = new ArrayList<>(flushBatchSize); | ||
uncheckedly(() -> persistor.selectBatch(transaction, flushBatchSize, now)) | ||
.forEach( | ||
uncheckedly(() -> isOrdered ? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The tasks will run in parallel at the moment, even if you submit them in order. I think I'd use
Each task can remove itself from the map when it completes, so it doesn't grow. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Didn't get about the concurrency problem tbh, could you elaborate pls? About future chains - is it for a solution when we select all tasks of each group at one call instead? I'm going to experiment with this approach. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, so the idea is that you select all tasks for all groups, but queue up each group separately, something like this:
This ignores the presence of The result of this is a chain of |
||
persistor.selectBatchOrdered(transaction, flushBatchSize, now) : | ||
persistor.selectBatch(transaction, flushBatchSize, now) | ||
).forEach( | ||
entry -> { | ||
log.debug("Reprocessing {}", entry.description()); | ||
try { | ||
|
@@ -214,7 +228,7 @@ public boolean unblock(String entryId, Object transactionContext) { | |
} | ||
} | ||
|
||
private <T> T schedule(Class<T> clazz, String uniqueRequestId) { | ||
private <T> T schedule(Class<T> clazz, String uniqueRequestId, String groupId) { | ||
if (!initialized.get()) { | ||
throw new IllegalStateException("Not initialized"); | ||
} | ||
|
@@ -230,16 +244,19 @@ private <T> T schedule(Class<T> clazz, String uniqueRequestId) { | |
extracted.getMethodName(), | ||
extracted.getParameters(), | ||
extracted.getArgs(), | ||
uniqueRequestId); | ||
uniqueRequestId, | ||
groupId); | ||
validator.validate(entry); | ||
persistor.save(extracted.getTransaction(), entry); | ||
extracted | ||
.getTransaction() | ||
.addPostCommitHook( | ||
() -> { | ||
listener.scheduled(entry); | ||
submitNow(entry); | ||
}); | ||
if (submitImmediately) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. discussed above, I'll remove |
||
extracted | ||
.getTransaction() | ||
.addPostCommitHook( | ||
() -> { | ||
listener.scheduled(entry); | ||
submitNow(entry); | ||
}); | ||
} | ||
log.debug( | ||
"Scheduled {} for running after transaction commit", entry.description()); | ||
return null; | ||
|
@@ -298,7 +315,7 @@ private void invoke(TransactionOutboxEntry entry, Transaction transaction) | |
} | ||
|
||
private TransactionOutboxEntry newEntry( | ||
Class<?> clazz, String methodName, Class<?>[] params, Object[] args, String uniqueRequestId) { | ||
Class<?> clazz, String methodName, Class<?>[] params, Object[] args, String uniqueRequestId, String groupId) { | ||
return TransactionOutboxEntry.builder() | ||
.id(UUID.randomUUID().toString()) | ||
.invocation( | ||
|
@@ -311,6 +328,8 @@ private TransactionOutboxEntry newEntry( | |
.lastAttemptTime(null) | ||
.nextAttemptTime(after(attemptFrequency)) | ||
.uniqueRequestId(uniqueRequestId) | ||
.createTime(clockProvider.getClock().instant()) | ||
.groupId(groupId) | ||
.build(); | ||
} | ||
|
||
|
@@ -388,7 +407,8 @@ public TransactionOutboxImpl build() { | |
logLevelTemporaryFailure, | ||
serializeMdc, | ||
retentionThreshold, | ||
initializeImmediately); | ||
initializeImmediately, | ||
submitImmediately); | ||
} | ||
} | ||
|
||
|
@@ -397,16 +417,25 @@ private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleB | |
@Length(max = 250) | ||
private String uniqueRequestId; | ||
|
||
@Length(max = 250) | ||
private String groupId; | ||
|
||
@Override | ||
public ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId) { | ||
this.uniqueRequestId = uniqueRequestId; | ||
return this; | ||
} | ||
|
||
@Override | ||
public ParameterizedScheduleBuilder groupId(final String groupId) { | ||
this.groupId = groupId; | ||
return this; | ||
} | ||
|
||
@Override | ||
public <T> T schedule(Class<T> clazz) { | ||
validator.validate(this); | ||
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId); | ||
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, groupId); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Timestamps aren't guaranteed 100% reliable for ordering; it's theoretically possible to write two records at exactly the same time.
I guess the question is whether that's of particular concern. We are watching this PR with interest here, since we may have a use for it, but our use case actually doesn't care about ordering at all; we're looking to use it to throttle processing to one task per group id at a time as a way of "rationing" our batch processing capability between customers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't need to have that "super-precise" reliability in my project either, but I think it may be useful for others.
I'd probably go with a single sequence (or auto-incremented column) for a counter but use it only in tandem with the
createTime
field in order to mitigate a potential "max-value to min-value" problem almost completely.Need to brush up MySQL 5 capabilities for all of that though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've used a special sequence table for this in the past, to mimic the flexibility of oracle sequences.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@amseager With the potential "max-value to min-value" problem, are you referring to a sequence wrapping around from its maximum value to its minimum value?