diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java index 5f9ea6c1..d30532b1 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java @@ -72,7 +72,18 @@ class DefaultMigrationManager { 8, "Update length of invocation column on outbox for MySQL dialects only.", "ALTER TABLE TXNO_OUTBOX MODIFY COLUMN invocation MEDIUMTEXT", - Map.of(Dialect.POSTGRESQL_9, "", Dialect.H2, ""))); + Map.of(Dialect.POSTGRESQL_9, "", Dialect.H2, "")), + new Migration( + 9, + "Add createTime column to outbox", + "ALTER TABLE TXNO_OUTBOX ADD COLUMN createTime TIMESTAMP(6) NULL AFTER invocation", + Map.of( + Dialect.POSTGRESQL_9, + "ALTER TABLE TXNO_OUTBOX ADD COLUMN createTime TIMESTAMP(6)")), + new Migration( + 10, + "Add groupId column to outbox", + "ALTER TABLE TXNO_OUTBOX ADD COLUMN groupId VARCHAR(250)")); static void migrate(TransactionManager transactionManager, @NotNull Dialect dialect) { transactionManager.inTransaction( diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java index 396f4e5f..28388c6c 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java @@ -37,7 +37,8 @@ public class DefaultPersistor implements Persistor { private static final String ALL_FIELDS = - "id, uniqueRequestId, invocation, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version"; + "id, uniqueRequestId, invocation, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version, " + + "createTime, groupId"; /** * @param writeLockTimeoutSeconds How many seconds to wait before timing out on obtaining a write @@ -93,7 +94,7 @@ public void migrate(TransactionManager transactionManager) { public void save(Transaction tx, TransactionOutboxEntry entry) throws SQLException, AlreadyScheduledException { var insertSql = - "INSERT INTO " + tableName + " (" + ALL_FIELDS + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "INSERT INTO " + tableName + " (" + ALL_FIELDS + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; var writer = new StringWriter(); serializer.serializeInvocation(entry.getInvocation(), writer); if (entry.getUniqueRequestId() == null) { @@ -133,6 +134,9 @@ private void setupInsert( stmt.setBoolean(7, entry.isBlocked()); stmt.setBoolean(8, entry.isProcessed()); stmt.setInt(9, entry.getVersion()); + stmt.setTimestamp( + 10, entry.getCreateTime() == null ? null : Timestamp.from(entry.getCreateTime())); + stmt.setString(11, entry.getGroupId()); } @Override @@ -241,7 +245,7 @@ public List selectBatch(Transaction tx, int batchSize, I + ALL_FIELDS + " FROM " + tableName - + " WHERE nextAttemptTime < ? AND blocked = false AND processed = false LIMIT ?" + + " WHERE nextAttemptTime < ? AND blocked = false AND processed = false and groupid is null LIMIT ?" + forUpdate)) { stmt.setTimestamp(1, Timestamp.from(now)); stmt.setInt(2, batchSize); @@ -249,6 +253,45 @@ public List selectBatch(Transaction tx, int batchSize, I } } + @Override + public List selectBatchOrdered( + final Transaction tx, final int batchSize, final Instant now) throws Exception { + String forUpdate = dialect.isSupportsSkipLock() ? " FOR UPDATE SKIP LOCKED" : ""; + try (PreparedStatement stmt = + tx.connection() + .prepareStatement( + dialect.isSupportsWindowFunctions() + ? "WITH t AS" + + " (" + + " SELECT rank() over (PARTITION BY groupid ORDER BY createtime) AS rn, " + ALL_FIELDS + + " FROM " + tableName + + " WHERE processed = false" + + " ) " + + " SELECT " + ALL_FIELDS + + " FROM t " + + " WHERE rn = 1 AND nextAttemptTime < ? AND blocked = false AND groupid IS NOT null LIMIT ?" + + forUpdate + : + "SELECT " + + ALL_FIELDS + + " FROM" + + " (" + + " SELECT groupid AS group_id, MIN(createtime) AS min_create_time" + + " FROM " + tableName + + " WHERE processed = false" + + " GROUP BY group_id" + + " ) AS t" + + " JOIN " + tableName + " t1" + + " ON t1.groupid = t.group_id AND t1.createtime = t.min_create_time" + + " WHERE nextAttemptTime < ? AND blocked = false AND groupid IS NOT null LIMIT ?" + + forUpdate + )) { + stmt.setTimestamp(1, Timestamp.from(now)); + stmt.setInt(2, batchSize); + return gatherResults(batchSize, stmt); + } + } + @Override public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now) throws Exception { @@ -289,6 +332,11 @@ private TransactionOutboxEntry map(ResultSet rs) throws SQLException, IOExceptio .blocked(rs.getBoolean("blocked")) .processed(rs.getBoolean("processed")) .version(rs.getInt("version")) + .createTime( + rs.getTimestamp("createTime") == null + ? null + : rs.getTimestamp("createTime").toInstant()) + .groupId(rs.getString("groupId")) .build(); log.debug("Found {}", entry); return entry; diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java index 9e0c2422..ab8ae158 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java @@ -13,12 +13,13 @@ @Getter @Beta public enum Dialect { - MY_SQL_5(false, Constants.DEFAULT_DELETE_EXPIRED_STMT), - MY_SQL_8(true, Constants.DEFAULT_DELETE_EXPIRED_STMT), + MY_SQL_5(false, false, Constants.DEFAULT_DELETE_EXPIRED_STMT), + MY_SQL_8(true, true, Constants.DEFAULT_DELETE_EXPIRED_STMT), POSTGRESQL_9( + true, true, "DELETE FROM {{table}} WHERE ctid IN (SELECT ctid FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT ?)"), - H2(false, Constants.DEFAULT_DELETE_EXPIRED_STMT); + H2(false, true, Constants.DEFAULT_DELETE_EXPIRED_STMT); /** * @return True if hot row support ({@code SKIP LOCKED}) is available, increasing performance when @@ -28,6 +29,8 @@ public enum Dialect { @SuppressWarnings("JavaDoc") private final boolean supportsSkipLock; + private final boolean supportsWindowFunctions; + /** @return Format string for the SQL required to delete expired retained records. */ @SuppressWarnings("JavaDoc") private final String deleteExpired; diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java index a9b208b7..16936ec1 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java @@ -105,5 +105,8 @@ static DefaultPersistor forDialect(Dialect dialect) { List selectBatch(Transaction tx, int batchSize, Instant now) throws Exception; + List selectBatchOrdered(Transaction tx, int batchSize, Instant now) + throws Exception; + int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now) throws Exception; } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java index af251462..1bf911ce 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java @@ -45,6 +45,12 @@ public List selectBatch(Transaction tx, int batchSize, I return List.of(); } + @Override + public List selectBatchOrdered( + final Transaction tx, final int batchSize, final Instant now) { + return List.of(); + } + @Override public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now) { return 0; diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java index 3f288bed..4360510e 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java @@ -88,6 +88,8 @@ static TransactionOutboxBuilder builder() { @SuppressWarnings("UnusedReturnValue") boolean flush(); + boolean flushOrdered(); + /** * Unblocks a blocked entry and resets the attempt count so that it will be retried again. * Requires an active transaction and a transaction manager that supports thread local context. @@ -138,6 +140,7 @@ abstract class TransactionOutboxBuilder { protected Boolean serializeMdc; protected Duration retentionThreshold; protected Boolean initializeImmediately; + protected Boolean submitImmediately; protected TransactionOutboxBuilder() {} @@ -284,6 +287,16 @@ public TransactionOutboxBuilder initializeImmediately(boolean initializeImmediat return this; } + /** + * @param submitImmediately If true, a task will be submitted immediately after scheduling. + * Defaults to true. + * @return Builder. + */ + public TransactionOutboxBuilder submitImmediately(boolean submitImmediately) { + this.submitImmediately = submitImmediately; + return this; + } + /** * Creates and initialises the {@link TransactionOutbox}. * @@ -310,6 +323,10 @@ interface ParameterizedScheduleBuilder { */ ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId); + default ParameterizedScheduleBuilder groupId(String groupId) { + return this; + } + /** * Equivalent to {@link TransactionOutbox#schedule(Class)}, but applying additional parameters * to the request as configured using {@link TransactionOutbox#with()}. diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java index 76d5b670..02af4f3a 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java @@ -109,6 +109,10 @@ public class TransactionOutboxEntry { @Setter private int version; + @Getter private Instant createTime; + + @Getter private String groupId; + @EqualsAndHashCode.Exclude @ToString.Exclude private volatile boolean initialized; @EqualsAndHashCode.Exclude @ToString.Exclude private String description; diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java index 7bd9678e..d618103c 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java @@ -46,6 +46,7 @@ class TransactionOutboxImpl implements TransactionOutbox { private final boolean serializeMdc; private final Validator validator; @NotNull private final Duration retentionThreshold; + @NotNull private final boolean submitImmediately; private final AtomicBoolean initialized = new AtomicBoolean(); private final ProxyFactory proxyFactory = new ProxyFactory(); @@ -62,7 +63,8 @@ private TransactionOutboxImpl( Level logLevelTemporaryFailure, Boolean serializeMdc, Duration retentionThreshold, - Boolean initializeImmediately) { + Boolean initializeImmediately, + Boolean submitImmediately) { this.transactionManager = transactionManager; this.instantiator = Utils.firstNonNull(instantiator, Instantiator::usingReflection); this.persistor = persistor; @@ -76,6 +78,7 @@ private TransactionOutboxImpl( this.validator = new Validator(this.clockProvider); this.serializeMdc = serializeMdc == null || serializeMdc; this.retentionThreshold = retentionThreshold == null ? Duration.ofDays(7) : retentionThreshold; + this.submitImmediately = submitImmediately == null || submitImmediately; this.validator.validate(this); if (initializeImmediately == null || initializeImmediately) { initialize(); @@ -100,7 +103,7 @@ public void initialize() { @Override public T schedule(Class clazz) { - return schedule(clazz, null); + return schedule(clazz, null, null); } @Override @@ -111,23 +114,34 @@ public ParameterizedScheduleBuilder with() { @SuppressWarnings("UnusedReturnValue") @Override public boolean flush() { + return flush(false); + } + + @Override + public boolean flushOrdered() { + return flush(true); + } + + private boolean flush(boolean isOrdered) { if (!initialized.get()) { throw new IllegalStateException("Not initialized"); } Instant now = clockProvider.getClock().instant(); - List batch = flush(now); + List batch = flush(now, isOrdered); expireIdempotencyProtection(now); return !batch.isEmpty(); } - private List flush(Instant now) { + private List flush(Instant now, boolean isOrdered) { log.debug("Flushing stale tasks"); var batch = transactionManager.inTransactionReturns( transaction -> { List result = new ArrayList<>(flushBatchSize); - uncheckedly(() -> persistor.selectBatch(transaction, flushBatchSize, now)) - .forEach( + uncheckedly(() -> isOrdered ? + persistor.selectBatchOrdered(transaction, flushBatchSize, now) : + persistor.selectBatch(transaction, flushBatchSize, now) + ).forEach( entry -> { log.debug("Reprocessing {}", entry.description()); try { @@ -214,7 +228,7 @@ public boolean unblock(String entryId, Object transactionContext) { } } - private T schedule(Class clazz, String uniqueRequestId) { + private T schedule(Class clazz, String uniqueRequestId, String groupId) { if (!initialized.get()) { throw new IllegalStateException("Not initialized"); } @@ -230,16 +244,19 @@ private T schedule(Class clazz, String uniqueRequestId) { extracted.getMethodName(), extracted.getParameters(), extracted.getArgs(), - uniqueRequestId); + uniqueRequestId, + groupId); validator.validate(entry); persistor.save(extracted.getTransaction(), entry); - extracted - .getTransaction() - .addPostCommitHook( - () -> { - listener.scheduled(entry); - submitNow(entry); - }); + if (submitImmediately) { + extracted + .getTransaction() + .addPostCommitHook( + () -> { + listener.scheduled(entry); + submitNow(entry); + }); + } log.debug( "Scheduled {} for running after transaction commit", entry.description()); return null; @@ -298,7 +315,7 @@ private void invoke(TransactionOutboxEntry entry, Transaction transaction) } private TransactionOutboxEntry newEntry( - Class clazz, String methodName, Class[] params, Object[] args, String uniqueRequestId) { + Class clazz, String methodName, Class[] params, Object[] args, String uniqueRequestId, String groupId) { return TransactionOutboxEntry.builder() .id(UUID.randomUUID().toString()) .invocation( @@ -311,6 +328,8 @@ private TransactionOutboxEntry newEntry( .lastAttemptTime(null) .nextAttemptTime(after(attemptFrequency)) .uniqueRequestId(uniqueRequestId) + .createTime(clockProvider.getClock().instant()) + .groupId(groupId) .build(); } @@ -388,7 +407,8 @@ public TransactionOutboxImpl build() { logLevelTemporaryFailure, serializeMdc, retentionThreshold, - initializeImmediately); + initializeImmediately, + submitImmediately); } } @@ -397,16 +417,25 @@ private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleB @Length(max = 250) private String uniqueRequestId; + @Length(max = 250) + private String groupId; + @Override public ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId) { this.uniqueRequestId = uniqueRequestId; return this; } + @Override + public ParameterizedScheduleBuilder groupId(final String groupId) { + this.groupId = groupId; + return this; + } + @Override public T schedule(Class clazz) { validator.validate(this); - return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId); + return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, groupId); } } }