Skip to content

Commit

Permalink
Changed systemTaskQueuePopTimeout type to Duration
Browse files Browse the repository at this point in the history
  • Loading branch information
jmigueprieto committed Jan 7, 2025
1 parent bf90790 commit c214292
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,11 @@ public class ConductorProperties {
200; // 10 seconds based on default systemTaskWorkerPollInterval of 50ms

/**
* Timeout in milliseconds used by {@link
* com.netflix.conductor.core.execution.tasks.SystemTaskWorker} when polling, i.e.: call to
* {@link com.netflix.conductor.dao.QueueDAO#pop(String, int, int)}.
* Timeout used by {@link com.netflix.conductor.core.execution.tasks.SystemTaskWorker} when
* polling, i.e.: call to {@link com.netflix.conductor.dao.QueueDAO#pop(String, int, int)}.
*/
private int systemTaskQueuePopTimeout = 200;
@DurationUnit(ChronoUnit.MILLIS)
private Duration systemTaskQueuePopTimeout = Duration.ofMillis(100);

public String getStack() {
return stack;
Expand Down Expand Up @@ -597,11 +597,11 @@ public int getSystemTaskPostponeThreshold() {
return systemTaskPostponeThreshold;
}

public int getSystemTaskQueuePopTimeout() {
public Duration getSystemTaskQueuePopTimeout() {
return systemTaskQueuePopTimeout;
}

public void setSystemTaskQueuePopTimeout(int systemTaskQueuePopTimeout) {
public void setSystemTaskQueuePopTimeout(Duration systemTaskQueuePopTimeout) {
this.systemTaskQueuePopTimeout = systemTaskQueuePopTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class SystemTaskWorker extends LifecycleAwareComponent {
private final AsyncSystemTaskExecutor asyncSystemTaskExecutor;
private final ConductorProperties properties;
private final ExecutionService executionService;
private final int queuePopTimeout;

ConcurrentHashMap<String, ExecutionConfig> queueExecutionConfigMap = new ConcurrentHashMap<>();

Expand All @@ -67,6 +68,7 @@ public SystemTaskWorker(
this.queueDAO = queueDAO;
this.pollInterval = properties.getSystemTaskWorkerPollInterval().toMillis();
this.executionService = executionService;
this.queuePopTimeout = (int) properties.getSystemTaskQueuePopTimeout().toMillis();

LOGGER.info("SystemTaskWorker initialized with {} threads", threadCount);
}
Expand Down Expand Up @@ -115,10 +117,7 @@ void pollAndExecute(WorkflowSystemTask systemTask, String queueName) {
LOGGER.debug("Polling queue: {} with {} slots acquired", queueName, messagesToAcquire);

List<String> polledTaskIds =
queueDAO.pop(
queueName,
messagesToAcquire,
properties.getSystemTaskQueuePopTimeout());
queueDAO.pop(queueName, messagesToAcquire, queuePopTimeout);

Monitors.recordTaskPoll(queueName);
LOGGER.debug("Polling queue:{}, got {} tasks", queueName, polledTaskIds.size());
Expand Down

0 comments on commit c214292

Please sign in to comment.