@@ -518,85 +518,68 @@ def main():
518
518
### Workflow
519
519
520
520
``` python
521
- from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
522
- from dapr.clients import DaprClient
521
+ from time import sleep
523
522
524
- instanceId = " exampleInstanceID"
525
- workflowComponent = " dapr"
526
- workflowName = " hello_world_wf"
527
- eventName = " event1"
528
- eventData = " eventData"
523
+ import dapr.ext.workflow as wf
529
524
530
- def main ():
531
- with DaprClient() as d:
532
- host = settings.DAPR_RUNTIME_HOST
533
- port = settings.DAPR_GRPC_PORT
534
- workflowRuntime = WorkflowRuntime(host, port)
535
- workflowRuntime = WorkflowRuntime()
536
- workflowRuntime.register_workflow(hello_world_wf)
537
- workflowRuntime.register_activity(hello_act)
538
- workflowRuntime.start()
539
-
540
- # Start the workflow
541
- start_resp = d.start_workflow(instance_id = instanceId, workflow_component = workflowComponent,
542
- workflow_name = workflowName, input = inputData, workflow_options = workflowOptions)
543
- print (f " start_resp { start_resp.instance_id} " )
544
-
545
- # ...
546
-
547
- # Pause Test
548
- d.pause_workflow(instance_id = instanceId, workflow_component = workflowComponent)
549
- getResponse = d.get_workflow(instance_id = instanceId, workflow_component = workflowComponent)
550
- print (f " Get response from { workflowName} after pause call: { getResponse.runtime_status} " )
551
-
552
- # Resume Test
553
- d.resume_workflow(instance_id = instanceId, workflow_component = workflowComponent)
554
- getResponse = d.get_workflow(instance_id = instanceId, workflow_component = workflowComponent)
555
- print (f " Get response from { workflowName} after resume call: { getResponse.runtime_status} " )
556
-
557
- sleep(1 )
558
- # Raise event
559
- d.raise_workflow_event(instance_id = instanceId, workflow_component = workflowComponent,
560
- event_name = eventName, event_data = eventData)
561
525
562
- sleep(5 )
563
- # Purge Test
564
- d.purge_workflow(instance_id = instanceId, workflow_component = workflowComponent)
565
- try :
566
- getResponse = d.get_workflow(instance_id = instanceId, workflow_component = workflowComponent)
567
- except DaprInternalError as err:
568
- if nonExistentIDError in err._message:
569
- print (" Instance Successfully Purged" )
570
-
571
-
572
- # Kick off another workflow for termination purposes
573
- # This will also test using the same instance ID on a new workflow after
574
- # the old instance was purged
575
- start_resp = d.start_workflow(instance_id = instanceId, workflow_component = workflowComponent,
576
- workflow_name = workflowName, input = inputData, workflow_options = workflowOptions)
577
- print (f " start_resp { start_resp.instance_id} " )
578
-
579
- # Terminate Test
580
- d.terminate_workflow(instance_id = instanceId, workflow_component = workflowComponent)
581
- sleep(1 )
582
- getResponse = d.get_workflow(instance_id = instanceId, workflow_component = workflowComponent)
583
- print (f " Get response from { workflowName} after terminate call: { getResponse.runtime_status} " )
584
-
585
- # Purge Test
586
- d.purge_workflow(instance_id = instanceId, workflow_component = workflowComponent)
587
- try :
588
- getResponse = d.get_workflow(instance_id = instanceId, workflow_component = workflowComponent)
589
- except DaprInternalError as err:
590
- if nonExistentIDError in err._message:
591
- print (" Instance Successfully Purged" )
526
+ wfr = wf.WorkflowRuntime()
527
+
528
+
529
+ @wfr.workflow (name = ' random_workflow' )
530
+ def task_chain_workflow (ctx : wf.DaprWorkflowContext, wf_input : int ):
531
+ try :
532
+ result1 = yield ctx.call_activity(step1, input = wf_input)
533
+ result2 = yield ctx.call_activity(step2, input = result1)
534
+ except Exception as e:
535
+ yield ctx.call_activity(error_handler, input = str (e))
536
+ raise
537
+ # TODO update to set custom status
538
+ return [result1, result2]
539
+
592
540
593
- workflowRuntime.shutdown()
541
+ @wfr.activity (name = ' step1' )
542
+ def step1 (ctx , activity_input ):
543
+ print (f ' Step 1: Received input: { activity_input} . ' )
544
+ # Do some work
545
+ return activity_input + 1
546
+
547
+
548
+ @wfr.activity
549
+ def step2 (ctx , activity_input ):
550
+ print (f ' Step 2: Received input: { activity_input} . ' )
551
+ # Do some work
552
+ return activity_input * 2
553
+
554
+ @wfr.activity
555
+ def error_handler (ctx , error ):
556
+ print (f ' Executing error handler: { error} . ' )
557
+ # Do some compensating work
558
+
559
+
560
+ if __name__ == ' __main__' :
561
+ wfr.start()
562
+ sleep(10 ) # wait for workflow runtime to start
563
+
564
+ wf_client = wf.DaprWorkflowClient()
565
+ instance_id = wf_client.schedule_new_workflow(workflow = task_chain_workflow, input = 42 )
566
+ print (f ' Workflow started. Instance ID: { instance_id} ' )
567
+ state = wf_client.wait_for_workflow_completion(instance_id)
568
+ print (f ' Workflow completed! Status: { state.runtime_status} ' )
569
+
570
+ wfr.shutdown()
594
571
```
595
572
596
573
- Learn more about authoring and managing workflows:
597
574
- [ How-To: Author a workflow] ({{< ref howto-author-workflow.md >}}).
598
575
- [ How-To: Manage a workflow] ({{< ref howto-manage-workflow.md >}}).
599
- - Visit [ Python SDK examples] ( https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py ) for code samples and instructions to try out Dapr Workflow.
576
+ - Visit [ Python SDK examples] ( https://github.com/dapr/python-sdk/tree/main/examples/workflow ) for code samples and instructions to try out Dapr Workflow:
577
+ - [ Task chaining example] ( https://github.com/dapr/python-sdk/blob/main/examples/workflow/task_chaining.py )
578
+ - [ Fan-out/Fan-in example] ( https://github.com/dapr/python-sdk/blob/main/examples/workflow/fan_out_fan_in.py )
579
+ - [ Child workflow example] ( https://github.com/dapr/python-sdk/blob/main/examples/workflow/child_workflow.py )
580
+ - [ Human approval example] ( https://github.com/dapr/python-sdk/blob/main/examples/workflow/human_approval.py )
581
+ - [ Monitor example] ( https://github.com/dapr/python-sdk/blob/main/examples/workflow/monitor.py )
582
+
600
583
601
584
602
585
## Related links
0 commit comments