tasks ordered processing (draft) #252

Closed · wants to merge 1 commit
@@ -72,7 +72,18 @@ class DefaultMigrationManager {
8,
"Update length of invocation column on outbox for MySQL dialects only.",
"ALTER TABLE TXNO_OUTBOX MODIFY COLUMN invocation MEDIUMTEXT",
Map.of(Dialect.POSTGRESQL_9, "", Dialect.H2, "")));
Map.of(Dialect.POSTGRESQL_9, "", Dialect.H2, "")),
new Migration(
9,
"Add createTime column to outbox",
"ALTER TABLE TXNO_OUTBOX ADD COLUMN createTime TIMESTAMP(6) NULL AFTER invocation",
Member:
Timestamps aren't guaranteed 100% reliable for ordering; it's theoretically possible to write two records at exactly the same time.

I guess the question is whether that's of particular concern. We are watching this PR with interest here, since we may have a use for it, but our use case actually doesn't care about ordering at all; we're looking to use it to throttle processing to one task per group id at a time as a way of "rationing" our batch processing capability between customers.

Contributor Author:

I don't need that "super-precise" reliability in my project either, but I think it may be useful for others.

I'd probably go with a single sequence (or an auto-incremented column) as a counter, but use it only in tandem with the createTime field in order to mitigate a potential "max-value to min-value" problem almost completely. I'd need to brush up on MySQL 5's capabilities for all of that, though.
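The sequence-plus-createTime idea above can be sketched in plain Java. This is a hypothetical illustration only (the Entry record and field names are invented, not the library's types): order primarily by createTime, falling back to a monotonic sequence value when two rows carry exactly the same timestamp.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: 'seq' stands in for a single sequence or
// auto-incremented column, used purely as a tie-breaker next to createTime.
public class OrderingSketch {

  record Entry(long seq, Instant createTime, String id) {}

  // Primary order: createTime; tie-break: the monotonic sequence value.
  static final Comparator<Entry> ORDERING =
      Comparator.comparing(Entry::createTime).thenComparingLong(Entry::seq);

  public static void main(String[] args) {
    Instant t = Instant.parse("2022-02-27T10:00:00Z");
    List<Entry> entries = new ArrayList<>(List.of(
        new Entry(3, t, "written-second"),
        new Entry(2, t, "written-first"),       // identical timestamp
        new Entry(4, t.plusSeconds(1), "written-later")));
    entries.sort(ORDERING);
    entries.forEach(e -> System.out.println(e.id()));
  }
}
```

A real implementation would express the same comparison in SQL (ORDER BY createTime, seq), with the sequence wrap-around caveat discussed above still applying at the extremes.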

Member:

I've used a special sequence table for this in the past, to mimic the flexibility of Oracle sequences.

Comment:

@amseager With the potential "max-value to min-value" problem, are you referring to a sequence wrapping around from its maximum value to its minimum value?

Map.of(
Dialect.POSTGRESQL_9,
"ALTER TABLE TXNO_OUTBOX ADD COLUMN createTime TIMESTAMP(6)")),
new Migration(
10,
"Add groupId column to outbox",
Member:

I wonder whether partitionId or topicId might be more obvious terms for what this is doing?

topicId probably being my favourite.

Contributor Author:

Sure, no problem, although those two names mostly fit Kafka/MQs. There's a 99.9% chance that something like that is used with the outbox pattern, but it's theoretically possible to do something else instead of sending messages (e.g. REST calls).

Member:

Yeah, I don't mean to imply that groupIds are an SQS queue or Kafka topic, only that they are an analog of these, so there's probably no point adding another term to the mix.

Comment:

Message groups is the term AWS SNS/SQS FIFOs use for the boundary within which ordering is ensured (within a queue or topic), so the “group” term makes sense at least for those users. For Kafka users “partition” would be the equivalent (once again within a topic). So both group or partition seem to be reasonable choices.

"ALTER TABLE TXNO_OUTBOX ADD COLUMN groupId VARCHAR(250)"));

static void migrate(TransactionManager transactionManager, @NotNull Dialect dialect) {
transactionManager.inTransaction(
@@ -37,7 +37,8 @@
public class DefaultPersistor implements Persistor {

private static final String ALL_FIELDS =
"id, uniqueRequestId, invocation, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version";
"id, uniqueRequestId, invocation, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version, "
+ "createTime, groupId";

/**
* @param writeLockTimeoutSeconds How many seconds to wait before timing out on obtaining a write
@@ -93,7 +94,7 @@ public void migrate(TransactionManager transactionManager) {
public void save(Transaction tx, TransactionOutboxEntry entry)
throws SQLException, AlreadyScheduledException {
var insertSql =
"INSERT INTO " + tableName + " (" + ALL_FIELDS + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
"INSERT INTO " + tableName + " (" + ALL_FIELDS + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
var writer = new StringWriter();
serializer.serializeInvocation(entry.getInvocation(), writer);
if (entry.getUniqueRequestId() == null) {
@@ -133,6 +134,9 @@ private void setupInsert(
stmt.setBoolean(7, entry.isBlocked());
stmt.setBoolean(8, entry.isProcessed());
stmt.setInt(9, entry.getVersion());
stmt.setTimestamp(
10, entry.getCreateTime() == null ? null : Timestamp.from(entry.getCreateTime()));
stmt.setString(11, entry.getGroupId());
}

@Override
@@ -241,14 +245,53 @@ public List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, I
+ ALL_FIELDS
+ " FROM "
+ tableName
+ " WHERE nextAttemptTime < ? AND blocked = false AND processed = false LIMIT ?"
+ " WHERE nextAttemptTime < ? AND blocked = false AND processed = false and groupid is null LIMIT ?"
+ forUpdate)) {
stmt.setTimestamp(1, Timestamp.from(now));
stmt.setInt(2, batchSize);
return gatherResults(batchSize, stmt);
}
}

@Override
public List<TransactionOutboxEntry> selectBatchOrdered(
final Transaction tx, final int batchSize, final Instant now) throws Exception {
String forUpdate = dialect.isSupportsSkipLock() ? " FOR UPDATE SKIP LOCKED" : "";
try (PreparedStatement stmt =
tx.connection()
.prepareStatement(
dialect.isSupportsWindowFunctions()
Member:

Strictly, supports window functions and CTEs. Worth having two flags here?

Member:

Or just ditch the CTE and replace it with a subquery...

Contributor Author:

Probably, yes. It just happened that the three RDBMSs used here that support window functions also support CTEs (I think the latter was added to the SQL standard before the former).
About a subquery: IIRC I had some problems with that here (hence the move to a CTE), but I'll try it again.

? "WITH t AS"
Member:

Have you done an EXPLAIN on these queries on the different DBs and determined if we have suitable indexes?

I have a feeling not having groupId anywhere is going to make this pretty slow.

Contributor Author:

I kind of disagree about groupId, because here we fetch all groups at once, so the cardinality of this column is effectively 50/50 (filled or not), and I'm not sure an RDBMS would ever use that index, even after collecting proper statistics.

At the same time, I think we can add an index on processed (it's technically 50/50 too, but obviously we'll always have far fewer unprocessed rows than processed ones while selecting, and hints/statistics can also help here). We actually already have a composite index on processed, blacklisted, nextAttemptTime, which should work for a single processed column in a WHERE clause too (at least in PG; I need to check it in the other supported DBs).

+ " ("
+ " SELECT rank() over (PARTITION BY groupid ORDER BY createtime) AS rn, " + ALL_FIELDS
+ " FROM " + tableName
+ " WHERE processed = false"
+ " ) "
+ " SELECT " + ALL_FIELDS
+ " FROM t "
+ " WHERE rn = 1 AND nextAttemptTime < ? AND blocked = false AND groupid IS NOT null LIMIT ?"
+ forUpdate
:
"SELECT "
+ ALL_FIELDS
+ " FROM"
+ " ("
+ " SELECT groupid AS group_id, MIN(createtime) AS min_create_time"
+ " FROM " + tableName
+ " WHERE processed = false"
+ " GROUP BY group_id"
+ " ) AS t"
+ " JOIN " + tableName + " t1"
+ " ON t1.groupid = t.group_id AND t1.createtime = t.min_create_time"
+ " WHERE nextAttemptTime < ? AND blocked = false AND groupid IS NOT null LIMIT ?"
+ forUpdate
)) {
stmt.setTimestamp(1, Timestamp.from(now));
stmt.setInt(2, batchSize);
return gatherResults(batchSize, stmt);
}
}
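As a cross-check on the two SQL branches above, here is a pure-Java, in-memory illustration (hypothetical Row record, no database involved) of what both queries compute: for each non-null group, the single unprocessed row with the earliest createTime.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// In-memory model of what selectBatchOrdered returns (illustrative only).
public class SelectOrderedSketch {

  record Row(String groupId, Instant createTime, String id) {}

  // Pick the earliest row per group, skipping ungrouped rows.
  static List<Row> firstPerGroup(List<Row> rows) {
    return rows.stream()
        .filter(r -> r.groupId() != null)
        .collect(Collectors.toMap(
            Row::groupId,
            r -> r,
            (a, b) -> a.createTime().isBefore(b.createTime()) ? a : b))
        .values().stream()
        .sorted(Comparator.comparing(Row::createTime))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Instant t = Instant.parse("2022-02-27T10:00:00Z");
    List<Row> rows = List.of(
        new Row("g1", t.plusSeconds(5), "late"),
        new Row("g1", t, "early"),
        new Row(null, t, "ungrouped"),   // left to the plain selectBatch
        new Row("g2", t.plusSeconds(1), "only"));
    firstPerGroup(rows).forEach(r -> System.out.println(r.id()));
  }
}
```

One observable difference from the CTE branch: rank() can return several rows with rn = 1 if two rows in a group share a createTime, whereas row_number() (or a tie-breaker column, as discussed earlier) would make the pick deterministic.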

@Override
public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now)
throws Exception {
@@ -289,6 +332,11 @@ private TransactionOutboxEntry map(ResultSet rs) throws SQLException, IOExceptio
.blocked(rs.getBoolean("blocked"))
.processed(rs.getBoolean("processed"))
.version(rs.getInt("version"))
.createTime(
rs.getTimestamp("createTime") == null
? null
: rs.getTimestamp("createTime").toInstant())
.groupId(rs.getString("groupId"))
.build();
log.debug("Found {}", entry);
return entry;
@@ -13,12 +13,13 @@
@Getter
@Beta
public enum Dialect {
MY_SQL_5(false, Constants.DEFAULT_DELETE_EXPIRED_STMT),
MY_SQL_8(true, Constants.DEFAULT_DELETE_EXPIRED_STMT),
MY_SQL_5(false, false, Constants.DEFAULT_DELETE_EXPIRED_STMT),
MY_SQL_8(true, true, Constants.DEFAULT_DELETE_EXPIRED_STMT),
POSTGRESQL_9(
true,
true,
"DELETE FROM {{table}} WHERE ctid IN (SELECT ctid FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT ?)"),
H2(false, Constants.DEFAULT_DELETE_EXPIRED_STMT);
H2(false, true, Constants.DEFAULT_DELETE_EXPIRED_STMT);

/**
* @return True if hot row support ({@code SKIP LOCKED}) is available, increasing performance when
@@ -28,6 +29,8 @@ public enum Dialect {
@SuppressWarnings("JavaDoc")
private final boolean supportsSkipLock;

private final boolean supportsWindowFunctions;

/** @return Format string for the SQL required to delete expired retained records. */
@SuppressWarnings("JavaDoc")
private final String deleteExpired;
@@ -105,5 +105,8 @@ static DefaultPersistor forDialect(Dialect dialect) {
List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, Instant now)
throws Exception;

List<TransactionOutboxEntry> selectBatchOrdered(Transaction tx, int batchSize, Instant now)
throws Exception;

int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now) throws Exception;
}
@@ -45,6 +45,12 @@ public List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, I
return List.of();
}

@Override
public List<TransactionOutboxEntry> selectBatchOrdered(
final Transaction tx, final int batchSize, final Instant now) {
return List.of();
}

@Override
public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now) {
return 0;
@@ -88,6 +88,8 @@ static TransactionOutboxBuilder builder() {
@SuppressWarnings("UnusedReturnValue")
Member:

Javadoc needs updating to reflect that grouped records are ignored and these should be handled by flushOrdered

Contributor Author:

Yeah, they will be added (I've already written some of them but haven't pushed yet)

Comment:

@amseager Would you be able to push those changes?

boolean flush();

boolean flushOrdered();
Member:

flushTopics() (if you agree with me on the naming)

Member:

Also needs Javadoc


/**
* Unblocks a blocked entry and resets the attempt count so that it will be retried again.
* Requires an active transaction and a transaction manager that supports thread local context.
@@ -138,6 +140,7 @@ abstract class TransactionOutboxBuilder {
protected Boolean serializeMdc;
protected Duration retentionThreshold;
protected Boolean initializeImmediately;
protected Boolean submitImmediately;

protected TransactionOutboxBuilder() {}

@@ -284,6 +287,16 @@ public TransactionOutboxBuilder initializeImmediately(boolean initializeImmediat
return this;
}

/**
* @param submitImmediately If true, a task will be submitted immediately after scheduling.
* Defaults to true.
* @return Builder.
*/
public TransactionOutboxBuilder submitImmediately(boolean submitImmediately) {
Member:

At the moment, you can do groupId(...) without submitImmediately(...) and it looks like the behaviour is unexpected; because the task is submitted immediately, it ignores ordering.

Can we set submitImmediately to true automatically and refuse to allow submitImmediately(false) in combination with a groupId?

Better still: is this option required at all? I can't see that it adds anything other than a delay, if you're not using a groupId. So why not just remove it?

Contributor Author:

Right, I initially added it by analogy with the other settings, but now I see that it's mostly useless.

@markushc (Aug 23, 2023):

The submit immediately option might not be required to implement ordered processing. However, it could be useful if you want to separate transaction-processing compute from task-processing compute, i.e. only process tasks using flush from a separate JVM. This would also make the transaction-outbox project more suitable for use with serverless compute like AWS Lambda functions, where tasks cannot be processed in background threads.

this.submitImmediately = submitImmediately;
return this;
}

/**
* Creates and initialises the {@link TransactionOutbox}.
*
@@ -310,6 +323,10 @@ interface ParameterizedScheduleBuilder {
*/
ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId);

default ParameterizedScheduleBuilder groupId(String groupId) {
return this;
}

/**
* Equivalent to {@link TransactionOutbox#schedule(Class)}, but applying additional parameters
* to the request as configured using {@link TransactionOutbox#with()}.
@@ -109,6 +109,10 @@ public class TransactionOutboxEntry {
@Setter
private int version;

@Getter private Instant createTime;

@Getter private String groupId;

@EqualsAndHashCode.Exclude @ToString.Exclude private volatile boolean initialized;
@EqualsAndHashCode.Exclude @ToString.Exclude private String description;

@@ -46,6 +46,7 @@ class TransactionOutboxImpl implements TransactionOutbox {
private final boolean serializeMdc;
private final Validator validator;
@NotNull private final Duration retentionThreshold;
@NotNull private final boolean submitImmediately;
private final AtomicBoolean initialized = new AtomicBoolean();
private final ProxyFactory proxyFactory = new ProxyFactory();

@@ -62,7 +63,8 @@ private TransactionOutboxImpl(
Level logLevelTemporaryFailure,
Boolean serializeMdc,
Duration retentionThreshold,
Boolean initializeImmediately) {
Boolean initializeImmediately,
Boolean submitImmediately) {
this.transactionManager = transactionManager;
this.instantiator = Utils.firstNonNull(instantiator, Instantiator::usingReflection);
this.persistor = persistor;
@@ -76,6 +78,7 @@ private TransactionOutboxImpl(
this.validator = new Validator(this.clockProvider);
this.serializeMdc = serializeMdc == null || serializeMdc;
this.retentionThreshold = retentionThreshold == null ? Duration.ofDays(7) : retentionThreshold;
this.submitImmediately = submitImmediately == null || submitImmediately;
this.validator.validate(this);
if (initializeImmediately == null || initializeImmediately) {
initialize();
@@ -100,7 +103,7 @@ public void initialize() {

@Override
public <T> T schedule(Class<T> clazz) {
return schedule(clazz, null);
return schedule(clazz, null, null);
}

@Override
@@ -111,23 +114,34 @@ public ParameterizedScheduleBuilder with() {
@SuppressWarnings("UnusedReturnValue")
@Override
public boolean flush() {
return flush(false);
Member:

Why not just do both flush types in the one call to flush()?

We just want to clear out all work. Why complicate it?

Contributor Author:

The idea was to use separate TransactionOutbox instances for flushing ordered and non-ordered tasks, because the scheduler parameters, such as the delay between iterations (and the flush frequency itself), are planned to be quite different in the first case from the second. What do you think about this?

Member:

Interesting. I'll ponder.

Member:

How about:

  • flush() - does everything
  • flushUnordered()
  • flushAllTopics()
  • flushTopics(String...)

At least this way flush() continues to do everything, so if someone doesn't change their existing scheduled jobs, everything continues to work. If someone wants more granularity, they have it.
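That split might look something like the following interface sketch. The method names are taken from the suggestion above and are hypothetical, not part of the library's published API:

```java
// Hypothetical sketch of the proposed flush API split; names come from
// the review suggestion, not from the actual library.
public interface FlushOperations {

  /** Flushes everything, preserving the existing behaviour of flush(). */
  default boolean flush() {
    boolean unordered = flushUnordered();
    boolean topics = flushAllTopics();
    return unordered || topics;
  }

  /** Flushes only tasks without a group/topic id. */
  boolean flushUnordered();

  /** Flushes ordered tasks across every group/topic. */
  boolean flushAllTopics();

  /** Flushes ordered tasks for the named groups/topics only. */
  boolean flushTopics(String... topicIds);
}
```

Existing scheduled jobs that call flush() would keep working unchanged, while deployments wanting separate schedules for ordered and unordered work could call the granular methods.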

}

@Override
public boolean flushOrdered() {
return flush(true);
}

private boolean flush(boolean isOrdered) {
if (!initialized.get()) {
throw new IllegalStateException("Not initialized");
}
Instant now = clockProvider.getClock().instant();
List<TransactionOutboxEntry> batch = flush(now);
List<TransactionOutboxEntry> batch = flush(now, isOrdered);
expireIdempotencyProtection(now);
return !batch.isEmpty();
}

private List<TransactionOutboxEntry> flush(Instant now) {
private List<TransactionOutboxEntry> flush(Instant now, boolean isOrdered) {
log.debug("Flushing stale tasks");
var batch =
transactionManager.inTransactionReturns(
transaction -> {
List<TransactionOutboxEntry> result = new ArrayList<>(flushBatchSize);
uncheckedly(() -> persistor.selectBatch(transaction, flushBatchSize, now))
.forEach(
uncheckedly(() -> isOrdered ?
@badgerwithagun (Member, Feb 27, 2022):

The tasks will run in parallel at the moment, even if you submit them in order.

I think I'd use CompletableFuture chaining to put each task on the end of a future chain for the groupId. You just need to hold onto the last CompletableFuture you submitted for each groupId, until the task completes:

ConcurrentMap<String, CompletableFuture> tasks;

Each task can remove itself from the map when it completes, so it doesn't grow.

@amseager (Contributor Author, Feb 28, 2022):

I didn't quite get the concurrency problem, to be honest; could you elaborate, please? Here, only the first task per group is selected in a single flush iteration, and it seems impossible to select the second one before the first completes, because of the processed condition (though maybe I'm missing something). So I thought the problem could only occur in a multi-instance environment, which could be solved the same way as in the existing (batched) select (with locks etc.).

About future chains: is that for a solution where we select all tasks of each group in one call instead? I'm going to experiment with that approach.

Member:

Yes, so the idea is that you select all tasks for all groups, but queue up each group separately, something like this:

ConcurrentMap<String, CompletableFuture<Void>> tasks = new ConcurrentHashMap<>();
...
persistor.selectBatchOrdered(transaction, flushBatchSize, now)
  .forEach(entry ->
    tasks.compute(
      entry.getGroupId(),
      (groupId, future) -> future == null
        ? CompletableFuture.runAsync(() -> processNow(entry), executor)
        : future.thenRunAsync(() -> processNow(entry), executor)));

This ignores the presence of Submitter, which complicates things slightly, but it demonstrates my thinking.

The result of this is a chain of CompletableFutures for each group, resulting in each task in a group running after the last task in the group completes.
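A runnable version of that chaining idea might look as follows. This is a sketch under assumptions: the class and method names are invented, and processNow/Submitter from the real codebase are replaced by a plain Runnable. Tasks within one group run strictly in submission order; distinct groups proceed in parallel on the shared executor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: one CompletableFuture chain per groupId, so tasks in
// a group run sequentially while different groups run in parallel.
public class GroupChainingSketch {

  private final ConcurrentMap<String, CompletableFuture<Void>> chains =
      new ConcurrentHashMap<>();
  private final Executor executor;

  GroupChainingSketch(Executor executor) {
    this.executor = executor;
  }

  /** Appends the task to its group's chain and returns the new chain tail. */
  CompletableFuture<Void> submit(String groupId, Runnable task) {
    // compute is atomic per key, so chain building is race-free.
    return chains.compute(
        groupId,
        (id, tail) -> tail == null
            ? CompletableFuture.runAsync(task, executor)
            : tail.thenRunAsync(task, executor));
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    GroupChainingSketch sketch = new GroupChainingSketch(pool);
    List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
    for (int i = 0; i < 10; i++) {
      int n = i;
      sketch.submit("group-a", () -> seen.add(n));
    }
    sketch.submit("group-a", () -> {}).join();  // wait for the chain tail
    System.out.println(seen);                   // submission order preserved
    pool.shutdown();
  }
}
```

As suggested above, completed chains could also remove themselves from the map (e.g. a whenComplete handler calling chains.remove(groupId, tail) on the tail) so the map does not grow without bound; that needs a little care around races with compute.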

persistor.selectBatchOrdered(transaction, flushBatchSize, now) :
persistor.selectBatch(transaction, flushBatchSize, now)
).forEach(
entry -> {
log.debug("Reprocessing {}", entry.description());
try {
@@ -214,7 +228,7 @@ public boolean unblock(String entryId, Object transactionContext) {
}
}

private <T> T schedule(Class<T> clazz, String uniqueRequestId) {
private <T> T schedule(Class<T> clazz, String uniqueRequestId, String groupId) {
if (!initialized.get()) {
throw new IllegalStateException("Not initialized");
}
@@ -230,16 +244,19 @@ private <T> T schedule(Class<T> clazz, String uniqueRequestId) {
extracted.getMethodName(),
extracted.getParameters(),
extracted.getArgs(),
uniqueRequestId);
uniqueRequestId,
groupId);
validator.validate(entry);
persistor.save(extracted.getTransaction(), entry);
extracted
.getTransaction()
.addPostCommitHook(
() -> {
listener.scheduled(entry);
submitNow(entry);
});
if (submitImmediately) {
Member:

if (groupId != null)?

Contributor Author:

As discussed above, I'll remove submitImmediately and replace it with a groupId != null check.

extracted
.getTransaction()
.addPostCommitHook(
() -> {
listener.scheduled(entry);
submitNow(entry);
});
}
log.debug(
"Scheduled {} for running after transaction commit", entry.description());
return null;
@@ -298,7 +315,7 @@ private void invoke(TransactionOutboxEntry entry, Transaction transaction)
}

private TransactionOutboxEntry newEntry(
Class<?> clazz, String methodName, Class<?>[] params, Object[] args, String uniqueRequestId) {
Class<?> clazz, String methodName, Class<?>[] params, Object[] args, String uniqueRequestId, String groupId) {
return TransactionOutboxEntry.builder()
.id(UUID.randomUUID().toString())
.invocation(
@@ -311,6 +328,8 @@
.lastAttemptTime(null)
.nextAttemptTime(after(attemptFrequency))
.uniqueRequestId(uniqueRequestId)
.createTime(clockProvider.getClock().instant())
.groupId(groupId)
.build();
}

@@ -388,7 +407,8 @@ public TransactionOutboxImpl build() {
logLevelTemporaryFailure,
serializeMdc,
retentionThreshold,
initializeImmediately);
initializeImmediately,
submitImmediately);
}
}

@@ -397,16 +417,25 @@ private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleB
@Length(max = 250)
private String uniqueRequestId;

@Length(max = 250)
private String groupId;

@Override
public ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId) {
this.uniqueRequestId = uniqueRequestId;
return this;
}

@Override
public ParameterizedScheduleBuilder groupId(final String groupId) {
this.groupId = groupId;
return this;
}

@Override
public <T> T schedule(Class<T> clazz) {
validator.validate(this);
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId);
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, groupId);
}
}
}