From 6865c52f7197538d2adc5db309cd96ad7f386680 Mon Sep 17 00:00:00 2001 From: Yury Yarashevich Date: Wed, 17 Aug 2022 00:10:46 +0200 Subject: [PATCH] Use ForkJoinPool as executor service. --- .../src/main/java/io/objectbox/BoxStore.java | 5 +- .../java/io/objectbox/BoxStoreBuilder.java | 12 +++ .../internal/ObjectBoxThreadPool.java | 91 +++++++++++-------- 3 files changed, 71 insertions(+), 37 deletions(-) diff --git a/objectbox-java/src/main/java/io/objectbox/BoxStore.java b/objectbox-java/src/main/java/io/objectbox/BoxStore.java index bcc31d99..ce6950b9 100644 --- a/objectbox-java/src/main/java/io/objectbox/BoxStore.java +++ b/objectbox-java/src/main/java/io/objectbox/BoxStore.java @@ -36,6 +36,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -227,7 +228,7 @@ public static boolean isSyncServerAvailable() { private final int[] allEntityTypeIds; private final Map, Box> boxes = new ConcurrentHashMap<>(); private final Set transactions = Collections.newSetFromMap(new WeakHashMap<>()); - private final ExecutorService threadPool = new ObjectBoxThreadPool(this); + private final ExecutorService threadPool; private final ObjectClassPublisher objectClassPublisher; final boolean debugTxRead; final boolean debugTxWrite; @@ -257,6 +258,8 @@ public static boolean isSyncServerAvailable() { private SyncClient syncClient; BoxStore(BoxStoreBuilder builder) { + threadPool = Executors.unconfigurableExecutorService( + new ObjectBoxThreadPool(this, builder.executorServiceParallelism)); context = builder.context; relinker = builder.relinker; NativeLibraryLoader.ensureLoaded(); diff --git a/objectbox-java/src/main/java/io/objectbox/BoxStoreBuilder.java b/objectbox-java/src/main/java/io/objectbox/BoxStoreBuilder.java index 1497f1f6..f156f117 100644 --- a/objectbox-java/src/main/java/io/objectbox/BoxStoreBuilder.java +++ b/objectbox-java/src/main/java/io/objectbox/BoxStoreBuilder.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ForkJoinPool; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -94,6 +95,8 @@ public class BoxStoreBuilder { int maxReaders; boolean noReaderThreadLocals; + int executorServiceParallelism = ForkJoinPool.getCommonPoolParallelism(); + int queryAttempts; /** For DebugCursor. */ @@ -319,6 +322,15 @@ public BoxStoreBuilder noReaderThreadLocals() { return this; } + /** + * Sets the maximum allowed level of parallelism allowed by executor service + * used by BoxStore. The default value is equal to {@ref ForkJoinPool#getCommonPoolParallelism())} + */ + public BoxStoreBuilder executorServiceParallelism(int parallelism) { + this.executorServiceParallelism = parallelism; + return this; + } + @Internal public void entity(EntityInfo entityInfo) { entityInfoList.add(entityInfo); diff --git a/objectbox-java/src/main/java/io/objectbox/internal/ObjectBoxThreadPool.java b/objectbox-java/src/main/java/io/objectbox/internal/ObjectBoxThreadPool.java index 41b2ccdd..c51082b5 100644 --- a/objectbox-java/src/main/java/io/objectbox/internal/ObjectBoxThreadPool.java +++ b/objectbox-java/src/main/java/io/objectbox/internal/ObjectBoxThreadPool.java @@ -17,65 +17,84 @@ package io.objectbox.internal; import java.util.concurrent.Executors; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import io.objectbox.BoxStore; import io.objectbox.annotation.apihint.Internal; /** - * Custom thread pool similar to {@link Executors#newCachedThreadPool()} with the following adjustments: + * Custom executor service similar to {@link Executors#newWorkStealingPool()} with the following adjustments: *
    - *
  • Release thread local resources ({@link BoxStore#closeThreadResources()})
  • - *
  • Reduce keep-alive time for threads to 20 seconds
  • - *
  • Uses a ThreadFactory to name threads like "ObjectBox-1-Thread-1"
  • + *
  • Release thread local resources ({@link BoxStore#closeThreadResources()}) after task execution
  • + *
  • Uses a custom thread factory to name threads like "ObjectBox-ForkJoinPool-1-Thread-1"
  • *
* */ @Internal -public class ObjectBoxThreadPool extends ThreadPoolExecutor { +public final class ObjectBoxThreadPool extends AbstractExecutorService { private final BoxStore boxStore; + private final ExecutorService executorImpl; - public ObjectBoxThreadPool(BoxStore boxStore) { - super(0, Integer.MAX_VALUE, 20L, TimeUnit.SECONDS, new SynchronousQueue<>(), - new ObjectBoxThreadFactory()); + public ObjectBoxThreadPool(BoxStore boxStore, int parallelism) { this.boxStore = boxStore; + this.executorImpl = Executors.unconfigurableExecutorService( + new ForkJoinPool( + parallelism, + pool -> { + ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); + // Priority and daemon status are inherited from calling thread; ensure to reset if required + if (thread.getPriority() != Thread.NORM_PRIORITY) { + thread.setPriority(Thread.NORM_PRIORITY); + } + if (thread.isDaemon()) { + thread.setDaemon(false); + } + thread.setName("ObjectBox-" + thread.getName()); + return thread; + }, + null, + false)); } + @Override - protected void afterExecute(Runnable runnable, Throwable throwable) { - super.afterExecute(runnable, throwable); - boxStore.closeThreadResources(); + public void shutdown() { + executorImpl.shutdown(); } - static class ObjectBoxThreadFactory implements ThreadFactory { - private static final AtomicInteger POOL_COUNT = new AtomicInteger(); + @Override + public List shutdownNow() { + return executorImpl.shutdownNow(); + } - private final ThreadGroup group; - private final String namePrefix = "ObjectBox-" + POOL_COUNT.incrementAndGet() + "-Thread-"; - private final AtomicInteger threadCount = new AtomicInteger(); + @Override + public boolean isShutdown() { + return executorImpl.isShutdown(); + } - ObjectBoxThreadFactory() { - SecurityManager securityManager = System.getSecurityManager(); - group = (securityManager != null) ? securityManager.getThreadGroup() : - Thread.currentThread().getThreadGroup(); - } + @Override + public boolean isTerminated() { + return executorImpl.isTerminated(); + } - public Thread newThread(Runnable runnable) { - String name = namePrefix + threadCount.incrementAndGet(); - Thread thread = new Thread(group, runnable, name); + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return executorImpl.awaitTermination(timeout, unit); + } - // Priority and daemon status are inherited from calling thread; ensure to reset if required - if (thread.getPriority() != Thread.NORM_PRIORITY) { - thread.setPriority(Thread.NORM_PRIORITY); - } - if (thread.isDaemon()) { - thread.setDaemon(false); + @Override + public void execute(Runnable command) { + executorImpl.execute(() -> { + try { + command.run(); + } finally { + boxStore.closeThreadResources(); } - return thread; - } + }); } }