Skip to content

Commit

Permalink
Merge pull request #547 from gruelbox/test-virtual-threads
Browse files Browse the repository at this point in the history
Add testing that things seem to generally work OK with virtual threads
  • Loading branch information
badgerwithagun authored Mar 11, 2024
2 parents f3cdee5 + 9d96f86 commit 8f24998
Show file tree
Hide file tree
Showing 15 changed files with 519 additions and 117 deletions.
15 changes: 12 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -286,6 +286,15 @@
<module>transactionoutbox-spring</module>
</modules>
</profile>
<profile>
<id>java-21-modules</id>
<activation>
<jdk>[21,)</jdk>
</activation>
<modules>
<module>transactionoutbox-virtthreads</module>
</modules>
</profile>
<profile>
<id>release</id>
<properties>
Expand Down
4 changes: 2 additions & 2 deletions transactionoutbox-acceptance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@
<artifactId>ojdbc11</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
Expand Down
4 changes: 2 additions & 2 deletions transactionoutbox-jooq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@
<artifactId>ojdbc11</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.gruelbox.transactionoutbox.testing;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.junit.jupiter.api.Assertions.*;

import com.gruelbox.transactionoutbox.*;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.DriverManager;
Expand All @@ -22,40 +23,22 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import lombok.Builder;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slf4j
public abstract class AbstractAcceptanceTest {
public abstract class AbstractAcceptanceTest extends BaseTest {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAcceptanceTest.class);
private final ExecutorService unreliablePool =
new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(16));

private static final Random random = new Random();
protected HikariDataSource dataSource;

@BeforeEach
final void baseBeforeEach() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(connectionDetails().url());
config.setUsername(connectionDetails().user());
config.setPassword(connectionDetails().password());
config.addDataSourceProperty("cachePrepStmts", "true");
dataSource = new HikariDataSource(config);
}

@AfterEach
final void baseAfterEach() {
dataSource.close();
}

