Skip to content

Commit

Permalink
Merge pull request #745 from MohamedSabthar/2201.9.x
Browse files Browse the repository at this point in the history
[2201.9.x] Improve query performance
  • Loading branch information
MohamedSabthar authored Oct 29, 2024
2 parents 6d7dbf4 + d149436 commit c8f701b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
2 changes: 1 addition & 1 deletion ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ modules = [
[[package]]
org = "ballerina"
name = "http"
version = "2.11.3"
version = "2.11.5"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "auth"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -34,7 +35,8 @@ private SQLWorkerThreadPool() {

// This is similar to cachedThreadPool util from Executors.newCachedThreadPool(..); but with upper cap on threads
public static final ExecutorService SQL_EXECUTOR_SERVICE = new ThreadPoolExecutor(0, 50,
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new SQLThreadFactory());
60L, TimeUnit.SECONDS, new BlockingTaskQueue(), new SQLThreadFactory(),
new RetryTaskRejectionPolicy());

static class SQLThreadFactory implements ThreadFactory {
@Override
Expand All @@ -44,4 +46,31 @@ public Thread newThread(Runnable r) {
return ballerinaSql;
}
}

static class BlockingTaskQueue extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = 1L;

@Override
public boolean offer(Runnable task) {
// By returning false, we signal the ThreadPoolExecutor to bypass this queue and attempt to
// spawn a new thread if it hasn't reached the maximum pool size. This approach favors creating
// new threads over queuing tasks, thereby enabling more aggressive parallelism.
return false;
}

public void retryTask(Runnable task) {
if (!super.offer(task)) {
throw new IllegalStateException("Falied to requeue task: " + task);
}
}
}

static class RetryTaskRejectionPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
if (executor.getQueue() instanceof BlockingTaskQueue cbq) {
cbq.retryTask(task);
}
}
}
}

0 comments on commit c8f701b

Please sign in to comment.