-
Notifications
You must be signed in to change notification settings - Fork 56
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
9ba977b
commit 1c1e69c
Showing
6 changed files
with
192 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
# Update With Start: Shopping Cart | ||
|
||
This sample illustrates the use of update-with-start to send Updates to a Workflow, starting the Workflow if | ||
it is not running yet. The Workflow represents a Shopping Cart in an e-commerce application, and | ||
update-with-start is used to add items to the cart, receiving back the updated cart subtotal. | ||
|
||
Run the following from this directory: | ||
|
||
poetry run python worker.py | ||
|
||
Then, in another terminal: | ||
|
||
poetry run python starter.py | ||
|
||
This will start a worker to run your workflow and activities, then simulate a backend application receiving | ||
requests to add items to a shopping cart, before finalizing the order. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
TASK_QUEUE = "uws" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
import asyncio | ||
from dataclasses import dataclass | ||
from decimal import Decimal | ||
from typing import Optional | ||
|
||
from temporalio import activity | ||
|
||
|
||
@dataclass | ||
class ShoppingCartItem: | ||
sku: str | ||
quantity: int | ||
|
||
|
||
@activity.defn | ||
async def get_price(item: ShoppingCartItem) -> Optional[str]: | ||
await asyncio.sleep(0.1) | ||
price = None if item.sku == "sku-456" else Decimal("5.99") | ||
if price is None: | ||
return None | ||
return str(price * item.quantity) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
import asyncio | ||
from decimal import Decimal | ||
from typing import Optional, Tuple | ||
|
||
from temporalio import common | ||
from temporalio.client import ( | ||
Client, | ||
WithStartWorkflowOperation, | ||
WorkflowHandle, | ||
WorkflowUpdateFailedError, | ||
) | ||
|
||
from message_passing.update_shopping_cart.workflows import ( | ||
ShoppingCartItem, | ||
ShoppingCartWorkflow, | ||
) | ||
|
||
|
||
async def handle_add_item_request( | ||
session_id: str, item_id: str, quantity: int, temporal_client: Client | ||
) -> Tuple[Optional[Decimal], WorkflowHandle]: | ||
""" | ||
Handle a client request to add an item to the shopping cart. The user is not logged in, but a session ID is | ||
available from a cookie, and we use this as the cart ID. The Temporal client was created at service-start | ||
time and is shared by all request handlers. | ||
A Workflow Type exists that can be used to represent a shopping cart. The method uses update-with-start to | ||
add an item to the shopping cart, creating the cart if it doesn't already exist. | ||
Note that the workflow handle is available, even if the Update fails. | ||
""" | ||
cart_id = f"cart-{session_id}" | ||
start_op = WithStartWorkflowOperation( | ||
ShoppingCartWorkflow.run, | ||
id=cart_id, | ||
id_conflict_policy=common.WorkflowIDConflictPolicy.USE_EXISTING, | ||
task_queue="uws", | ||
) | ||
try: | ||
price = Decimal( | ||
await temporal_client.execute_update_with_start( | ||
ShoppingCartWorkflow.add_item, | ||
ShoppingCartItem(sku=item_id, quantity=quantity), | ||
start_workflow_operation=start_op, | ||
) | ||
) | ||
except WorkflowUpdateFailedError: | ||
price = None | ||
|
||
return price, await start_op.workflow_handle() | ||
|
||
|
||
async def main(): | ||
print("🛒") | ||
temporal_client = await Client.connect("localhost:7233") | ||
subtotal_1, _ = await handle_add_item_request( | ||
"session-777", "sku-123", 1, temporal_client | ||
) | ||
subtotal_2, wf_handle = await handle_add_item_request( | ||
"session-777", "sku-456", 1, temporal_client | ||
) | ||
print(f"subtotals were, {[subtotal_1, subtotal_2]}") | ||
await wf_handle.signal(ShoppingCartWorkflow.checkout) | ||
final_order = await wf_handle.result() | ||
print(f"final order: {final_order}") | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
import asyncio | ||
import logging | ||
|
||
from temporalio.client import Client | ||
from temporalio.worker import Worker | ||
|
||
from message_passing.update_shopping_cart import TASK_QUEUE, workflows | ||
|
||
interrupt_event = asyncio.Event() | ||
|
||
|
||
async def main(): | ||
logging.basicConfig(level=logging.INFO) | ||
|
||
client = await Client.connect("localhost:7233") | ||
|
||
async with Worker( | ||
client, | ||
task_queue=TASK_QUEUE, | ||
workflows=[workflows.ShoppingCartWorkflow], | ||
): | ||
logging.info("Worker started, ctrl+c to exit") | ||
await interrupt_event.wait() | ||
logging.info("Shutting down") | ||
|
||
|
||
if __name__ == "__main__": | ||
loop = asyncio.new_event_loop() | ||
try: | ||
loop.run_until_complete(main()) | ||
except KeyboardInterrupt: | ||
interrupt_event.set() | ||
loop.run_until_complete(loop.shutdown_asyncgens()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
from dataclasses import dataclass | ||
from decimal import Decimal | ||
from typing import Tuple | ||
|
||
from temporalio import workflow | ||
from temporalio.exceptions import ApplicationError | ||
|
||
from message_passing.update_shopping_cart.activities import ShoppingCartItem, get_price | ||
|
||
|
||
@dataclass | ||
class FinalizedOrder: | ||
id: str | ||
items: list[Tuple[ShoppingCartItem, str]] | ||
total: str | ||
|
||
|
||
@workflow.defn | ||
class ShoppingCartWorkflow: | ||
def __init__(self): | ||
self.items: list[Tuple[ShoppingCartItem, Decimal]] = [] | ||
self.order_submitted = False | ||
|
||
@workflow.run | ||
async def run(self) -> FinalizedOrder: | ||
await workflow.wait_condition( | ||
lambda: workflow.all_handlers_finished() and self.order_submitted | ||
) | ||
return FinalizedOrder( | ||
id=workflow.info().workflow_id, | ||
items=[(item, str(price)) for item, price in self.items], | ||
total=str( | ||
sum(item.quantity * price for item, price in self.items) | ||
or Decimal("0.00") | ||
), | ||
) | ||
|
||
@workflow.update | ||
async def add_item(self, item: ShoppingCartItem) -> str: | ||
price = await get_price(item) | ||
if price is None: | ||
raise ApplicationError( | ||
f"Item unavailable: {item}", | ||
) | ||
self.items.append((item, Decimal(price))) | ||
return str( | ||
sum(item.quantity * price for item, price in self.items) or Decimal("0.00") | ||
) | ||
|
||
@workflow.signal | ||
def checkout(self): | ||
self.order_submitted = True |