diff --git a/transactionoutbox-virtthreads/pom.xml b/transactionoutbox-virtthreads/pom.xml
index 507b0f69..1160527d 100644
--- a/transactionoutbox-virtthreads/pom.xml
+++ b/transactionoutbox-virtthreads/pom.xml
@@ -42,6 +42,12 @@
${project.version}
test
+
+ me.escoffier.loom
+ loom-unit
+ 0.3.0
+ test
+
org.testcontainers
testcontainers
diff --git a/transactionoutbox-virtthreads/src/test/java/com/gruelbox/transactionoutbox/virtthreads/AbstractVirtualThreadsTest.java b/transactionoutbox-virtthreads/src/test/java/com/gruelbox/transactionoutbox/virtthreads/AbstractVirtualThreadsTest.java
index 4312a1d8..2208563f 100644
--- a/transactionoutbox-virtthreads/src/test/java/com/gruelbox/transactionoutbox/virtthreads/AbstractVirtualThreadsTest.java
+++ b/transactionoutbox-virtthreads/src/test/java/com/gruelbox/transactionoutbox/virtthreads/AbstractVirtualThreadsTest.java
@@ -9,43 +9,56 @@
import com.gruelbox.transactionoutbox.testing.BaseTest;
import com.gruelbox.transactionoutbox.testing.InterfaceProcessor;
import java.time.Duration;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
+import me.escoffier.loom.loomunit.LoomUnitExtension;
+import me.escoffier.loom.loomunit.ShouldNotPin;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
@Slf4j
+@ExtendWith(LoomUnitExtension.class)
+@ShouldNotPin
abstract class AbstractVirtualThreadsTest extends BaseTest {
+ private static final String VIRTUAL_THREAD_SCHEDULER_PARALLELISM =
+ "jdk.virtualThreadScheduler.parallelism";
+
@Test
final void highVolumeVirtualThreads() throws Exception {
- var count = 10;
+ var count = 100;
var latch = new CountDownLatch(count * 10);
var transactionManager = txManager();
var results = new ConcurrentHashMap();
var duplicates = new ConcurrentHashMap();
- try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
- var outbox =
- TransactionOutbox.builder()
- .transactionManager(transactionManager)
- .persistor(Persistor.forDialect(connectionDetails().dialect()))
- .instantiator(Instantiator.using(clazz -> (InterfaceProcessor) (foo, bar) -> {}))
- .submitter(Submitter.withExecutor(executor))
- .attemptFrequency(Duration.ofMillis(500))
- .flushBatchSize(1000)
- .listener(
- new TransactionOutboxListener() {
- @Override
- public void success(TransactionOutboxEntry entry) {
- Integer i = (Integer) entry.getInvocation().getArgs()[0];
- if (results.putIfAbsent(i, i) != null) {
- duplicates.put(i, i);
- }
- latch.countDown();
+ var outbox =
+ TransactionOutbox.builder()
+ .transactionManager(transactionManager)
+ .persistor(Persistor.forDialect(connectionDetails().dialect()))
+ .instantiator(Instantiator.using(clazz -> (InterfaceProcessor) (foo, bar) -> {}))
+ .submitter(Submitter.withExecutor(Thread::startVirtualThread))
+ .attemptFrequency(Duration.ofMillis(500))
+ .flushBatchSize(1000)
+ .listener(
+ new TransactionOutboxListener() {
+ @Override
+ public void success(TransactionOutboxEntry entry) {
+ Integer i = (Integer) entry.getInvocation().getArgs()[0];
+ if (results.putIfAbsent(i, i) != null) {
+ duplicates.put(i, i);
}
- })
- .build();
+ latch.countDown();
+ }
+ })
+ .build();
+ var parallelism = System.getProperty(VIRTUAL_THREAD_SCHEDULER_PARALLELISM);
+ System.setProperty(VIRTUAL_THREAD_SCHEDULER_PARALLELISM, "1");
+ try {
withRunningFlusher(
outbox,
() -> {
@@ -53,7 +66,7 @@ public void success(TransactionOutboxEntry entry) {
IntStream.range(0, count)
.mapToObj(
i ->
- CompletableFuture.runAsync(
+ new FutureTask(
() ->
transactionManager.inTransaction(
() -> {
@@ -63,18 +76,27 @@ public void success(TransactionOutboxEntry entry) {
.process(i * 10 + j, "Whee");
}
}),
- executor))
- .toArray(CompletableFuture[]::new);
- CompletableFuture.allOf(futures).join();
+ null))
+ .toList();
+ futures.forEach(Thread::startVirtualThread);
+ for (var future : futures) {
+ future.get();
+ }
assertTrue(latch.await(30, TimeUnit.SECONDS), "Latch not opened in time");
});
-
- assertThat(
- "Should never get duplicates running to full completion", duplicates.keySet(), empty());
- assertThat(
- "Only got: " + results.keySet(),
- results.keySet(),
- containsInAnyOrder(IntStream.range(0, count * 10).boxed().toArray()));
+ } finally {
+ if (parallelism == null) {
+ System.clearProperty(VIRTUAL_THREAD_SCHEDULER_PARALLELISM);
+ } else {
+ System.setProperty(VIRTUAL_THREAD_SCHEDULER_PARALLELISM, parallelism);
+ }
}
+
+ assertThat(
+ "Should never get duplicates running to full completion", duplicates.keySet(), empty());
+ assertThat(
+ "Only got: " + results.keySet(),
+ results.keySet(),
+ containsInAnyOrder(IntStream.range(0, count * 10).boxed().toArray()));
}
}