/**
* Uses a simple direct transaction manager and connection manager and attempts to fire an
Expand Down Expand Up @@ -107,15 +90,15 @@ public void success(TransactionOutboxEntry entry) {
outbox.schedule(InterfaceProcessor.class).process(3, "Whee");
try {
// Should not be fired until after commit
assertFalse(latch.await(2, TimeUnit.SECONDS));
assertFalse(latch.await(2, SECONDS));
} catch (InterruptedException e) {
fail("Interrupted");
}
});

// Should be fired after commit
assertTrue(chainedLatch.await(2, TimeUnit.SECONDS));
assertTrue(latch.await(1, TimeUnit.SECONDS));
assertTrue(chainedLatch.await(2, SECONDS));
assertTrue(latch.await(1, SECONDS));
assertTrue(gotScheduled.get());
}

Expand Down Expand Up @@ -244,7 +227,7 @@ public void success(TransactionOutboxEntry entry) {
.schedule(ClassProcessor.class)
.process("6"));

MatcherAssert.assertThat(ids, containsInAnyOrder("1", "2", "4", "6"));
assertThat(ids, containsInAnyOrder("1", "2", "4", "6"));
}

/**
Expand Down Expand Up @@ -272,7 +255,7 @@ final void dataSourceConnectionProviderReflectionInstantiatorConcreteClass()

transactionManager.inTransaction(() -> outbox.schedule(ClassProcessor.class).process(myId));

assertTrue(latch.await(2, TimeUnit.SECONDS));
assertTrue(latch.await(2, SECONDS));
assertEquals(List.of(myId), ClassProcessor.PROCESSED);
}
}
Expand Down Expand Up @@ -364,7 +347,7 @@ public <T, E extends Exception> T requireTransactionReturns(
}
postCommitHooks.forEach(Runnable::run);

assertTrue(latch.await(2, TimeUnit.SECONDS));
assertTrue(latch.await(2, SECONDS));
assertEquals(List.of(myId), ClassProcessor.PROCESSED);
}
}
Expand Down Expand Up @@ -395,7 +378,7 @@ final void retryBehaviour() throws Exception {
() -> {
transactionManager.inTransaction(
() -> outbox.schedule(InterfaceProcessor.class).process(3, "Whee"));
assertTrue(latch.await(15, TimeUnit.SECONDS));
assertTrue(latch.await(15, SECONDS));
});
}

Expand Down Expand Up @@ -467,12 +450,12 @@ final void lastAttemptTime_updatesEveryTime() throws Exception {
() -> {
transactionManager.inTransaction(
() -> outbox.schedule(InterfaceProcessor.class).process(3, "Whee"));
assertTrue(blockLatch.await(10, TimeUnit.SECONDS));
assertTrue(blockLatch.await(10, SECONDS));
assertTrue(
(Boolean)
transactionManager.inTransactionReturns(
tx -> outbox.unblock(orderedEntryListener.getBlocked().getId())));
assertTrue(successLatch.await(10, TimeUnit.SECONDS));
assertTrue(successLatch.await(10, SECONDS));
var orderedEntryEvents = orderedEntryListener.getOrderedEntries();
log.info("The entry life cycle is: {}", orderedEntryEvents);

Expand Down Expand Up @@ -521,12 +504,12 @@ final void blockAndThenUnblockForRetry() throws Exception {
() -> {
transactionManager.inTransaction(
() -> outbox.schedule(InterfaceProcessor.class).process(3, "Whee"));
assertTrue(blockLatch.await(3, TimeUnit.SECONDS));
assertTrue(blockLatch.await(3, SECONDS));
assertTrue(
(Boolean)
transactionManager.inTransactionReturns(
tx -> outbox.unblock(latchListener.getBlocked().getId())));
assertTrue(successLatch.await(3, TimeUnit.SECONDS));
assertTrue(successLatch.await(3, SECONDS));
});
}

Expand Down Expand Up @@ -574,79 +557,17 @@ public void success(TransactionOutboxEntry entry) {
outbox.schedule(InterfaceProcessor.class).process(i * 10 + j, "Whee");
}
}));
assertTrue(latch.await(30, TimeUnit.SECONDS), "Latch not opened in time");
assertTrue(latch.await(30, SECONDS), "Latch not opened in time");
});

MatcherAssert.assertThat(
assertThat(
"Should never get duplicates running to full completion", duplicates.keySet(), empty());
MatcherAssert.assertThat(
assertThat(
"Only got: " + results.keySet(),
results.keySet(),
containsInAnyOrder(IntStream.range(0, count * 10).boxed().toArray()));
}

protected ConnectionDetails connectionDetails() {
return ConnectionDetails.builder()
.dialect(Dialect.H2)
.driverClassName("org.h2.Driver")
.url(
"jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DEFAULT_LOCK_TIMEOUT=60000;LOB_TIMEOUT=2000;MV_STORE=TRUE;DATABASE_TO_UPPER=FALSE")
.user("test")
.password("test")
.build();
}

protected TransactionManager txManager() {
return TransactionManager.fromDataSource(dataSource);
}

protected Persistor persistor() {
return Persistor.forDialect(connectionDetails().dialect());
}

protected void clearOutbox() {
DefaultPersistor persistor = Persistor.forDialect(connectionDetails().dialect());
TransactionManager transactionManager = txManager();
transactionManager.inTransaction(
tx -> {
try {
persistor.clear(tx);
} catch (SQLException e) {
throw new RuntimeException(e);
}
});
}

protected void withRunningFlusher(TransactionOutbox outbox, ThrowingRunnable runnable)
throws Exception {
Thread backgroundThread =
new Thread(
() -> {
while (!Thread.interrupted()) {
try {
// Keep flushing work until there's nothing left to flush
//noinspection StatementWithEmptyBody
while (outbox.flush()) {}
} catch (Exception e) {
log.error("Error flushing transaction outbox. Pausing", e);
}
try {
//noinspection BusyWait
Thread.sleep(250);
} catch (InterruptedException e) {
break;
}
}
});
backgroundThread.start();
try {
runnable.run();
} finally {
backgroundThread.interrupt();
backgroundThread.join();
}
}

private static class FailingInstantiator implements Instantiator {

private final AtomicInteger attempts;
Expand Down Expand Up @@ -699,15 +620,4 @@ public Object getInstance(String name) {
}
}
}

@Value
@Accessors(fluent = true)
@Builder
public static class ConnectionDetails {
String driverClassName;
String url;
String user;
String password;
Dialect dialect;
}
}
Loading

0 comments on commit 8f24998

Please sign in to comment.