From d2954b1e3486af51a51e4de0bcdb30f9c6597a28 Mon Sep 17 00:00:00 2001 From: Eileen Yu Date: Mon, 25 Nov 2024 15:54:47 -0800 Subject: [PATCH] refactor: update python workflow example with daprWorkflowClient Signed-off-by: Eileen Yu --- workflows/python/sdk/README.md | 2 +- workflows/python/sdk/order-processor/app.py | 72 +++++++++---------- .../python/sdk/order-processor/workflow.py | 21 ++++-- 3 files changed, 48 insertions(+), 47 deletions(-) diff --git a/workflows/python/sdk/README.md b/workflows/python/sdk/README.md index 52c603428..7919e3bfb 100644 --- a/workflows/python/sdk/README.md +++ b/workflows/python/sdk/README.md @@ -34,7 +34,7 @@ cd .. name: Running this example expected_stdout_lines: - "There are now 90 cars left in stock" - - "Workflow completed! Result: Completed" + - "Workflow completed!" output_match_mode: substring background: true timeout_seconds: 120 diff --git a/workflows/python/sdk/order-processor/app.py b/workflows/python/sdk/order-processor/app.py index 4d2003897..b46787280 100644 --- a/workflows/python/sdk/order-processor/app.py +++ b/workflows/python/sdk/order-processor/app.py @@ -4,14 +4,12 @@ from dapr.clients import DaprClient from dapr.conf import settings -from dapr.ext.workflow import WorkflowRuntime +from dapr.ext.workflow import DaprWorkflowClient, WorkflowStatus -from workflow import order_processing_workflow, notify_activity, process_payment_activity, \ - verify_inventory_activity, update_inventory_activity, requst_approval_activity +from workflow import wfr, order_processing_workflow from model import InventoryItem, OrderPayload store_name = "statestore" -workflow_component = "dapr" workflow_name = "order_processing_workflow" default_item_name = "cars" @@ -19,39 +17,35 @@ class WorkflowConsoleApp: def main(self): print("*** Welcome to the Dapr Workflow console app sample!", flush=True) print("*** Using this app, you can place orders that start workflows.", flush=True) + + wfr.start() # Wait for the sidecar to become available sleep(5) - workflowRuntime = WorkflowRuntime(settings.DAPR_RUNTIME_HOST, settings.DAPR_GRPC_PORT) - workflowRuntime.register_workflow(order_processing_workflow) - workflowRuntime.register_activity(notify_activity) - workflowRuntime.register_activity(requst_approval_activity) - workflowRuntime.register_activity(verify_inventory_activity) - workflowRuntime.register_activity(process_payment_activity) - workflowRuntime.register_activity(update_inventory_activity) - workflowRuntime.start() + wfClient = DaprWorkflowClient() + + baseInventory = { + "paperclip": InventoryItem("Paperclip", 5, 100), + "cars": InventoryItem("Cars", 15000, 100), + "computers": InventoryItem("Computers", 500, 100), + } - daprClient = DaprClient(address=f'{settings.DAPR_RUNTIME_HOST}:{settings.DAPR_GRPC_PORT}') - baseInventory = {} - baseInventory["paperclip"] = InventoryItem("Paperclip", 5, 100) - baseInventory["cars"] = InventoryItem("Cars", 15000, 100) - baseInventory["computers"] = InventoryItem("Computers", 500, 100) + daprClient = DaprClient(address=f'{settings.DAPR_RUNTIME_HOST}:{settings.DAPR_GRPC_PORT}') self.restock_inventory(daprClient, baseInventory) print("==========Begin the purchase of item:==========", flush=True) item_name = default_item_name order_quantity = 10 - total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost) + print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True) - start_resp = daprClient.start_workflow(workflow_component=workflow_component, - workflow_name=workflow_name, - input=order) - _id = start_resp.instance_id + instance_id = wfClient.schedule_new_workflow( + workflow=order_processing_workflow, input=order.to_json()) + _id = instance_id - def prompt_for_approval(daprClient: DaprClient): + def prompt_for_approval(wfClient: DaprWorkflowClient): """This is a helper function to prompt for approval. Not using the prompt here ACTUALLY, as quickstart validation is required to be automated. @@ -65,9 +59,9 @@ def prompt_for_approval(daprClient: DaprClient): if state.runtime_status.name == "COMPLETED": return if approved.lower() == "y": - client.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': True}) + wfClient.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': True}) else: - client.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': False}) + wfClient.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': False}) ## Additionally, you would need to import signal and define timeout_error: # import signal @@ -76,32 +70,32 @@ def prompt_for_approval(daprClient: DaprClient): # signal.signal(signal.SIGALRM, timeout_error) """ - daprClient.raise_workflow_event(instance_id=_id, workflow_component=workflow_component, - event_name="manager_approval", event_data={'approval': True}) + wfClient.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': True}) approval_seeked = False start_time = datetime.now() while True: time_delta = datetime.now() - start_time - state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component) + state = wfClient.get_workflow_state(instance_id=_id) + if not state: print("Workflow not found!") # not expected - elif state.runtime_status == "Completed" or\ - state.runtime_status == "Failed" or\ - state.runtime_status == "Terminated": + break + + if state.runtime_status in {WorkflowStatus.COMPLETED, WorkflowStatus.FAILED, WorkflowStatus.TERMINATED}: print(f'Workflow completed! Result: {state.runtime_status}', flush=True) break + + if time_delta.total_seconds() >= 10: - state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component) - if total_cost > 50000 and ( - state.runtime_status != "Completed" or - state.runtime_status != "Failed" or - state.runtime_status != "Terminated" - ) and not approval_seeked: + state = wfClient.get_workflow_state(instance_id=_id) + if total_cost > 50000 and state not in {WorkflowStatus.COMPLETED, WorkflowStatus.FAILED, WorkflowStatus.TERMINATED} and not approval_seeked: approval_seeked = True - threading.Thread(target=prompt_for_approval(daprClient), daemon=True).start() + threading.Thread(target=prompt_for_approval(wfClient), daemon=True).start() + + wfr.shutdown() + - print("Purchase of item is ", state.runtime_status, flush=True) def restock_inventory(self, daprClient: DaprClient, baseInventory): for key, item in baseInventory.items(): diff --git a/workflows/python/sdk/order-processor/workflow.py b/workflows/python/sdk/order-processor/workflow.py index e93443fd5..8df23e63b 100644 --- a/workflows/python/sdk/order-processor/workflow.py +++ b/workflows/python/sdk/order-processor/workflow.py @@ -1,9 +1,8 @@ - from datetime import timedelta import logging import json -from dapr.ext.workflow import DaprWorkflowContext, WorkflowActivityContext, when_any +from dapr.ext.workflow import DaprWorkflowContext, WorkflowActivityContext, WorkflowRuntime, when_any from dapr.clients import DaprClient from dapr.conf import settings @@ -12,10 +11,13 @@ store_name = "statestore" +wfr = WorkflowRuntime() + logging.basicConfig(level=logging.INFO) -def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: OrderPayload): +@wfr.workflow(name="order_processing_workflow") +def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: str): """Defines the order processing workflow. When the order is received, the inventory is checked to see if there is enough inventory to fulfill the order. If there is enough inventory, the payment is processed and the inventory is @@ -39,7 +41,7 @@ def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: Order return OrderResult(processed=False) if order_payload["total_cost"] > 50000: - yield ctx.call_activity(requst_approval_activity, input=order_payload) + yield ctx.call_activity(request_approval_activity, input=order_payload) approval_task = ctx.wait_for_external_event("manager_approval") timeout_event = ctx.create_timer(timedelta(seconds=200)) winner = yield when_any([approval_task, timeout_event]) @@ -76,7 +78,7 @@ def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: Order message=f'Order {order_id} has completed!')) return OrderResult(processed=True) - +@wfr.activity(name="notify_activity") def notify_activity(ctx: WorkflowActivityContext, input: Notification): """Defines Notify Activity. This is used by the workflow to send out a notification""" # Create a logger @@ -84,7 +86,7 @@ def notify_activity(ctx: WorkflowActivityContext, input: Notification): logger.info(input.message) - +@wfr.activity(name="process_payment_activity") def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest): """Defines Process Payment Activity.This is used by the workflow to process a payment""" logger = logging.getLogger('ProcessPaymentActivity') @@ -94,6 +96,7 @@ def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest logger.info(f'Payment for request ID {input.request_id} processed successfully') +@wfr.activity(name="verify_inventory_activity") def verify_inventory_activity(ctx: WorkflowActivityContext, input: InventoryRequest) -> InventoryResult: """Defines Verify Inventory Activity. This is used by the workflow to verify if inventory @@ -117,6 +120,8 @@ def verify_inventory_activity(ctx: WorkflowActivityContext, return InventoryResult(False, None) + +@wfr.activity(name="update_inventory_activity") def update_inventory_activity(ctx: WorkflowActivityContext, input: PaymentRequest) -> InventoryResult: """Defines Update Inventory Activity. This is used by the workflow to check if inventory @@ -139,7 +144,9 @@ def update_inventory_activity(ctx: WorkflowActivityContext, logger.info(f'There are now {new_quantity} {input.item_being_purchased} left in stock') -def requst_approval_activity(ctx: WorkflowActivityContext, + +@wfr.activity(name="request_approval_activity") +def request_approval_activity(ctx: WorkflowActivityContext, input: OrderPayload): """Defines Request Approval Activity. This is used by the workflow to request approval for payment of an order. This activity is used only if the order total cost is greater than