From 6d4152b45df5933d044e8ee018a3527782760744 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Tue, 26 Dec 2023 18:11:58 +0000 Subject: [PATCH] Force and detect thread pinning --- transactionoutbox-virtthreads/pom.xml | 6 ++ .../AbstractVirtualThreadsTest.java | 88 ++++++++++++------- 2 files changed, 61 insertions(+), 33 deletions(-) diff --git a/transactionoutbox-virtthreads/pom.xml b/transactionoutbox-virtthreads/pom.xml index 507b0f69..1160527d 100644 --- a/transactionoutbox-virtthreads/pom.xml +++ b/transactionoutbox-virtthreads/pom.xml @@ -42,6 +42,12 @@ ${project.version} test + + me.escoffier.loom + loom-unit + 0.3.0 + test + org.testcontainers testcontainers diff --git a/transactionoutbox-virtthreads/src/test/java/com/gruelbox/transactionoutbox/virtthreads/AbstractVirtualThreadsTest.java b/transactionoutbox-virtthreads/src/test/java/com/gruelbox/transactionoutbox/virtthreads/AbstractVirtualThreadsTest.java index 4312a1d8..2208563f 100644 --- a/transactionoutbox-virtthreads/src/test/java/com/gruelbox/transactionoutbox/virtthreads/AbstractVirtualThreadsTest.java +++ b/transactionoutbox-virtthreads/src/test/java/com/gruelbox/transactionoutbox/virtthreads/AbstractVirtualThreadsTest.java @@ -9,43 +9,56 @@ import com.gruelbox.transactionoutbox.testing.BaseTest; import com.gruelbox.transactionoutbox.testing.InterfaceProcessor; import java.time.Duration; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import lombok.extern.slf4j.Slf4j; +import me.escoffier.loom.loomunit.LoomUnitExtension; +import me.escoffier.loom.loomunit.ShouldNotPin; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; @Slf4j +@ExtendWith(LoomUnitExtension.class) +@ShouldNotPin abstract class AbstractVirtualThreadsTest extends BaseTest { + private static final String VIRTUAL_THREAD_SCHEDULER_PARALLELISM = + "jdk.virtualThreadScheduler.parallelism"; + @Test final void highVolumeVirtualThreads() throws Exception { - var count = 10; + var count = 100; var latch = new CountDownLatch(count * 10); var transactionManager = txManager(); var results = new ConcurrentHashMap(); var duplicates = new ConcurrentHashMap(); - try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { - var outbox = - TransactionOutbox.builder() - .transactionManager(transactionManager) - .persistor(Persistor.forDialect(connectionDetails().dialect())) - .instantiator(Instantiator.using(clazz -> (InterfaceProcessor) (foo, bar) -> {})) - .submitter(Submitter.withExecutor(executor)) - .attemptFrequency(Duration.ofMillis(500)) - .flushBatchSize(1000) - .listener( - new TransactionOutboxListener() { - @Override - public void success(TransactionOutboxEntry entry) { - Integer i = (Integer) entry.getInvocation().getArgs()[0]; - if (results.putIfAbsent(i, i) != null) { - duplicates.put(i, i); - } - latch.countDown(); + var outbox = + TransactionOutbox.builder() + .transactionManager(transactionManager) + .persistor(Persistor.forDialect(connectionDetails().dialect())) + .instantiator(Instantiator.using(clazz -> (InterfaceProcessor) (foo, bar) -> {})) + .submitter(Submitter.withExecutor(Thread::startVirtualThread)) + .attemptFrequency(Duration.ofMillis(500)) + .flushBatchSize(1000) + .listener( + new TransactionOutboxListener() { + @Override + public void success(TransactionOutboxEntry entry) { + Integer i = (Integer) entry.getInvocation().getArgs()[0]; + if (results.putIfAbsent(i, i) != null) { + duplicates.put(i, i); } - }) - .build(); + latch.countDown(); + } + }) + .build(); + var parallelism = System.getProperty(VIRTUAL_THREAD_SCHEDULER_PARALLELISM); + System.setProperty(VIRTUAL_THREAD_SCHEDULER_PARALLELISM, "1"); + try { withRunningFlusher( outbox, () -> { @@ -53,7 +66,7 @@ public void success(TransactionOutboxEntry entry) { IntStream.range(0, count) .mapToObj( i -> - CompletableFuture.runAsync( + new FutureTask( () -> transactionManager.inTransaction( () -> { @@ -63,18 +76,27 @@ public void success(TransactionOutboxEntry entry) { .process(i * 10 + j, "Whee"); } }), - executor)) - .toArray(CompletableFuture[]::new); - CompletableFuture.allOf(futures).join(); + null)) + .toList(); + futures.forEach(Thread::startVirtualThread); + for (var future : futures) { + future.get(); + } assertTrue(latch.await(30, TimeUnit.SECONDS), "Latch not opened in time"); }); - - assertThat( - "Should never get duplicates running to full completion", duplicates.keySet(), empty()); - assertThat( - "Only got: " + results.keySet(), - results.keySet(), - containsInAnyOrder(IntStream.range(0, count * 10).boxed().toArray())); + } finally { + if (parallelism == null) { + System.clearProperty(VIRTUAL_THREAD_SCHEDULER_PARALLELISM); + } else { + System.setProperty(VIRTUAL_THREAD_SCHEDULER_PARALLELISM, parallelism); + } } + + assertThat( + "Should never get duplicates running to full completion", duplicates.keySet(), empty()); + assertThat( + "Only got: " + results.keySet(), + results.keySet(), + containsInAnyOrder(IntStream.range(0, count * 10).boxed().toArray())); } }