Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for configurable MetricsCollector in Spring client #356

Merged
merged 1 commit into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.autoconfigure.AutoConfiguration;
Expand All @@ -30,6 +31,7 @@
import com.netflix.conductor.client.http.ConductorClient;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.http.WorkflowClient;
import com.netflix.conductor.client.metrics.MetricsCollector;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor;
import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor;
Expand Down Expand Up @@ -79,7 +81,8 @@ public AnnotatedWorkerExecutor annotatedWorkerExecutor(TaskClient taskClient) {
public TaskRunnerConfigurer taskRunnerConfigurer(Environment env,
TaskClient taskClient,
ClientProperties clientProperties,
List<Worker> workers) {
List<Worker> workers,
Optional<MetricsCollector> metricsCollector) {
Map<String, Integer> taskThreadCount = new HashMap<>();
for (Worker worker : workers) {
String key = "conductor.worker." + worker.getTaskDefName() + ".threadCount";
Expand All @@ -94,15 +97,16 @@ public TaskRunnerConfigurer taskRunnerConfigurer(Environment env,
clientProperties.setTaskThreadCount(taskThreadCount);
}

return new TaskRunnerConfigurer.Builder(taskClient, workers)
TaskRunnerConfigurer.Builder builder = new TaskRunnerConfigurer.Builder(taskClient, workers)
.withTaskThreadCount(clientProperties.getTaskThreadCount())
.withThreadCount(clientProperties.getThreadCount())
.withSleepWhenRetry((int) clientProperties.getSleepWhenRetryDuration().toMillis())
.withUpdateRetryCount(clientProperties.getUpdateRetryCount())
.withTaskToDomain(clientProperties.getTaskToDomain())
.withShutdownGracePeriodSeconds(clientProperties.getShutdownGracePeriodSeconds())
.withTaskPollTimeout(clientProperties.getTaskPollTimeout())
.build();
.withTaskPollTimeout(clientProperties.getTaskPollTimeout());
metricsCollector.ifPresent(builder::withMetricsCollector);
return builder.build();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.springframework.stereotype.Component;

import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.metrics.MetricsCollector;
import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor;
import com.netflix.conductor.sdk.workflow.executor.task.WorkerConfiguration;

Expand All @@ -44,8 +45,12 @@ public void onApplicationEvent(ContextRefreshedEvent refreshedEvent) {
ApplicationContext applicationContext = refreshedEvent.getApplicationContext();
Environment environment = applicationContext.getEnvironment();
WorkerConfiguration configuration = new SpringWorkerConfiguration(environment);
AnnotatedWorkerExecutor annotatedWorkerExecutor = new AnnotatedWorkerExecutor(taskClient, configuration);

AnnotatedWorkerExecutor annotatedWorkerExecutor = new AnnotatedWorkerExecutor(taskClient, configuration);
String[] beanNames = applicationContext.getBeanNamesForType(MetricsCollector.class);
if (beanNames.length > 0) {
annotatedWorkerExecutor.setMetricsCollector(applicationContext.getBean(MetricsCollector.class));
}
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Component.class);
beans.values().forEach(annotatedWorkerExecutor::addBean);
annotatedWorkerExecutor.startPolling();
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=4.0.2-beta
version=4.0.4
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.netflix.conductor.client.automator.TaskRunnerConfigurer;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.metrics.MetricsCollector;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.sdk.workflow.task.WorkerTask;

Expand All @@ -50,6 +51,8 @@ public class AnnotatedWorkerExecutor {

protected Map<String, String> workerDomains = new HashMap<>();

private MetricsCollector metricsCollector;

private static final Set<String> scannedPackages = new HashSet<>();

private final WorkerConfiguration workerConfiguration;
Expand Down Expand Up @@ -197,12 +200,14 @@ public void startPolling() {
LOGGER.info("Starting workers with threadCount {}", workerToThreadCount);
LOGGER.info("Worker domains {}", workerDomains);

taskRunner =
new TaskRunnerConfigurer.Builder(taskClient, workers)
.withTaskThreadCount(workerToThreadCount)
.withTaskToDomain(workerDomains)
.build();
var builder = new TaskRunnerConfigurer.Builder(taskClient, workers)
.withTaskThreadCount(workerToThreadCount)
.withTaskToDomain(workerDomains);
if (metricsCollector != null) {
builder.withMetricsCollector(metricsCollector);
}

taskRunner = builder.build();
taskRunner.init();
}

Expand All @@ -215,4 +220,12 @@ List<Worker> getWorkers() {
TaskRunnerConfigurer getTaskRunner() {
return taskRunner;
}

public MetricsCollector getMetricsCollector() {
return metricsCollector;
}

public void setMetricsCollector(MetricsCollector metricsCollector) {
this.metricsCollector = metricsCollector;
}
}
Loading