Skip to content

Commit

Permalink
Merge pull request #605 from gruelbox/ordering-refinements
Browse files Browse the repository at this point in the history
Various cleanups to sequenced processing
  • Loading branch information
badgerwithagun authored Apr 5, 2024
2 parents 4650924 + 91e036f commit d33a29c
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ static Builder builder(String name) {
@Getter private final String selectBatch;
@Getter private final String lock;
@Getter private final String checkSql;
@Getter private final String fetchAndLockNextInTopic;
@Getter private final String fetchNextInAllTopics;
private final Collection<Migration> migrations;

@Override
Expand Down Expand Up @@ -68,9 +68,12 @@ static final class Builder {
private Map<Integer, Migration> migrations;
private Function<Boolean, String> booleanValueFrom;
private SQLAction createVersionTableBy;
private String fetchAndLockNextInTopic =
"SELECT {{allFields}} FROM {{table}} "
+ "WHERE topic = ? AND processed = false ORDER BY seq ASC LIMIT 1 FOR UPDATE";
private String fetchNextInAllTopics =
"SELECT {{allFields}} FROM {{table}} a"
+ " WHERE processed = false AND topic <> '*' AND nextAttemptTime < ?"
+ " AND seq = ("
+ "SELECT MIN(seq) FROM {{table}} b WHERE b.topic=a.topic AND b.processed = false"
+ ") LIMIT {{batchSize}}";

Builder(String name) {
this.name = name;
Expand Down Expand Up @@ -171,7 +174,7 @@ Dialect build() {
selectBatch,
lock,
checkSql,
fetchAndLockNextInTopic,
fetchNextInAllTopics,
migrations.values()) {
@Override
public String booleanValue(boolean criteriaValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,37 +315,20 @@ public List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, I
}

@Override
public List<String> selectActiveTopics(Transaction tx) throws Exception {
var sql = "SELECT DISTINCT topic FROM %s WHERE topic <> '*' AND processed = %s";
String falseStr = dialect.booleanValue(false);
public Collection<TransactionOutboxEntry> selectNextInTopics(
Transaction tx, int batchSize, Instant now) throws Exception {
var sql =
dialect
.getFetchNextInAllTopics()
.replace("{{table}}", tableName)
.replace("{{batchSize}}", Integer.toString(batchSize))
.replace("{{allFields}}", ALL_FIELDS);
//noinspection resource
try (PreparedStatement stmt =
tx.connection().prepareStatement(String.format(sql, tableName, falseStr, falseStr))) {
var result = new ArrayList<String>();
try (ResultSet rs = stmt.executeQuery()) {
while (rs.next()) {
result.add(rs.getString(1));
}
}
return result;
}
}

@Override
public Optional<TransactionOutboxEntry> nextInTopic(Transaction tx, String topic)
throws Exception {
//noinspection resource
try (PreparedStatement stmt =
tx.connection()
.prepareStatement(
dialect
.getFetchAndLockNextInTopic()
.replace("{{table}}", tableName)
.replace("{{allFields}}", ALL_FIELDS))) {
stmt.setString(1, topic);
var results = new ArrayList<TransactionOutboxEntry>(1);
try (PreparedStatement stmt = tx.connection().prepareStatement(sql)) {
stmt.setTimestamp(1, Timestamp.from(now));
var results = new ArrayList<TransactionOutboxEntry>();
gatherResults(stmt, results);
return results.stream().findFirst();
return results;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public interface Dialect {

String getCheckSql();

String getFetchAndLockNextInTopic();
String getFetchNextInAllTopics();

String booleanValue(boolean criteriaValue);

Expand All @@ -29,10 +29,10 @@ public interface Dialect {
Dialect MY_SQL_5 = DefaultDialect.builder("MY_SQL_5").build();
Dialect MY_SQL_8 =
DefaultDialect.builder("MY_SQL_8")
.fetchAndLockNextInTopic(
"SELECT {{allFields}} FROM {{table}} "
+ "WHERE topic = ? AND processed = false ORDER BY seq ASC LIMIT 1 FOR "
+ "UPDATE")
.fetchNextInAllTopics(
"WITH raw AS(SELECT {{allFields}}, (ROW_NUMBER() OVER(PARTITION BY topic ORDER BY seq)) as rn"
+ " FROM {{table}} WHERE processed = false AND topic <> '*')"
+ " SELECT * FROM raw WHERE rn = 1 AND nextAttemptTime < ? LIMIT {{batchSize}}")
.deleteExpired(
"DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false"
+ " LIMIT {{batchSize}}")
Expand All @@ -46,10 +46,10 @@ public interface Dialect {
.build();
Dialect POSTGRESQL_9 =
DefaultDialect.builder("POSTGRESQL_9")
.fetchAndLockNextInTopic(
"SELECT {{allFields}} FROM {{table}} "
+ "WHERE topic = ? AND processed = false ORDER BY seq ASC LIMIT 1 FOR "
+ "UPDATE")
.fetchNextInAllTopics(
"WITH raw AS(SELECT {{allFields}}, (ROW_NUMBER() OVER(PARTITION BY topic ORDER BY seq)) as rn"
+ " FROM {{table}} WHERE processed = false AND topic <> '*')"
+ " SELECT * FROM raw WHERE rn = 1 AND nextAttemptTime < ? LIMIT {{batchSize}}")
.deleteExpired(
"DELETE FROM {{table}} WHERE id IN "
+ "(SELECT id FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT {{batchSize}})")
Expand All @@ -75,13 +75,10 @@ public interface Dialect {
.build();
Dialect ORACLE =
DefaultDialect.builder("ORACLE")
.fetchAndLockNextInTopic(
"SELECT {{allFields}} FROM {{table}} outer"
+ " WHERE outer.topic = ?"
+ " AND outer.processed = 0"
+ " AND outer.seq = ("
+ "SELECT MIN(seq) FROM {{table}} inner WHERE inner.topic=outer.topic AND inner"
+ ".processed = 0) FOR UPDATE")
.fetchNextInAllTopics(
"WITH cte1 AS (SELECT {{allFields}}, (ROW_NUMBER() OVER(PARTITION BY topic ORDER BY seq)) as rn"
+ " FROM {{table}} WHERE processed = 0 AND topic <> '*')"
+ " SELECT * FROM cte1 WHERE rn = 1 AND nextAttemptTime < ? AND ROWNUM <= {{batchSize}}")
.deleteExpired(
"DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = 1 AND blocked = 0 "
+ "AND ROWNUM <= {{batchSize}}")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.gruelbox.transactionoutbox;

import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

/**
* Saves and loads {@link TransactionOutboxEntry}s. For most use cases, just use {@link
Expand Down Expand Up @@ -107,29 +107,22 @@ List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, Instant
throws Exception;

/**
* Selects the list of topics with work awaiting processing.
* Selects the next items in all the open topics as a batch for processing. Does not lock.
*
* @param tx The current {@link Transaction}.
* @return The topics.
* @param batchSize The maximum number of records to select.
* @param now The time to use when selecting records.
* @return The records.
* @throws Exception Any exception.
*/
List<String> selectActiveTopics(final Transaction tx) throws Exception;

/**
* Fetches and locks the next available piece of work on the specified topic.
*
* @param tx The current {@link Transaction}.
* @param topic The topic.
* @return The next available piece of work on the selected topic.
* @throws Exception ANy exception.
*/
Optional<TransactionOutboxEntry> nextInTopic(Transaction tx, String topic) throws Exception;
Collection<TransactionOutboxEntry> selectNextInTopics(Transaction tx, int batchSize, Instant now)
throws Exception;

/**
* Deletes records which have processed and passed their expiry time, in specified batch sizes.
*
* @param tx The current {@link Transaction}.
* @param batchSize The number of records to select.
* @param batchSize The maximum number of records to select.
* @param now The time to use when selecting records.
* @return The number of records deleted.
* @throws Exception Any exception.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.gruelbox.transactionoutbox;

import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import lombok.Builder;

/** Stub implementation of {@link Persistor}. */
Expand Down Expand Up @@ -47,15 +47,11 @@ public List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, I
}

@Override
public List<String> selectActiveTopics(Transaction tx) {
public Collection<TransactionOutboxEntry> selectNextInTopics(
Transaction tx, int flushBatchSize, Instant now) {
return List.of();
}

@Override
public Optional<TransactionOutboxEntry> nextInTopic(Transaction tx, String topic) {
return Optional.empty();
}

@Override
public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now) {
return 0;
Expand Down
Loading

0 comments on commit d33a29c

Please sign in to comment.