Skip to content

Commit

Permalink
Add a SetCorrelationId system task
Browse files Browse the repository at this point in the history
  • Loading branch information
bjpirt committed Feb 9, 2024
1 parent b95d37c commit 35ffc81
Show file tree
Hide file tree
Showing 11 changed files with 334 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum TaskType {
KAFKA_PUBLISH,
JSON_JQ_TRANSFORM,
SET_VARIABLE,
SET_CORRELATION_ID,
NOOP;

/**
Expand Down Expand Up @@ -69,6 +70,7 @@ public enum TaskType {
public static final String TASK_TYPE_KAFKA_PUBLISH = "KAFKA_PUBLISH";
public static final String TASK_TYPE_JSON_JQ_TRANSFORM = "JSON_JQ_TRANSFORM";
public static final String TASK_TYPE_SET_VARIABLE = "SET_VARIABLE";
public static final String TASK_TYPE_SET_CORRELATION_ID = "SET_CORRELATION_ID";
public static final String TASK_TYPE_FORK = "FORK";
public static final String TASK_TYPE_NOOP = "NOOP";

Expand All @@ -81,6 +83,7 @@ public enum TaskType {
BUILT_IN_TASKS.add(TASK_TYPE_JOIN);
BUILT_IN_TASKS.add(TASK_TYPE_EXCLUSIVE_JOIN);
BUILT_IN_TASKS.add(TASK_TYPE_DO_WHILE);
BUILT_IN_TASKS.add(TASK_TYPE_SET_CORRELATION_ID);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2022 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.mapper;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.model.TaskModel;

@Component
public class SetCorrelationIdTaskMapper implements TaskMapper {

public static final Logger LOGGER = LoggerFactory.getLogger(SetCorrelationIdTaskMapper.class);

@Override
public String getTaskType() {
return TaskType.SET_CORRELATION_ID.name();
}

@Override
public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)
throws TerminateWorkflowException {
LOGGER.debug("TaskMapperContext {} in SetCorrelationIdMapper", taskMapperContext);

TaskModel varTask = taskMapperContext.createTaskModel();
varTask.setStartTime(System.currentTimeMillis());
varTask.setInputData(taskMapperContext.getTaskInput());
varTask.setStatus(TaskModel.Status.IN_PROGRESS);

return List.of(varTask);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2022 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.tasks;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SET_CORRELATION_ID;
import static com.netflix.conductor.model.TaskModel.Status.*;

@Component(TASK_TYPE_SET_CORRELATION_ID)
public class SetCorrelationId extends WorkflowSystemTask {

private static Logger LOGGER = LoggerFactory.getLogger(SetCorrelationId.class);

public SetCorrelationId() {
super(TASK_TYPE_SET_CORRELATION_ID);
}

@Override
public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {
task.setStatus(TaskModel.Status.CANCELED);
}

@Override
public boolean execute(
WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {
LOGGER.info("Setting Correlation ID");
Object correlationId = task.getInputData().get("correlationId");

if (correlationId == null
|| correlationId.getClass() != String.class
|| ((String) correlationId).isEmpty()) {
task.setReasonForIncompletion(
"A non-empty String value must be provided for 'correlationId'");
task.setStatus(TaskModel.Status.FAILED_WITH_TERMINAL_ERROR);
return false;
}

workflow.setCorrelationId((String) correlationId);
task.addOutput("correlationId", correlationId);
task.setStatus(TaskModel.Status.COMPLETED);
return true;
}

public boolean isAsync() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class WorkflowTestService {
operators.add(TaskType.TASK_TYPE_JOIN);
operators.add(TaskType.TASK_TYPE_DO_WHILE);
operators.add(TaskType.TASK_TYPE_SET_VARIABLE);
operators.add(TaskType.TASK_TYPE_SET_CORRELATION_ID);
operators.add(TaskType.TASK_TYPE_FORK);
operators.add(TaskType.TASK_TYPE_INLINE);
operators.add(TaskType.TASK_TYPE_TERMINATE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ public boolean isValid(WorkflowTask workflowTask, ConstraintValidatorContext con
case TaskType.TASK_TYPE_WAIT:
valid = isWaitTaskValid(workflowTask, context);
break;
case TaskType.TASK_TYPE_SET_CORRELATION_ID:
valid = isSetCorrelationIdTaskValid(workflowTask, context);
break;
}

return valid;
Expand Down Expand Up @@ -345,11 +348,30 @@ private boolean isWaitTaskValid(
return valid;
}

private boolean isSetCorrelationIdTaskValid(
WorkflowTask workflowTask, ConstraintValidatorContext context) {
if (workflowTask.getInputParameters() == null
|| !workflowTask.getInputParameters().containsKey("correlationId")
|| workflowTask.getInputParameters().get("correlationId").getClass()
!= String.class) {
String message =
String.format(
PARAM_REQUIRED_STRING_FORMAT,
"correlationId",
TaskType.SET_CORRELATION_ID,
workflowTask.getName());
context.buildConstraintViolationWithTemplate(message).addConstraintViolation();
return false;
}
return true;
}

private boolean isDynamicForkJoinValid(
WorkflowTask workflowTask, ConstraintValidatorContext context) {
boolean valid = true;

// For DYNAMIC_FORK_JOIN_TASK support dynamicForkJoinTasksParam or combination of
// For DYNAMIC_FORK_JOIN_TASK support dynamicForkJoinTasksParam or combination
// of
// dynamicForkTasksParam and dynamicForkTasksInputParamName.
// Both are not allowed.
if (workflowTask.getDynamicForkJoinTasksParam() != null
Expand Down Expand Up @@ -397,7 +419,8 @@ private boolean isHttpTaskValid(
boolean isInputParameterSet = false;
boolean isInputTemplateSet = false;

// Either http_request in WorkflowTask inputParam should be set or in inputTemplate
// Either http_request in WorkflowTask inputParam should be set or in
// inputTemplate
// Taskdef should be set
if (workflowTask.getInputParameters() != null
&& workflowTask.getInputParameters().containsKey("http_request")) {
Expand Down Expand Up @@ -476,7 +499,8 @@ private boolean isKafkaPublishTaskValid(
boolean isInputParameterSet = false;
boolean isInputTemplateSet = false;

// Either kafka_request in WorkflowTask inputParam should be set or in inputTemplate
// Either kafka_request in WorkflowTask inputParam should be set or in
// inputTemplate
// Taskdef should be set
if (workflowTask.getInputParameters() != null
&& workflowTask.getInputParameters().containsKey("kafka_request")) {
Expand Down Expand Up @@ -531,7 +555,8 @@ private boolean isJSONJQTransformTaskValid(
boolean isInputParameterSet = false;
boolean isInputTemplateSet = false;

// Either queryExpression in WorkflowTask inputParam should be set or in inputTemplate
// Either queryExpression in WorkflowTask inputParam should be set or in
// inputTemplate
// Taskdef should be set
if (workflowTask.getInputParameters() != null
&& workflowTask.getInputParameters().containsKey("queryExpression")) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Set Variable

```json
"type" : "SET_CORRELATION_ID"
```

The `SET_CORRELATION_ID` task allows users to set the correlation ID on a workflow from within the workflow. Normally the correlation ID would be set by the process starting the workflow, however in certain occasions, such as starting a workflow from an event subscription, this is not possible.

## Use Cases

If you're using event handlers to start workflows and you're able to pass in the correlation ID in the event data, then this task enables you to set the current workflow to have that correlation ID too, making it easier to correlate workflows across system boundaries.

## Configuration

Set the new correlation ID in the `correlationId` input variable.

## Example

In this example, there is an S3 bucket that receives an XML file (which includes the correlation ID), this triggers an event notification in SQS for a new file which triggers this workflow. There is a SIMPLE task which parses the XML and outputs JSON including the correlation ID in the data structure. This task output is then used to set the correlatin ID on this workflow.

```json
{
"name": "Set_Correlation_Id_Workflow",
"description": "Set the correlation id to a value",
"version": 1,
"tasks": [
{
"name": "convert_xml",
"taskReferenceName": "convert_xml",
"inputParameters": {
"s3Path": "${workflow.input.s3Path}"
},
"type": "SIMPLE"
},
{
"name": "set_correlation_id",
"taskReferenceName": "set_correlation_id",
"type": "SET_CORRELATION_ID",
"inputParameters": {
"correlationId": "${convert_xml.output.correlationId}"
}
},
{
"name": "process_data",
"taskReferenceName": "process_data",
"inputParameters": {
"saved_name": "${convert_xml.output.data}"
},
"type": "SIMPLE"
}
],
"restartable": true,
"ownerEmail": "[email protected]",
"workflowStatusListenerEnabled": true,
"schemaVersion": 2
}
```

In the above example, it can be seen that the task `Set_Name` is a Set Variable Task and
the variable `name` is set to `Foo` and later in the workflow it is referenced by
`"${workflow.variables.name}"` in another task.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2022 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.sdk.workflow.def.tasks;

import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.sdk.workflow.def.WorkflowBuilder;

public class SetCorrelationId extends Task<SetCorrelationId> {
/**
* Sets the value of the variable in workflow. Used for workflow state management. Workflow
* state is a Map that is initialized using @see {@link WorkflowBuilder#variables(Object)}
*
* @param taskReferenceName Use input methods to set the variable values
*/
public SetCorrelationId(String taskReferenceName) {
super(taskReferenceName, TaskType.SET_CORRELATION_ID);
}

SetCorrelationId(WorkflowTask workflowTask) {
super(workflowTask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public static void initTaskImplementations() {
TaskRegistry.register(TaskType.INLINE.name(), Javascript.class);
TaskRegistry.register(TaskType.JOIN.name(), Join.class);
TaskRegistry.register(TaskType.JSON_JQ_TRANSFORM.name(), JQ.class);
TaskRegistry.register(TaskType.SET_CORRELATION_ID.name(), SetCorrelationId.class);
TaskRegistry.register(TaskType.SET_VARIABLE.name(), SetVariable.class);
TaskRegistry.register(TaskType.SIMPLE.name(), SimpleTask.class);
TaskRegistry.register(TaskType.SUB_WORKFLOW.name(), SubWorkflow.class);
Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ nav:
- documentation/configuration/workflowdef/systemtasks/inline-task.md
- documentation/configuration/workflowdef/systemtasks/json-jq-transform-task.md
- documentation/configuration/workflowdef/systemtasks/kafka-publish-task.md
- documentation/configuration/workflowdef/systemtasks/set-correlation-id-task.md
- documentation/configuration/workflowdef/systemtasks/wait-task.md
- documentation/configuration/taskdef.md
- documentation/configuration/eventhandlers.md
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2022 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.test.integration

import com.netflix.conductor.common.metadata.tasks.Task
import com.netflix.conductor.common.run.Workflow
import com.netflix.conductor.test.base.AbstractSpecification

import spock.lang.Shared

class SetCorrelationIdTaskSpec extends AbstractSpecification {

@Shared
def SET_CORRELATION_ID_WF = 'test_set_correlation_id_wf'

def setup() {
workflowTestUtil.registerWorkflows(
'simple_set_correlation_id_workflow_integration_test.json'
)
}

def "Test workflow with set correlation id task"() {
given: "workflow input"
def workflowInput = new HashMap()
workflowInput['correlationId'] = "my-unique-correlation-id"

when: "Start the workflow which has the set correlation id task"
def workflowInstanceId = startWorkflow(SET_CORRELATION_ID_WF, 1,
'', workflowInput, null)

then: "verify that the task is completed and variables were set"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 1
tasks[0].taskType == 'SET_CORRELATION_ID'
tasks[0].status == Task.Status.COMPLETED
output as String == '[correlationId:my-unique-correlation-id]'
}
}

def "Test workflow with missing correlation id"() {
given: "workflow input"
def workflowInput = new HashMap()

when: "Start the workflow which has the set variable task"
def workflowInstanceId = startWorkflow(SET_CORRELATION_ID_WF, 1,
'', workflowInput, null)

def expectedErrorMessage = "A non-empty String value must be provided for 'correlationId'"

then: "verify that the task failed with a message"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.FAILED
tasks.size() == 1
tasks[0].taskType == 'SET_CORRELATION_ID'
tasks[0].status == Task.Status.FAILED_WITH_TERMINAL_ERROR
tasks[0].reasonForIncompletion == expectedErrorMessage
}
}
}
Loading

0 comments on commit 35ffc81

Please sign in to comment.