Skip to content

Commit

Permalink
Message passing introduction sample (used in docs)
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Aug 9, 2024
1 parent bb6ba95 commit afa9b0b
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 0 deletions.
48 changes: 48 additions & 0 deletions message_passing/introduction/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import asyncio

from temporalio import common
from temporalio.client import Client, WorkflowUpdateStage

from message_passing.introduction.workflow import (
ApproveInput,
GetLanguagesInput,
GreetingWorkflow,
Language,
)


async def main():
client = await Client.connect("localhost:7233")
wf_handle = await client.start_workflow(
GreetingWorkflow.run,
id="greeting-workflow-1234",
task_queue="message-passing-introduction-task-queue",
id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
)

supported_languages = await wf_handle.query(
GreetingWorkflow.get_languages, GetLanguagesInput(include_unsupported=False)
)
print(f"supported languages: {supported_languages}")

previous_language = await wf_handle.execute_update(
GreetingWorkflow.set_language, Language.CHINESE
)
current_language = await wf_handle.query(GreetingWorkflow.get_language)
print(f"language changed: {previous_language.name} -> {current_language.name}")

update_handle = await wf_handle.start_update(
GreetingWorkflow.set_language,
Language.ENGLISH,
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
)
previous_language = await update_handle.result()
current_language = await wf_handle.query(GreetingWorkflow.get_language)
print(f"language changed: {previous_language.name} -> {current_language.name}")

await wf_handle.signal(GreetingWorkflow.approve, ApproveInput(name=""))
print(await wf_handle.result())


if __name__ == "__main__":
asyncio.run(main())
33 changes: 33 additions & 0 deletions message_passing/introduction/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

from message_passing.introduction.workflow import GreetingWorkflow

interrupt_event = asyncio.Event()


async def main():
logging.basicConfig(level=logging.INFO)

client = await Client.connect("localhost:7233")

async with Worker(
client,
task_queue="message-passing-introduction-task-queue",
workflows=[GreetingWorkflow],
):
logging.info("Worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
70 changes: 70 additions & 0 deletions message_passing/introduction/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional

from temporalio import workflow


class Language(IntEnum):
CHINESE = 1
ENGLISH = 2
FRENCH = 3
SPANISH = 4
PORTUGUESE = 5


@dataclass
class GetLanguagesInput:
include_unsupported: bool


@dataclass
class ApproveInput:
name: str


@workflow.defn
class GreetingWorkflow:
def __init__(self) -> None:
self.approved_for_release = False
self.approver_name: Optional[str] = None
self.language = Language.ENGLISH
self.greetings = {
Language.ENGLISH: "Hello, world",
Language.CHINESE: "你好,世界",
}

@workflow.run
async def run(self) -> str:
await workflow.wait_condition(lambda: self.approved_for_release)
return self.greetings[self.language]

@workflow.query
def get_languages(self, input: GetLanguagesInput) -> list[Language]:
# 👉 A Query handler returns a value: it can inspect but must not mutate the Workflow state.
if input.include_unsupported:
return list(Language)
else:
return list(self.greetings)

@workflow.signal
def approve(self, input: ApproveInput) -> None:
# 👉 A Signal handler mutates the Workflow state but cannot return a value.
self.approved_for_release = True
self.approver_name = input.name

@workflow.update
def set_language(self, language: Language) -> Language:
# 👉 An Update handler can mutate the Workflow state and return a value.
previous_language, self.language = self.language, language
return previous_language

@set_language.validator
def validate_language(self, language: Language) -> None:
if language not in self.greetings:
# 👉 In an Update validator you raise any exception to reject the Update.
raise ValueError(f"{language.name} is not supported")

@workflow.query
def get_language(self) -> Language:
return self.language

0 comments on commit afa9b0b

Please sign in to comment.