Skip to content

Commit

Permalink
updated taskdef
Browse files Browse the repository at this point in the history
  • Loading branch information
Shailesh Jagannath Padave committed Nov 19, 2024
1 parent a85ca3c commit da987b6
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ public boolean isRetriable() {
@ProtoField(id = 42)
private boolean subworkflowChanged;

@ProtoField(id = 43)
private long firstStartTime;

// If the task is an event associated with a parent task, the id of the parent task
private String parentTaskId;

Expand Down Expand Up @@ -736,6 +739,13 @@ public void setSubworkflowChanged(boolean subworkflowChanged) {
this.subworkflowChanged = subworkflowChanged;
}

public long getFirstStartTime() {
return firstStartTime;
}

public void setFirstStartTime(long firstStartTime) {
this.firstStartTime = firstStartTime;
}
public String getSubWorkflowId() {
// For backwards compatibility
if (StringUtils.isNotBlank(subWorkflowId)) {
Expand Down Expand Up @@ -813,6 +823,7 @@ public Task copy() {
public Task deepCopy() {
Task deepCopy = copy();
deepCopy.setStartTime(startTime);
deepCopy.setFirstStartTime(firstStartTime);
deepCopy.setScheduledTime(scheduledTime);
deepCopy.setEndTime(endTime);
deepCopy.setWorkerId(workerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public enum RetryLogic {

private long timeoutSeconds;

private long totalTimeoutSeconds;

private List<String> inputKeys = new ArrayList<>();

private List<String> outputKeys = new ArrayList<>();
Expand Down Expand Up @@ -100,20 +102,22 @@ public TaskDef(String name, String description) {
this.description = description;
}

public TaskDef(String name, String description, int retryCount, long timeoutSeconds) {
public TaskDef(String name, String description, int retryCount, long timeoutSeconds, long totalTimeoutSeconds) {
this.name = name;
this.description = description;
this.retryCount = retryCount;
this.timeoutSeconds = timeoutSeconds;
this.totalTimeoutSeconds = totalTimeoutSeconds;
}

public TaskDef(String name, String description, String ownerEmail, int retryCount, long timeoutSeconds, long responseTimeoutSeconds) {
public TaskDef(String name, String description, String ownerEmail, int retryCount, long timeoutSeconds, long responseTimeoutSeconds, long totalTimeoutSeconds) {
this.name = name;
this.description = description;
this.ownerEmail = ownerEmail;
this.retryCount = retryCount;
this.timeoutSeconds = timeoutSeconds;
this.responseTimeoutSeconds = responseTimeoutSeconds;
this.totalTimeoutSeconds = totalTimeoutSeconds;
}

/**
Expand Down Expand Up @@ -416,6 +420,14 @@ public void setEnforceSchema(boolean enforceSchema) {
this.enforceSchema = enforceSchema;
}

public long getTotalTimeoutSeconds() {
return totalTimeoutSeconds;
}

public void setTotalTimeoutSeconds(long totalTimeoutSeconds) {
this.totalTimeoutSeconds = totalTimeoutSeconds;
}

public String toString() {
return name;
}
Expand All @@ -428,10 +440,10 @@ public boolean equals(Object o) {
return false;
}
TaskDef taskDef = (TaskDef) o;
return getRetryCount() == taskDef.getRetryCount() && getTimeoutSeconds() == taskDef.getTimeoutSeconds() && getRetryDelaySeconds() == taskDef.getRetryDelaySeconds() && getBackoffScaleFactor() == taskDef.getBackoffScaleFactor() && getResponseTimeoutSeconds() == taskDef.getResponseTimeoutSeconds() && Objects.equals(getName(), taskDef.getName()) && Objects.equals(getDescription(), taskDef.getDescription()) && Objects.equals(getInputKeys(), taskDef.getInputKeys()) && Objects.equals(getOutputKeys(), taskDef.getOutputKeys()) && getTimeoutPolicy() == taskDef.getTimeoutPolicy() && getRetryLogic() == taskDef.getRetryLogic() && Objects.equals(getConcurrentExecLimit(), taskDef.getConcurrentExecLimit()) && Objects.equals(getRateLimitPerFrequency(), taskDef.getRateLimitPerFrequency()) && Objects.equals(getInputTemplate(), taskDef.getInputTemplate()) && Objects.equals(getIsolationGroupId(), taskDef.getIsolationGroupId()) && Objects.equals(getExecutionNameSpace(), taskDef.getExecutionNameSpace()) && Objects.equals(getOwnerEmail(), taskDef.getOwnerEmail()) && Objects.equals(getBaseType(), taskDef.getBaseType()) && Objects.equals(getInputSchema(), taskDef.getInputSchema()) && Objects.equals(getOutputSchema(), taskDef.getOutputSchema());
return getTotalTimeoutSeconds() == taskDef.getTotalTimeoutSeconds() && getRetryCount() == taskDef.getRetryCount() && getTimeoutSeconds() == taskDef.getTimeoutSeconds() && getRetryDelaySeconds() == taskDef.getRetryDelaySeconds() && getBackoffScaleFactor() == taskDef.getBackoffScaleFactor() && getResponseTimeoutSeconds() == taskDef.getResponseTimeoutSeconds() && Objects.equals(getName(), taskDef.getName()) && Objects.equals(getDescription(), taskDef.getDescription()) && Objects.equals(getInputKeys(), taskDef.getInputKeys()) && Objects.equals(getOutputKeys(), taskDef.getOutputKeys()) && getTimeoutPolicy() == taskDef.getTimeoutPolicy() && getRetryLogic() == taskDef.getRetryLogic() && Objects.equals(getConcurrentExecLimit(), taskDef.getConcurrentExecLimit()) && Objects.equals(getRateLimitPerFrequency(), taskDef.getRateLimitPerFrequency()) && Objects.equals(getInputTemplate(), taskDef.getInputTemplate()) && Objects.equals(getIsolationGroupId(), taskDef.getIsolationGroupId()) && Objects.equals(getExecutionNameSpace(), taskDef.getExecutionNameSpace()) && Objects.equals(getOwnerEmail(), taskDef.getOwnerEmail()) && Objects.equals(getBaseType(), taskDef.getBaseType()) && Objects.equals(getInputSchema(), taskDef.getInputSchema()) && Objects.equals(getOutputSchema(), taskDef.getOutputSchema());
}

public int hashCode() {
return Objects.hash(getName(), getDescription(), getRetryCount(), getTimeoutSeconds(), getInputKeys(), getOutputKeys(), getTimeoutPolicy(), getRetryLogic(), getRetryDelaySeconds(), getBackoffScaleFactor(), getResponseTimeoutSeconds(), getConcurrentExecLimit(), getRateLimitPerFrequency(), getInputTemplate(), getIsolationGroupId(), getExecutionNameSpace(), getOwnerEmail(), getBaseType(), getInputSchema(), getOutputSchema());
return Objects.hash(getName(), getDescription(), getRetryCount(), getTimeoutSeconds(), getTotalTimeoutSeconds(), getInputKeys(), getOutputKeys(), getTimeoutPolicy(), getRetryLogic(), getRetryDelaySeconds(), getBackoffScaleFactor(), getResponseTimeoutSeconds(), getConcurrentExecLimit(), getRateLimitPerFrequency(), getInputTemplate(), getIsolationGroupId(), getExecutionNameSpace(), getOwnerEmail(), getBaseType(), getInputSchema(), getOutputSchema());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
//for the long running tasks, timeout if the task does not get updated in COMPLETED or IN_PROGRESS status in 60 seconds after the last update
taskDef.setResponseTimeoutSeconds(60);

taskDef.setTotalTimeoutSeconds(600);
//only allow 100 executions in a 10-second window! -- Note, this is complementary to concurrent_exec_limit
taskDef.setRateLimitPerFrequency(100);
taskDef.setRateLimitFrequencyInSeconds(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void createTaskDefinitions() {
taskDef2.setDescription("task to notify users");
taskDef2.setOwnerEmail("[email protected]");
taskDef2.setResponseTimeoutSeconds(10);
taskDef2.setTotalTimeoutSeconds(120);
taskDef2.setRetryCount(3);
// At any given time, max 10 executions of this task will be allowed. Tasks to be scheduled
// after reaching max
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public void execute(WorkflowSystemTask systemTask, String taskId) {

if (task.getStatus() == TaskModel.Status.SCHEDULED) {
task.setStartTime(System.currentTimeMillis());
task.setFirstStartTime(System.currentTimeMillis());
Monitors.recordQueueWaitTime(task.getTaskType(), task.getQueueWaitTime());
systemTask.start(workflow, task, workflowExecutor);
} else if (task.getStatus() == TaskModel.Status.IN_PROGRESS) {
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/com/netflix/conductor/model/TaskModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public boolean isRetriable() {
/** Time when the task was last updated */
private long updateTime;

/** Time when first task started */
private long firstStartTime;

private int startDelayInSeconds;

private String retriedTaskId;
Expand Down Expand Up @@ -535,6 +538,14 @@ public void setIteration(int iteration) {
this.iteration = iteration;
}

public long getFirstStartTime() {
return firstStartTime;
}

public void setFirstStartTime(long firstStartTime) {
this.firstStartTime = firstStartTime;
}

public String getSubWorkflowId() {
// For backwards compatibility
if (StringUtils.isNotBlank(subWorkflowId)) {
Expand Down

0 comments on commit da987b6

Please sign in to comment.