Workflow dispatcher
The workflow dispatcher is the component responsible for fetching workflow instances from the nflow_workflow table of the nFlow database and passing them to the workflow instance executor for processing.
Every nFlow node that does not have nflow.autostart=false in its properties runs an instance of the workflow dispatcher.
The main requirements for the dispatchers are as follows:
- Only one dispatcher at a time may fetch the same workflow instance to be processed
- All workflow instances that are due for processing must be eventually fetched by a dispatcher
Workflow dispatcher runs in a single thread. First it runs a database query to get a number of workflow instances that can be processed, and then reserves the workflow instances for this node. Workflow instance can be processed when
- it has not been reserved by any node yet (executor_id is null),
- it is due for processing (next_activation <= current_timestamp),
- status is either created or inProgress, and
- it belongs to the same executor group as the dispatcher.
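The four conditions above can be read as a single predicate. The following is a minimal Java sketch of that predicate, not nFlow code: the `InstanceRow` record and the `isEligible` method are hypothetical names introduced for illustration, and the real dispatcher evaluates these conditions in its database query rather than in memory.

```java
import java.time.Instant;

// Hypothetical illustration; these types are not part of nFlow.
final class EligibilitySketch {

  // Simplified view of a row in nflow_workflow, limited to the columns named in the text.
  record InstanceRow(Integer executorId, Instant nextActivation, String status, String executorGroup) {}

  // Returns true only when all four conditions from the list hold.
  static boolean isEligible(InstanceRow row, Instant now, String dispatcherGroup) {
    // Not reserved by any node yet: executor_id is null.
    boolean notReserved = row.executorId() == null;
    // Due for processing: next_activation <= current time. A null next_activation is
    // treated as "not due", matching how a SQL comparison against null behaves.
    boolean due = row.nextActivation() != null && !row.nextActivation().isAfter(now);
    // Status is either created or inProgress.
    boolean statusOk = "created".equals(row.status()) || "inProgress".equals(row.status());
    // Same executor group as the dispatcher.
    boolean sameGroup = dispatcherGroup.equals(row.executorGroup());
    return notReserved && due && statusOk && sameGroup;
  }
}
```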
Workflow instance is reserved by setting the executor_id. Only instances for which the executor_id is successfully updated will be passed to the workflow instance executor. If there are no instances to be processed or this dispatcher failed to reserve any, the dispatcher will sleep for a while before trying again. The sleep time (in milliseconds) is a random value between 0 and the value of nflow.dispatcher.sleep.ms configuration property.
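The reservation rule and the randomized sleep can be illustrated with an in-memory stand-in. This is a sketch under stated assumptions, not the nFlow implementation: `ReservationSketch`, `reserve` and `randomSleepMs` are hypothetical names, a concurrent map plays the role of the executor_id column, and the upper bound of the sleep is assumed to be inclusive because the text does not say either way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical in-memory stand-in for the reservation step; not nFlow code.
final class ReservationSketch {

  // Instance id -> id of the reserving executor. An absent key stands for executor_id being null.
  private final Map<Long, Integer> executorIdByInstance = new ConcurrentHashMap<>();

  // Tries to reserve each candidate and returns only the ones this executor actually won.
  List<Long> reserve(List<Long> candidateIds, int executorId) {
    List<Long> reserved = new ArrayList<>();
    for (Long id : candidateIds) {
      // putIfAbsent is atomic, so two dispatchers can never both win the same instance.
      if (executorIdByInstance.putIfAbsent(id, executorId) == null) {
        reserved.add(id);
      }
    }
    return reserved;
  }

  // Random sleep time in [0, maxSleepMs]; maxSleepMs plays the role of nflow.dispatcher.sleep.ms.
  // Assumes maxSleepMs is non-negative; the inclusive upper bound is also an assumption.
  static long randomSleepMs(long maxSleepMs) {
    return ThreadLocalRandom.current().nextLong(maxSleepMs + 1);
  }
}
```

Randomizing the sleep time keeps several dispatchers that found nothing to do from polling the database in lockstep.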
Before fetching more workflow instances for processing, dispatcher will check the status of the workflow instance executor queue. If there is not enough room in the queue, dispatcher will sleep for a while before trying again. The sleep time is determined as above.
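As a rough illustration of the queue check, the sketch below tests whether a bounded queue can take another batch. It is an assumption-laden example, not nFlow code: `QueueCheckSketch`, `hasRoom` and the `batchSize` parameter are hypothetical, and the text does not define how much room counts as "enough".

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration of the "is there room in the executor queue" check; not nFlow code.
final class QueueCheckSketch {

  // Returns true when the queue can accept at least batchSize more items.
  // Treating "enough room" as "room for one whole batch" is an assumption.
  static boolean hasRoom(BlockingQueue<?> queue, int batchSize) {
    return queue.remainingCapacity() >= batchSize;
  }
}
```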
If there is room in the queue and a new workflow instance that is due for processing is inserted into the database, the dispatcher is notified immediately. If the dispatcher is already running, this has no effect, but if it is sleeping, the notification cuts the sleep short and removes some delay.
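One way to get this behavior is a timed wait on a condition that a notifier can signal. The sketch below is a hypothetical illustration, not how nFlow necessarily implements it: `WakeUpSketch`, `wakeUp` and `sleep` are invented names. A wake-up that arrives while the dispatcher is not sleeping is deliberately dropped, matching the "no effect" case described above.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of a sleep that can be cut short; not nFlow code.
final class WakeUpSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition wake = lock.newCondition();
  private boolean sleeping = false;
  private boolean wakeRequested = false;

  // Called when a due workflow instance is inserted. Has no effect unless the dispatcher is sleeping.
  void wakeUp() {
    lock.lock();
    try {
      if (sleeping) {
        wakeRequested = true;
        wake.signalAll();
      }
    } finally {
      lock.unlock();
    }
  }

  // Sleeps for at most maxMs milliseconds. Returns true if woken early by wakeUp().
  boolean sleep(long maxMs) {
    long remainingNanos = TimeUnit.MILLISECONDS.toNanos(maxMs);
    lock.lock();
    try {
      sleeping = true;
      wakeRequested = false;
      // The loop guards against spurious wakeups of the condition.
      while (!wakeRequested && remainingNanos > 0) {
        remainingNanos = wake.awaitNanos(remainingNanos);
      }
      return wakeRequested;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    } finally {
      sleeping = false;
      lock.unlock();
    }
  }
}
```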
A warning will be logged in case the dispatcher detects that one or more workflow instance executor threads have not been able to complete the processing of a workflow instance for a period of time (defined by nflow.executor.stuckThreadThreshold.seconds), as this indicates that the processing may be stuck. If it is normal that the processing of a workflow instance takes a long time, you can either ignore this warning or configure a bigger threshold.
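The stuck-thread check amounts to comparing how long each executor thread has been working on its current instance against the threshold. The following Java sketch shows that comparison under stated assumptions: `StuckThreadSketch` and `findPotentiallyStuck` are hypothetical names, and the map of start times is an invented bookkeeping structure, since the text does not describe how nFlow tracks this.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the stuck-thread check; not nFlow code.
final class StuckThreadSketch {

  // Returns the names of threads whose current processing started more than `threshold` ago.
  // `threshold` plays the role of nflow.executor.stuckThreadThreshold.seconds.
  static List<String> findPotentiallyStuck(Map<String, Instant> startedAtByThread, Instant now, Duration threshold) {
    List<String> stuck = new ArrayList<>();
    for (Map.Entry<String, Instant> entry : startedAtByThread.entrySet()) {
      Duration running = Duration.between(entry.getValue(), now);
      if (running.compareTo(threshold) > 0) {
        stuck.add(entry.getKey());
      }
    }
    return stuck;
  }
}
```

A caller would log a warning for each returned thread name rather than interrupt it, since a long-running instance may be legitimate.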