Skip to content

Commit

Permalink
Samples for message passing docs (#133)
Browse files Browse the repository at this point in the history
* Message passing introduction sample (used in docs)
  • Loading branch information
dandavison authored Sep 16, 2024
1 parent 7e5aba8 commit 8903768
Show file tree
Hide file tree
Showing 16 changed files with 389 additions and 8 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,13 @@ Some examples require extra dependencies. See each sample's directory for specif
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
* [gevent_async](gevent_async) - Combine gevent and Temporal.
* [langchain](langchain) - Orchestrate workflows for LangChain.
* [message-passing introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
* [prometheus](prometheus) - Configure Prometheus metrics on clients/workers.
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
* [safe_message_handlers](updates_and_signals/safe_message_handlers/) - Safely handling updates and signals.
* [safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals.
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
* [sentry](sentry) - Report errors to Sentry.
* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
Expand Down
File renamed without changes.
18 changes: 18 additions & 0 deletions message_passing/introduction/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Introduction to message-passing

This sample provides an introduction to using Query, Signal, and Update.

See https://docs.temporal.io/develop/python/message-passing.

To run, first see the main [README.md](../../README.md) for prerequisites.

Then create two terminals and `cd` to this directory.

Run the worker in one terminal:

poetry run python worker.py

And execute the workflow in the other terminal:

poetry run python starter.py

13 changes: 13 additions & 0 deletions message_passing/introduction/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from enum import IntEnum

TASK_QUEUE = "message-passing-introduction-task-queue"


class Language(IntEnum):
ARABIC = 1
CHINESE = 2
ENGLISH = 3
FRENCH = 4
HINDI = 5
PORTUGUESE = 6
SPANISH = 7
25 changes: 25 additions & 0 deletions message_passing/introduction/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import asyncio
from typing import Optional

from temporalio import activity

from message_passing.introduction import Language


@activity.defn
async def call_greeting_service(to_language: Language) -> Optional[str]:
"""
An Activity that simulates a call to a remote greeting service.
The remote greeting service supports the full range of languages.
"""
greetings = {
Language.ARABIC: "مرحبا بالعالم",
Language.CHINESE: "你好,世界",
Language.ENGLISH: "Hello, world",
Language.FRENCH: "Bonjour, monde",
Language.HINDI: "नमस्ते दुनिया",
Language.PORTUGUESE: "Olá mundo",
Language.SPANISH: "¡Hola mundo",
}
await asyncio.sleep(0.2) # Simulate a network call
return greetings.get(to_language)
52 changes: 52 additions & 0 deletions message_passing/introduction/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import asyncio
from typing import Optional

from temporalio.client import Client, WorkflowUpdateStage

from message_passing.introduction import TASK_QUEUE
from message_passing.introduction.workflows import (
ApproveInput,
GetLanguagesInput,
GreetingWorkflow,
Language,
)


async def main(client: Optional[Client] = None):
client = client or await Client.connect("localhost:7233")
wf_handle = await client.start_workflow(
GreetingWorkflow.run,
id="greeting-workflow-1234",
task_queue=TASK_QUEUE,
)

# 👉 Send a Query
supported_languages = await wf_handle.query(
GreetingWorkflow.get_languages, GetLanguagesInput(include_unsupported=False)
)
print(f"supported languages: {supported_languages}")

# 👉 Execute an Update
previous_language = await wf_handle.execute_update(
GreetingWorkflow.set_language, Language.CHINESE
)
current_language = await wf_handle.query(GreetingWorkflow.get_language)
print(f"language changed: {previous_language.name} -> {current_language.name}")

# 👉 Start an Update and then wait for it to complete
update_handle = await wf_handle.start_update(
GreetingWorkflow.set_language_using_activity,
Language.ARABIC,
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
)
previous_language = await update_handle.result()
current_language = await wf_handle.query(GreetingWorkflow.get_language)
print(f"language changed: {previous_language.name} -> {current_language.name}")

# 👉 Send a Signal
await wf_handle.signal(GreetingWorkflow.approve, ApproveInput(name=""))
print(await wf_handle.result())


if __name__ == "__main__":
asyncio.run(main())
36 changes: 36 additions & 0 deletions message_passing/introduction/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

from message_passing.introduction import TASK_QUEUE
from message_passing.introduction.activities import call_greeting_service
from message_passing.introduction.workflows import GreetingWorkflow

interrupt_event = asyncio.Event()


async def main():
logging.basicConfig(level=logging.INFO)

client = await Client.connect("localhost:7233")

async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[GreetingWorkflow],
activities=[call_greeting_service],
):
logging.info("Worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
115 changes: 115 additions & 0 deletions message_passing/introduction/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from typing import List, Optional

from temporalio import workflow
from temporalio.exceptions import ApplicationError

with workflow.unsafe.imports_passed_through():
from message_passing.introduction import Language
from message_passing.introduction.activities import call_greeting_service


@dataclass
class GetLanguagesInput:
include_unsupported: bool


@dataclass
class ApproveInput:
name: str


@workflow.defn
class GreetingWorkflow:
"""
A workflow that that returns a greeting in one of two languages.
It supports a Query to obtain the current language, an Update to change the
current language and receive the previous language in response, and a Signal
to approve the Workflow so that it is allowed to return its result.
"""

# 👉 This Workflow does not use any async handlers and so cannot use any
# Activities. It only supports two languages, whose greetings are hardcoded
# in the Workflow definition. See GreetingWorkflowWithAsyncHandler below for
# a Workflow that uses an async Update handler to call an Activity.

def __init__(self) -> None:
self.approved_for_release = False
self.approver_name: Optional[str] = None
self.greetings = {
Language.CHINESE: "你好,世界",
Language.ENGLISH: "Hello, world",
}
self.language = Language.ENGLISH
self.lock = asyncio.Lock() # used by the async handler below

@workflow.run
async def run(self) -> str:
# 👉 In addition to waiting for the `approve` Signal, we also wait for
# all handlers to finish. Otherwise, the Workflow might return its
# result while an async set_language_using_activity Update is in
# progress.
await workflow.wait_condition(
lambda: self.approved_for_release and workflow.all_handlers_finished()
)
return self.greetings[self.language]

@workflow.query
def get_languages(self, input: GetLanguagesInput) -> List[Language]:
# 👉 A Query handler returns a value: it can inspect but must not mutate the Workflow state.
if input.include_unsupported:
return sorted(Language)
else:
return sorted(self.greetings)

@workflow.signal
def approve(self, input: ApproveInput) -> None:
# 👉 A Signal handler mutates the Workflow state but cannot return a value.
self.approved_for_release = True
self.approver_name = input.name

@workflow.update
def set_language(self, language: Language) -> Language:
# 👉 An Update handler can mutate the Workflow state and return a value.
previous_language, self.language = self.language, language
return previous_language

@set_language.validator
def validate_language(self, language: Language) -> None:
if language not in self.greetings:
# 👉 In an Update validator you raise any exception to reject the Update.
raise ValueError(f"{language.name} is not supported")

@workflow.update
async def set_language_using_activity(self, language: Language) -> Language:
# 👉 This update handler is async, so it can execute an activity.
if language not in self.greetings:
# 👉 We use a lock so that, if this handler is executed multiple
# times, each execution can schedule the activity only when the
# previously scheduled activity has completed. This ensures that
# multiple calls to set_language are processed in order.
async with self.lock:
greeting = await workflow.execute_activity(
call_greeting_service,
language,
start_to_close_timeout=timedelta(seconds=10),
)
if greeting is None:
# 👉 An update validator cannot be async, so cannot be used
# to check that the remote call_greeting_service supports
# the requested language. Raising ApplicationError will fail
# the Update, but the WorkflowExecutionUpdateAccepted event
# will still be added to history.
raise ApplicationError(
f"Greeting service does not support {language.name}"
)
self.greetings[language] = greeting
previous_language, self.language = self.language, language
return previous_language

@workflow.query
def get_language(self) -> Language:
return self.language
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from temporalio import common
from temporalio.client import Client, WorkflowHandle

from updates_and_signals.safe_message_handlers.workflow import (
from message_passing.safe_message_handlers.workflow import (
ClusterManagerAssignNodesToJobInput,
ClusterManagerDeleteJobInput,
ClusterManagerInput,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from temporalio.client import Client
from temporalio.worker import Worker

from updates_and_signals.safe_message_handlers.workflow import (
from message_passing.safe_message_handlers.workflow import (
ClusterManagerWorkflow,
assign_nodes_to_job,
find_bad_nodes,
Expand All @@ -15,7 +15,6 @@


async def main():
# Connect client
client = await Client.connect("localhost:7233")

async with Worker(
Expand All @@ -24,7 +23,6 @@ async def main():
workflows=[ClusterManagerWorkflow],
activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
):
# Wait until interrupted
logging.info("ClusterManagerWorkflow worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from temporalio.common import RetryPolicy
from temporalio.exceptions import ApplicationError

from updates_and_signals.safe_message_handlers.activities import (
from message_passing.safe_message_handlers.activities import (
AssignNodesToJobInput,
FindBadNodesInput,
UnassignNodesForJobInput,
Expand Down
Loading

0 comments on commit 8903768

Please sign in to comment.