Skip to content

Commit

Permalink
Force and detect thread pinning
Browse files Browse the repository at this point in the history
  • Loading branch information
badgerwithagun committed Dec 26, 2023
1 parent a8f49f0 commit 6d4152b
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 33 deletions.
6 changes: 6 additions & 0 deletions transactionoutbox-virtthreads/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>me.escoffier.loom</groupId>
<artifactId>loom-unit</artifactId>
<version>0.3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,51 +9,64 @@
import com.gruelbox.transactionoutbox.testing.BaseTest;
import com.gruelbox.transactionoutbox.testing.InterfaceProcessor;
import java.time.Duration;
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import me.escoffier.loom.loomunit.LoomUnitExtension;
import me.escoffier.loom.loomunit.ShouldNotPin;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@Slf4j
@ExtendWith(LoomUnitExtension.class)
@ShouldNotPin
abstract class AbstractVirtualThreadsTest extends BaseTest {

private static final String VIRTUAL_THREAD_SCHEDULER_PARALLELISM =
"jdk.virtualThreadScheduler.parallelism";

@Test
final void highVolumeVirtualThreads() throws Exception {
var count = 10;
var count = 100;
var latch = new CountDownLatch(count * 10);
var transactionManager = txManager();
var results = new ConcurrentHashMap<Integer, Integer>();
var duplicates = new ConcurrentHashMap<Integer, Integer>();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var outbox =
TransactionOutbox.builder()
.transactionManager(transactionManager)
.persistor(Persistor.forDialect(connectionDetails().dialect()))
.instantiator(Instantiator.using(clazz -> (InterfaceProcessor) (foo, bar) -> {}))
.submitter(Submitter.withExecutor(executor))
.attemptFrequency(Duration.ofMillis(500))
.flushBatchSize(1000)
.listener(
new TransactionOutboxListener() {
@Override
public void success(TransactionOutboxEntry entry) {
Integer i = (Integer) entry.getInvocation().getArgs()[0];
if (results.putIfAbsent(i, i) != null) {
duplicates.put(i, i);
}
latch.countDown();
var outbox =
TransactionOutbox.builder()
.transactionManager(transactionManager)
.persistor(Persistor.forDialect(connectionDetails().dialect()))
.instantiator(Instantiator.using(clazz -> (InterfaceProcessor) (foo, bar) -> {}))
.submitter(Submitter.withExecutor(Thread::startVirtualThread))
.attemptFrequency(Duration.ofMillis(500))
.flushBatchSize(1000)
.listener(
new TransactionOutboxListener() {
@Override
public void success(TransactionOutboxEntry entry) {
Integer i = (Integer) entry.getInvocation().getArgs()[0];
if (results.putIfAbsent(i, i) != null) {
duplicates.put(i, i);
}
})
.build();
latch.countDown();
}
})
.build();

var parallelism = System.getProperty(VIRTUAL_THREAD_SCHEDULER_PARALLELISM);
System.setProperty(VIRTUAL_THREAD_SCHEDULER_PARALLELISM, "1");
try {
withRunningFlusher(
outbox,
() -> {
var futures =
IntStream.range(0, count)
.mapToObj(
i ->
CompletableFuture.runAsync(
new FutureTask<Void>(
() ->
transactionManager.inTransaction(
() -> {
Expand All @@ -63,18 +76,27 @@ public void success(TransactionOutboxEntry entry) {
.process(i * 10 + j, "Whee");
}
}),
executor))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
null))
.toList();
futures.forEach(Thread::startVirtualThread);
for (var future : futures) {
future.get();
}
assertTrue(latch.await(30, TimeUnit.SECONDS), "Latch not opened in time");
});

assertThat(
"Should never get duplicates running to full completion", duplicates.keySet(), empty());
assertThat(
"Only got: " + results.keySet(),
results.keySet(),
containsInAnyOrder(IntStream.range(0, count * 10).boxed().toArray()));
} finally {
if (parallelism == null) {
System.clearProperty(VIRTUAL_THREAD_SCHEDULER_PARALLELISM);
} else {
System.setProperty(VIRTUAL_THREAD_SCHEDULER_PARALLELISM, parallelism);
}
}

assertThat(
"Should never get duplicates running to full completion", duplicates.keySet(), empty());
assertThat(
"Only got: " + results.keySet(),
results.keySet(),
containsInAnyOrder(IntStream.range(0, count * 10).boxed().toArray()));
}
}

0 comments on commit 6d4152b

Please sign in to comment.