Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update event-task.md #300

Merged
merged 1 commit into from
Nov 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 91 additions & 28 deletions docs/documentation/configuration/workflowdef/systemtasks/event-task.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
"type" : "EVENT"
```

The `EVENT` task is a task used to publish an event into one of the supported eventing systems in Conductor.
The `EVENT` task type in Conductor is used to publish events to supported eventing systems. It enables event-based dependencies within workflows and tasks, making it possible to trigger external systems like SQS, NATS, or AMQP as part of the workflow execution.

## Use Cases
Consider a use case where at some point in the execution, an event is published to an external eventing system such as SQS.
Event tasks are useful for creating event based dependencies for workflows and tasks.
An EVENT task can be configured to send an event to an external system at any specified point in a workflow, enabling integration with event-based dependencies.

## Supported Queuing Systems
Conductor supports the the following eventing models:
Conductor supports the following queuing systems for EVENT tasks:

1. Conductor internal events (prefix: `conductor`)
2. SQS (prefix: `sqs`)
Expand All @@ -20,60 +19,124 @@ Conductor supports the the following eventing models:


## Configuration
The following parameters are specified at the top level of the task configuration.

| Attribute | Description |
| ------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| sink | External event queue in the format of `prefix:location`. Prefix is either `sqs` or `conductor` and `location` specifies the actual queue name. e.g. `sqs:send_email_queue` |
| asyncComplete | Boolean. [See below](#asynccomplete) |
| sink | The sink specifies the target event queue in the format `prefix:location`, where the prefix denotes the queuing system (e.g., `conductor`, `sqs`, `nats`, or `amqp`/`amqp_exchange`), and the location represents the specific queue name (e.g., `send_email_queue`). |
| asyncComplete | Setting `false` marks the status as COMPLETED upon execution, while setting `true` keeps the status IN_PROGRESS, awaiting completion from an external event. |

### asyncComplete
* ```false``` to mark status COMPLETED upon execution
* ```true``` to keep it IN_PROGRESS, wait for an external event (via Conductor or SQS or EventHandler) to complete it.

### Conductor sink
When producing an event with Conductor as sink, the event name follows the structure:
### Conductor Sink
When producing an event with Conductor as a sink, the event name follows the structure:
```conductor:<workflow_name>:<task_reference_name>```

When using Conductor as sink, you have two options: defining the sink as `conductor` in which case the queue name will default to the taskReferenceName of the Event Task, or specifying the queue name in the sink, as `conductor:<queue_name>`. The queue name is in the `event` value of the event Handler, as `conductor:<workflow_name>:<queue_name>`.
When using Conductor as sink, you have two options: defining the sink as `conductor`, in which case the queue name will default to the taskReferenceName of the Event Task, or specifying the queue name in the sink as `conductor:<queue_name>`. The queue name is in the `event` value of the event Handler, as `conductor:<workflow_name>:<queue_name>`.

### SQS sink
For SQS, use the **name** of the queue and NOT the URI. Conductor looks up the URI based on the name.
### SQS/NATS/AMQP Sink
Use the **queue's name**, NOT the URI. Conductor looks up the URI based on the name.

## Output
Tasks's output are sent as a payload to the external event. In case of SQS the task's output is sent to the SQS message a a payload.
Upon execution, the tasks output is sent to the external event queue. The payload contains:


| name | type | description |
| ------------------ | ------- | ------------------------------------- |
| workflowInstanceId | String | Workflow id |
| workflowInstanceId | String | Workflow ID |
| workflowType | String | Workflow Name |
| workflowVersion | Integer | Workflow Version |
| correlationId | String | Workflow CorrelationId |
| sink | String | Copy of the input data "sink" |
| asyncComplete | Boolean | Copy of the input data "asyncComplete |
| correlationId | String | Workflow Correlation ID |
| sink | String | Copy of input data for "sink" |
| asyncComplete | Boolean | Copy of input data for "asyncComplete |
| event_produced | String | Name of the event produced |

The published event's payload is identical to the output of the task (except "event_produced").

The published event's payload is identical to the task output (except "event_produced").

## Examples

Consider an example where we want to publish an event into SQS to notify an external system.

**Conductor Event:**
```json
{
"type": "EVENT",
"sink": "sqs:sqs_queue_name",
"sink": "conductor:internal_event_name",
"asyncComplete": false
}
```

An example where we want to publish a messase to conductor's internal queuing system.
**SQS Event:**
```json
{
"type": "EVENT",
"sink": "conductor:internal_event_name",
"sink": "sqs:sqs_queue_name",
"asyncComplete": false
}
```

**NATS Event:**
```json
{

"type": "EVENT",

"sink": "nats:nats_queue_name",

"asyncComplete": false

}
```

**AMQP Event:**
```json
{

"type": "EVENT",

"sink": "amqp:amqp_queue_name",

"asyncComplete": false

}
```

## Event Queue Published Artifacts

Group: `com.netflix.conductor`

| Published Artifact | Description |
| ------------------------------- | ----------------------------------- |
| conductor-amqp | Support for integration with AMQP |
| conductor-nats | Support for integration with NATS |

### Modules

#### AMQP

Provides the capability to publish and consume messages from AMQP-compatible brokers.

Configuration (default values shown below):

```java
conductor.event-queues.amqp.enabled=true
conductor.event-queues.amqp.hosts=localhost
conductor.event-queues.amqp.port=5672
conductor.event-queues.amqp.username=guest
conductor.event-queues.amqp.password=guest
conductor.event-queues.amqp.virtualhost=/
conductor.event-queues.amqp.useSslProtocol=false
#milliseconds
conductor.event-queues.amqp.connectionTimeout=60000
conductor.event-queues.amqp.useExchange=true
conductor.event-queues.amqp.listenerQueuePrefix=
```

#### NATS

Provides the capability to publish and consume messages from NATS queues.

Configuration (default values shown below):

```java
conductor.event-queues.nats.enabled=true
conductor.event-queues.nats-stream.clusterId=test-cluster
conductor.event-queues.nats-stream.durableName=
conductor.event-queues.nats-stream.url=nats://localhost:4222
conductor.event-queues.nats-stream.listenerQueuePrefix=
```
Loading