Skip to content

Commit

Permalink
Saving WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jmigueprieto committed Sep 10, 2024
1 parent 6738d81 commit 19a8879
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
*/
package com.netflix.conductor.client.spring;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import com.netflix.conductor.client.automator.TaskRunnerConfigurer;
import com.netflix.conductor.client.http.ConductorClient;
Expand All @@ -27,8 +30,11 @@
import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor;
import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor;

import lombok.extern.slf4j.Slf4j;

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(ClientProperties.class)
@Slf4j
public class ConductorClientAutoConfiguration {

@ConditionalOnMissingBean
Expand All @@ -54,17 +60,31 @@ public AnnotatedWorkerExecutor annotatedWorkerExecutor(TaskClient taskClient) {

@ConditionalOnMissingBean
@Bean(initMethod = "init", destroyMethod = "shutdown")
public TaskRunnerConfigurer taskRunnerConfigurer(TaskClient taskClient,
public TaskRunnerConfigurer taskRunnerConfigurer(Environment env,
TaskClient taskClient,
ClientProperties clientProperties,
List<Worker> workers) {
return new TaskRunnerConfigurer.Builder(taskClient, workers)
.withTaskThreadCount(clientProperties.getTaskThreadCount())
.withThreadCount(clientProperties.getThreadCount())
.withSleepWhenRetry((int) clientProperties.getSleepWhenRetryDuration().toMillis())
.withUpdateRetryCount(clientProperties.getUpdateRetryCount())
.withTaskToDomain(clientProperties.getTaskToDomain())
.withShutdownGracePeriodSeconds(clientProperties.getShutdownGracePeriodSeconds())
.build();
Map<String, Integer> taskThreadCount = new HashMap<>();
for (Worker worker : workers) {
String key = "conductor.worker." + worker.getTaskDefName() + ".threadCount";
int threadCount = env.getProperty(key, Integer.class, 10);
log.info("Using {} threads for {} worker", threadCount, worker.getTaskDefName());
taskThreadCount.put(worker.getTaskDefName(), threadCount);
}

if (clientProperties.getTaskThreadCount() != null) {
clientProperties.getTaskThreadCount().putAll(taskThreadCount);
} else {
clientProperties.setTaskThreadCount(taskThreadCount);
}
return new TaskRunnerConfigurer.Builder(taskClient, workers)
.withTaskThreadCount(clientProperties.getTaskThreadCount())
.withThreadCount(clientProperties.getThreadCount())
.withSleepWhenRetry((int) clientProperties.getSleepWhenRetryDuration().toMillis())
.withUpdateRetryCount(clientProperties.getUpdateRetryCount())
.withTaskToDomain(clientProperties.getTaskToDomain())
.withShutdownGracePeriodSeconds(clientProperties.getShutdownGracePeriodSeconds())
.build();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.jetbrains.annotations.NotNull;
Expand All @@ -32,6 +33,7 @@

import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

Expand All @@ -48,6 +50,13 @@ public ApiClient(String rootUri, String keyId, String secret) {
.addHeaderSupplier(new OrkesAuthentication(keyId, secret)));
}

public ApiClient(String rootUri, String keyId, String secret, Consumer<OkHttpClient.Builder> configurer) {
super(ConductorClient.builder()
.basePath(rootUri)
.configureOkHttp(configurer)
.addHeaderSupplier(new OrkesAuthentication(keyId, secret)));
}

public ApiClient(String rootUri) {
super(rootUri);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package io.orkes.conductor.client.spring;


import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
Expand Down Expand Up @@ -42,6 +43,7 @@ public class OrkesConductorClientAutoConfiguration {
public static final String CONDUCTOR_CLIENT_SECRET = "conductor.client.secret";

@Bean
@ConditionalOnMissingBean
public ApiClient orkesConductorClient(Environment env) {
String basePath = env.getProperty(CONDUCTOR_CLIENT_BASE_PATH);
if (basePath == null) {
Expand Down

0 comments on commit 19a8879

Please sign in to comment.