Skip to content

Commit

Permalink
Improve query performance
Browse files Browse the repository at this point in the history
  • Loading branch information
MohamedSabthar committed Oct 29, 2024
1 parent fbe911b commit bd9b754
Showing 1 changed file with 31 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -41,7 +42,8 @@ private SQLWorkerThreadPool() {

// This is similar to cachedThreadPool util from Executors.newCachedThreadPool(..); but with upper cap on threads
public static final ExecutorService SQL_EXECUTOR_SERVICE = new ThreadPoolExecutor(0, MAX_POOL_SIZE,
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new SQLThreadFactory());
60L, TimeUnit.SECONDS, new BlockingTaskQueue(), new SQLThreadFactory(),
new RetryTaskRejectionPolicy());

static class SQLThreadFactory implements ThreadFactory {
@Override
Expand All @@ -51,4 +53,32 @@ public Thread newThread(Runnable r) {
return ballerinaSql;
}
}


static class BlockingTaskQueue extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = 1L;

@Override
public boolean offer(Runnable task) {
// By returning false, we signal the ThreadPoolExecutor to bypass this queue and attempt to
// spawn a new thread if it hasn't reached the maximum pool size. This approach favors creating
// new threads over queuing tasks, thereby enabling more aggressive parallelism.
return false;
}

public void retryTask(Runnable task) {
if (!super.offer(task)) {
throw new IllegalStateException("Failed to requeue task: " + task);
}
}
}

static class RetryTaskRejectionPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
if (executor.getQueue() instanceof BlockingTaskQueue cbq) {
cbq.retryTask(task);
}
}
}
}

0 comments on commit bd9b754

Please sign in to comment.