From 8407dca0921cee3da825d94634c55c7ac2330e85 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 1 Dec 2022 15:53:47 -0800 Subject: [PATCH] Add replayer sample (#26) --- replay/README.md | 34 +++++++++++++++++ replay/__init__.py | 0 replay/replayer.py | 22 +++++++++++ replay/starter.py | 42 ++++++++++++++++++++ replay/worker.py | 95 ++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 193 insertions(+) create mode 100644 replay/README.md create mode 100644 replay/__init__.py create mode 100644 replay/replayer.py create mode 100644 replay/starter.py create mode 100644 replay/worker.py diff --git a/replay/README.md b/replay/README.md new file mode 100644 index 00000000..3a67868a --- /dev/null +++ b/replay/README.md @@ -0,0 +1,34 @@ +# Replay Sample + +This sample shows you how you can verify changes to workflow code are compatible with existing +workflow histories. + +To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the +worker: + + poetry run python worker.py + +This will start the worker. Then, in another terminal, run the following to execute a workflow: + + poetry run python starter.py + +Next, run the replayer: + + poetry run python replayer.py + +Which should produce some output like: + + WorkflowReplayResults(replay_failures={}) + +Great! Replay worked. Of course, the reason for the exercise is to catch if you've changed workflow +code in a manner which is *not* compatible with the existing histories. Try it. Open up `worker.py` +and change the `JustActivity` workflow to sleep just before running the activity. Add +`await asyncio.sleep(0.1)` just before the line with `workflow.execute_activity`. + +Now run the replayer again. The results from the `replay_workflows` call now indeed contains a +failure! Something like: + + WorkflowReplayResults(replay_failures={'e6418672-323c-4868-9de4-ece8f34fec53': NondeterminismError('Workflow activation completion failed: Failure { failure: Some(Failure { message: "Nondeterminism(\\"Timer machine does not handle this event: HistoryEvent(id: 8, Some(ActivityTaskScheduled))\\")", source: "", stack_trace: "", encoded_attributes: None, cause: None, failure_info: Some(ApplicationFailureInfo(ApplicationFailureInfo { r#type: "", non_retryable: false, details: None })) }) }')}) + +This is telling you that the workflow is not compatible with the existing history. Phew! Glad we +didn't deploy that one. diff --git a/replay/__init__.py b/replay/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/replay/replayer.py b/replay/replayer.py new file mode 100644 index 00000000..49f16313 --- /dev/null +++ b/replay/replayer.py @@ -0,0 +1,22 @@ +import asyncio + +from temporalio.client import Client +from temporalio.worker import Replayer + +from replay.worker import JustActivity, JustTimer, TimerThenActivity + + +async def main(): + # Connect client + client = await Client.connect("localhost:7233") + + # Fetch the histories of the workflows to be replayed + workflows = client.list_workflows('WorkflowId="replayer-workflow-id"') + histories = workflows.map_histories() + replayer = Replayer(workflows=[JustActivity, JustTimer, TimerThenActivity]) + results = await replayer.replay_workflows(histories, raise_on_replay_failure=False) + print(results) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/replay/starter.py b/replay/starter.py new file mode 100644 index 00000000..daf07098 --- /dev/null +++ b/replay/starter.py @@ -0,0 +1,42 @@ +import asyncio + +from temporalio.client import Client + +from replay.worker import JustActivity, JustTimer, TimerThenActivity + + +async def main(): + # Connect client + client = await Client.connect("localhost:7233") + + # Run a few workflows + # Importantly, normally we would *not* advise re-using the same workflow ID for all of these, + # but we do this to avoid requiring advanced visibility when we want to fetch all the histories + # in the replayer. + result = await client.execute_workflow( + JustActivity.run, + "replayer", + id=f"replayer-workflow-id", + task_queue="replay-sample", + ) + print(f"JustActivity Workflow result: {result}") + + result = await client.execute_workflow( + JustTimer.run, + "replayer", + id=f"replayer-workflow-id", + task_queue="replay-sample", + ) + print(f"JustTimer Workflow result: {result}") + + result = await client.execute_workflow( + TimerThenActivity.run, + "replayer", + id=f"replayer-workflow-id", + task_queue="replay-sample", + ) + print(f"TimerThenActivity Workflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/replay/worker.py b/replay/worker.py new file mode 100644 index 00000000..3aebc099 --- /dev/null +++ b/replay/worker.py @@ -0,0 +1,95 @@ +import asyncio +import logging +from dataclasses import dataclass +from datetime import timedelta + +from temporalio import activity, workflow +from temporalio.client import Client +from temporalio.worker import Worker + + +# While we could use multiple parameters in the activity, Temporal strongly +# encourages using a single dataclass instead which can have fields added to it +# in a backwards-compatible way. +@dataclass +class ComposeGreetingInput: + greeting: str + name: str + + +# Basic activity that logs and does string concatenation +@activity.defn +async def compose_greeting(input: ComposeGreetingInput) -> str: + activity.logger.info("Running activity with parameter %s" % input) + return f"{input.greeting}, {input.name}!" + + +# A workflow which just runs an activity +@workflow.defn +class JustActivity: + @workflow.run + async def run(self, name: str) -> str: + workflow.logger.info("Running just activity workflow with parameter %s" % name) + return await workflow.execute_activity( + compose_greeting, + ComposeGreetingInput("Hello", name), + start_to_close_timeout=timedelta(seconds=10), + ) + + +# A workflow which just runs a timer +@workflow.defn +class JustTimer: + @workflow.run + async def run(self, name: str) -> str: + workflow.logger.info("Running just timer workflow with parameter %s" % name) + await asyncio.sleep(0.1) + return "Slept" + + +# A workflow which runs a timer then an activity +@workflow.defn +class TimerThenActivity: + @workflow.run + async def run(self, name: str) -> str: + workflow.logger.info( + "Running timer then activity workflow with parameter %s" % name + ) + await asyncio.sleep(0.1) + return await workflow.execute_activity( + compose_greeting, + ComposeGreetingInput("Hello", name), + start_to_close_timeout=timedelta(seconds=10), + ) + + +interrupt_event = asyncio.Event() + + +async def main(): + # Uncomment the line below to see logging + logging.basicConfig(level=logging.INFO) + + # Start client + client = await Client.connect("localhost:7233") + + # Run a worker for the workflow + async with Worker( + client, + task_queue="replay-sample", + workflows=[JustActivity, JustTimer, TimerThenActivity], + activities=[compose_greeting], + ): + # Wait until interrupted + print("Worker started, ctrl+c to exit") + await interrupt_event.wait() + print("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens())