Skip to content

Commit

Permalink
Final updates to the MSSqlServer implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
wynan committed May 15, 2024
1 parent 29e4306 commit 4a33f69
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ static Builder builder(String name) {

@Getter private final String name;
@Getter private final String deleteExpired;
@Getter private final String delete;
@Getter private final String selectBatch;
@Getter private final String lock;
@Getter private final String checkSql;
Expand Down Expand Up @@ -58,6 +59,7 @@ public Stream<Migration> getMigrations() {
@Accessors(fluent = true)
static final class Builder {
private final String name;
private String delete = "DELETE FROM {{table}} WHERE id = ? and version = ?";
private String deleteExpired =
"DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false"
+ " LIMIT {{batchSize}}";
Expand Down Expand Up @@ -175,6 +177,7 @@ Dialect build() {
return new DefaultDialect(
name,
deleteExpired,
delete,
selectBatch,
lock,
checkSql,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ private void setNextSequence(Transaction tx, TransactionOutboxEntry entry) throw
}

private boolean indexViolation(Exception e) {
System.out.println(e);
return (e instanceof SQLIntegrityConstraintViolationException)
|| (e.getClass().getName().equals("org.postgresql.util.PSQLException")
&& e.getMessage().contains("constraint"))
Expand Down Expand Up @@ -208,9 +207,7 @@ private void setupInsert(
public void delete(Transaction tx, TransactionOutboxEntry entry) throws Exception {
//noinspection resource
try (PreparedStatement stmt =
// language=MySQL
tx.connection()
.prepareStatement("DELETE FROM " + tableName + " WHERE id = ? and version = ?")) {
tx.connection().prepareStatement(dialect.getDelete().replace("{{table}}", tableName))) {
stmt.setString(1, entry.getId());
stmt.setInt(2, entry.getVersion());
if (stmt.executeUpdate() != 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

/** The SQL dialects supported by {@link DefaultPersistor}. */
public interface Dialect {
String getDelete();

/**
* @return Format string for the SQL required to delete expired retained records.
*/
Expand Down Expand Up @@ -140,6 +142,7 @@ public interface Dialect {
"SELECT TOP ({{batchSize}}) {{allFields}} FROM {{table}} "
+ "WITH (UPDLOCK, ROWLOCK, READPAST) WHERE nextAttemptTime < ? AND topic = '*' "
+ "AND blocked = 0 AND processed = 0")
.delete("DELETE FROM {{table}} WITH (ROWLOCK, READPAST) WHERE id = ? and version = ?")
.deleteExpired(
"DELETE TOP ({{batchSize}}) FROM {{table}} "
+ "WHERE nextAttemptTime < ? AND processed = 1 AND blocked = 0")
Expand All @@ -150,6 +153,8 @@ public interface Dialect {
+ " AND seq = ("
+ "SELECT MIN(seq) FROM {{table}} b WHERE b.topic=a.topic AND b.processed = 0"
+ ")")
.fetchNextSequence(
"SELECT seq FROM TXNO_SEQUENCE WITH (UPDLOCK, ROWLOCK, READPAST) WHERE topic = ?")
.booleanValueFrom(v -> v ? "1" : "0")
.changeMigration(
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down

0 comments on commit 4a33f69

Please sign in to comment.