Workflow dispatcher

Edvard Fonsell edited this page Apr 11, 2017 · 2 revisions

What is it?

The workflow dispatcher is the component responsible for fetching workflow instances from the nflow_workflow table of the nFlow database and passing them to the workflow instance executor for processing.

Every nFlow node that does not have nflow.autostart=false in its properties runs one instance of the workflow dispatcher.

The main requirements for the dispatchers are as follows:

  • Only one dispatcher at a time may fetch the same workflow instance to be processed
  • All workflow instances that are due for processing must eventually be fetched by a dispatcher

How does it work?

The workflow dispatcher runs in a single thread. First it runs a database query to get a number of workflow instances that can be processed, and then it reserves those workflow instances for this node. A workflow instance can be processed when

  • it has not been reserved by any node yet (executor_id is null)
  • it is due for processing (next_activation <= current_timestamp)
  • its status is either created or inProgress, and
  • it belongs to the same executor group as the dispatcher.
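The four conditions above can be read as a single predicate over one row of nflow_workflow. The following is a minimal Java sketch of that reading, not nFlow's actual implementation. The classes WorkflowRow and DispatchRules, their field names, and the executorGroup field are hypothetical; only executor_id, next_activation, and the status values created and inProgress come from the text above. Treating a missing next_activation as "not due" is also an assumption.

```java
import java.time.Instant;

/** Hypothetical in-memory view of one nflow_workflow row (only the fields used below). */
final class WorkflowRow {
  final String executorId;      // null when no node has reserved the instance
  final Instant nextActivation; // assumption: null means "not scheduled"
  final String status;          // e.g. "created" or "inProgress"
  final String executorGroup;   // hypothetical name for the executor group of the instance

  WorkflowRow(String executorId, Instant nextActivation, String status, String executorGroup) {
    this.executorId = executorId;
    this.nextActivation = nextActivation;
    this.status = status;
    this.executorGroup = executorGroup;
  }
}

final class DispatchRules {
  private DispatchRules() {}

  /** Returns true only when all four conditions from the list above hold. */
  static boolean isDispatchable(WorkflowRow row, String dispatcherGroup, Instant now) {
    boolean unreserved = row.executorId == null;
    // next_activation <= current_timestamp
    boolean due = row.nextActivation != null && !row.nextActivation.isAfter(now);
    boolean activeStatus = "created".equals(row.status) || "inProgress".equals(row.status);
    boolean sameGroup = dispatcherGroup.equals(row.executorGroup);
    return unreserved && due && activeStatus && sameGroup;
  }
}
```

In the real dispatcher these checks are part of the database query, so the sketch is only useful for reasoning about which rows qualify.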

A workflow instance is reserved by setting its executor_id. Only instances for which the executor_id is successfully updated are passed to the workflow instance executor. If there are no instances to be processed, or this dispatcher failed to reserve any, the dispatcher sleeps for a while before trying again. The sleep time (in milliseconds) is a random value between 0 and the value of the nflow.dispatcher.sleep.ms configuration property.
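The reservation step is what keeps two dispatchers from processing the same instance: each one attempts the update, and only the one whose update succeeds keeps the instance. The sketch below illustrates this with an in-memory map standing in for the executor_id column; it is an illustration under stated assumptions, not nFlow's code. ReservationSketch, reserve, and randomSleepMillis are hypothetical names, and whether the upper bound of the sleep time is inclusive is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

final class ReservationSketch {
  // Stand-in for the executor_id column: workflow instance id -> reserving executor id.
  private final ConcurrentMap<Long, Integer> executorIdByInstance = new ConcurrentHashMap<>();

  /** Tries to reserve every candidate and returns only the ids this executor actually won. */
  List<Long> reserve(List<Long> candidateIds, int executorId) {
    List<Long> reserved = new ArrayList<>();
    for (Long id : candidateIds) {
      // putIfAbsent plays the role of a conditional update that sets executor_id
      // only while it is still null; a non-null result means another node was first.
      if (executorIdByInstance.putIfAbsent(id, executorId) == null) {
        reserved.add(id);
      }
    }
    return reserved;
  }

  /** Random sleep time in [0, maxSleepMs]; the inclusive upper bound is an assumption. */
  static long randomSleepMillis(long maxSleepMs) {
    return ThreadLocalRandom.current().nextLong(maxSleepMs + 1);
  }
}
```

Randomizing the sleep time spreads the polling of several nodes over time, so they are less likely to query the database at the same moment.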

Before fetching more workflow instances for processing, the dispatcher checks the status of the workflow instance executor queue. If there is not enough room in the queue, the dispatcher sleeps for a while before trying again. The sleep time is determined as above.
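As a rough illustration of the queue check, the sketch below asks a bounded queue for its remaining capacity before a fetch. The text does not say how much room counts as enough, so the batchSize parameter is an assumption, as are the names QueueCheck and hasRoomFor.

```java
import java.util.concurrent.BlockingQueue;

final class QueueCheck {
  private QueueCheck() {}

  /**
   * Returns true when the executor queue can accept batchSize more instances.
   * The threshold used by nFlow is not stated in the text; batchSize is hypothetical.
   */
  static boolean hasRoomFor(BlockingQueue<?> executorQueue, int batchSize) {
    return executorQueue.remainingCapacity() >= batchSize;
  }
}
```

Checking before the fetch avoids reserving instances that would only wait in memory while other nodes might have free capacity.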

If there is room in the queue and a new workflow instance that is due for processing is inserted into the database, the dispatcher is notified immediately. If the dispatcher was already running, the notification has no effect, but if the dispatcher was sleeping, it wakes up early and the delay before processing is reduced.
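One way to get this behavior is a sleep that another thread can cut short. The sketch below uses Java's built-in monitor methods (wait and notifyAll); the class WakeableSleep and its methods are hypothetical and only illustrate the idea. A wake-up that arrives while nobody is sleeping is discarded, matching the "no effect" case described above.

```java
import java.util.concurrent.TimeUnit;

final class WakeableSleep {
  private final Object monitor = new Object();
  private boolean wakeRequested;

  /** Sleeps up to maxMillis; returns true if wakeUp() ended the sleep early. */
  boolean sleep(long maxMillis) throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxMillis);
    synchronized (monitor) {
      wakeRequested = false; // wake-ups sent before the sleep started are discarded
      while (!wakeRequested) {
        long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
        if (remainingMs <= 0) {
          return false; // slept for the full time
        }
        monitor.wait(remainingMs); // loop guards against spurious wake-ups
      }
      return true;
    }
  }

  /** Called, for example, after inserting an instance that is due immediately. */
  void wakeUp() {
    synchronized (monitor) {
      wakeRequested = true;
      monitor.notifyAll();
    }
  }
}
```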

The dispatcher logs a warning when it detects that one or more workflow instance executor threads have not completed the processing of a workflow instance within a period of time (defined by nflow.executor.stuckThreadThreshold.seconds), as this indicates that the processing may be stuck. If it is normal for the processing of a workflow instance to take a long time, you can either ignore this warning or configure a larger threshold.
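The check can be pictured as comparing each executor thread's processing start time against the threshold. The following is a minimal sketch under that assumption; StuckThreadCheck and its methods are hypothetical names, and how nFlow actually tracks executor threads is not described in the text.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class StuckThreadCheck {
  // Executor thread name -> time at which it started processing its current instance.
  private final Map<String, Instant> startedAt = new ConcurrentHashMap<>();
  private final Duration threshold;

  /** thresholdSeconds corresponds to nflow.executor.stuckThreadThreshold.seconds. */
  StuckThreadCheck(long thresholdSeconds) {
    this.threshold = Duration.ofSeconds(thresholdSeconds);
  }

  void processingStarted(String threadName, Instant now) {
    startedAt.put(threadName, now);
  }

  void processingFinished(String threadName) {
    startedAt.remove(threadName);
  }

  /** Returns the sorted names of threads that have been processing longer than the threshold. */
  List<String> potentiallyStuck(Instant now) {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, Instant> entry : startedAt.entrySet()) {
      if (Duration.between(entry.getValue(), now).compareTo(threshold) > 0) {
        result.add(entry.getKey());
      }
    }
    Collections.sort(result);
    return result;
  }
}
```

A thread reported by such a check is only potentially stuck: a long-running but healthy state method looks the same, which is why the text suggests raising the threshold in that case.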