diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorClientAutoConfiguration.java b/conductor-clients/java/conductor-java-sdk/conductor-client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorClientAutoConfiguration.java index 883926a8e..5c698aaef 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorClientAutoConfiguration.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorClientAutoConfiguration.java @@ -12,12 +12,15 @@ */ package com.netflix.conductor.client.spring; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; import com.netflix.conductor.client.automator.TaskRunnerConfigurer; import com.netflix.conductor.client.http.ConductorClient; @@ -27,8 +30,11 @@ import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor; import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor; +import lombok.extern.slf4j.Slf4j; + @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties(ClientProperties.class) +@Slf4j public class ConductorClientAutoConfiguration { @ConditionalOnMissingBean @@ -54,17 +60,31 @@ public AnnotatedWorkerExecutor annotatedWorkerExecutor(TaskClient taskClient) { @ConditionalOnMissingBean @Bean(initMethod = "init", destroyMethod = "shutdown") - public TaskRunnerConfigurer taskRunnerConfigurer(TaskClient taskClient, + public TaskRunnerConfigurer taskRunnerConfigurer(Environment env, + TaskClient taskClient, ClientProperties clientProperties, List workers) { - return new TaskRunnerConfigurer.Builder(taskClient, workers) - .withTaskThreadCount(clientProperties.getTaskThreadCount()) - .withThreadCount(clientProperties.getThreadCount()) - .withSleepWhenRetry((int) clientProperties.getSleepWhenRetryDuration().toMillis()) - .withUpdateRetryCount(clientProperties.getUpdateRetryCount()) - .withTaskToDomain(clientProperties.getTaskToDomain()) - .withShutdownGracePeriodSeconds(clientProperties.getShutdownGracePeriodSeconds()) - .build(); + Map taskThreadCount = new HashMap<>(); + for (Worker worker : workers) { + String key = "conductor.worker." + worker.getTaskDefName() + ".threadCount"; + int threadCount = env.getProperty(key, Integer.class, 10); + log.info("Using {} threads for {} worker", threadCount, worker.getTaskDefName()); + taskThreadCount.put(worker.getTaskDefName(), threadCount); + } + + if (clientProperties.getTaskThreadCount() != null) { + clientProperties.getTaskThreadCount().putAll(taskThreadCount); + } else { + clientProperties.setTaskThreadCount(taskThreadCount); + } + return new TaskRunnerConfigurer.Builder(taskClient, workers) + .withTaskThreadCount(clientProperties.getTaskThreadCount()) + .withThreadCount(clientProperties.getThreadCount()) + .withSleepWhenRetry((int) clientProperties.getSleepWhenRetryDuration().toMillis()) + .withUpdateRetryCount(clientProperties.getUpdateRetryCount()) + .withTaskToDomain(clientProperties.getTaskToDomain()) + .withShutdownGracePeriodSeconds(clientProperties.getShutdownGracePeriodSeconds()) + .build(); } @Bean diff --git a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/ApiClient.java b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/ApiClient.java index cf4147769..76b7127c0 100644 --- a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/ApiClient.java +++ b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/ApiClient.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.jetbrains.annotations.NotNull; @@ -32,6 +33,7 @@ import okhttp3.Call; import okhttp3.Callback; +import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; @@ -48,6 +50,13 @@ public ApiClient(String rootUri, String keyId, String secret) { .addHeaderSupplier(new OrkesAuthentication(keyId, secret))); } + public ApiClient(String rootUri, String keyId, String secret, Consumer configurer) { + super(ConductorClient.builder() + .basePath(rootUri) + .configureOkHttp(configurer) + .addHeaderSupplier(new OrkesAuthentication(keyId, secret))); + } + public ApiClient(String rootUri) { super(rootUri); } diff --git a/conductor-clients/java/conductor-java-sdk/orkes-spring/src/main/java/io/orkes/conductor/client/spring/OrkesConductorClientAutoConfiguration.java b/conductor-clients/java/conductor-java-sdk/orkes-spring/src/main/java/io/orkes/conductor/client/spring/OrkesConductorClientAutoConfiguration.java index dc544d021..2149c93cc 100644 --- a/conductor-clients/java/conductor-java-sdk/orkes-spring/src/main/java/io/orkes/conductor/client/spring/OrkesConductorClientAutoConfiguration.java +++ b/conductor-clients/java/conductor-java-sdk/orkes-spring/src/main/java/io/orkes/conductor/client/spring/OrkesConductorClientAutoConfiguration.java @@ -13,6 +13,7 @@ package io.orkes.conductor.client.spring; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; @@ -42,6 +43,7 @@ public class OrkesConductorClientAutoConfiguration { public static final String CONDUCTOR_CLIENT_SECRET = "conductor.client.secret"; @Bean + @ConditionalOnMissingBean public ApiClient orkesConductorClient(Environment env) { String basePath = env.getProperty(CONDUCTOR_CLIENT_BASE_PATH); if (basePath == null) {