From bd9b7543567cfcc2895d8bb3e1b2d8bdfae8c55d Mon Sep 17 00:00:00 2001 From: MohamedSabthar Date: Tue, 29 Oct 2024 11:05:12 +0530 Subject: [PATCH] Improve query performance --- .../sql/datasource/SQLWorkerThreadPool.java | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/native/src/main/java/io/ballerina/stdlib/sql/datasource/SQLWorkerThreadPool.java b/native/src/main/java/io/ballerina/stdlib/sql/datasource/SQLWorkerThreadPool.java index e1443075..4eb40c84 100644 --- a/native/src/main/java/io/ballerina/stdlib/sql/datasource/SQLWorkerThreadPool.java +++ b/native/src/main/java/io/ballerina/stdlib/sql/datasource/SQLWorkerThreadPool.java @@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -41,7 +42,8 @@ private SQLWorkerThreadPool() { // This is similar to cachedThreadPool util from Executors.newCachedThreadPool(..); but with upper cap on threads public static final ExecutorService SQL_EXECUTOR_SERVICE = new ThreadPoolExecutor(0, MAX_POOL_SIZE, - 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new SQLThreadFactory()); + 60L, TimeUnit.SECONDS, new BlockingTaskQueue(), new SQLThreadFactory(), + new RetryTaskRejectionPolicy()); static class SQLThreadFactory implements ThreadFactory { @Override @@ -51,4 +53,32 @@ public Thread newThread(Runnable r) { return ballerinaSql; } } + + + static class BlockingTaskQueue extends LinkedBlockingQueue { + private static final long serialVersionUID = 1L; + + @Override + public boolean offer(Runnable task) { + // By returning false, we signal the ThreadPoolExecutor to bypass this queue and attempt to + // spawn a new thread if it hasn't reached the maximum pool size. This approach favors creating + // new threads over queuing tasks, thereby enabling more aggressive parallelism. + return false; + } + + public void retryTask(Runnable task) { + if (!super.offer(task)) { + throw new IllegalStateException("Failed to requeue task: " + task); + } + } + } + + static class RetryTaskRejectionPolicy implements RejectedExecutionHandler { + @Override + public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { + if (executor.getQueue() instanceof BlockingTaskQueue cbq) { + cbq.retryTask(task); + } + } + } }