Skip to content

Commit

Permalink
Add replayer sample (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Dec 1, 2022
1 parent b4ef139 commit 8407dca
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 0 deletions.
34 changes: 34 additions & 0 deletions replay/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Replay Sample

This sample shows you how you can verify changes to workflow code are compatible with existing
workflow histories.

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
worker:

poetry run python worker.py

This will start the worker. Then, in another terminal, run the following to execute a workflow:

poetry run python starter.py

Next, run the replayer:

poetry run python replayer.py

Which should produce some output like:

WorkflowReplayResults(replay_failures={})

Great! Replay worked. Of course, the reason for the exercise is to catch if you've changed workflow
code in a manner which is *not* compatible with the existing histories. Try it. Open up `worker.py`
and change the `JustActivity` workflow to sleep just before running the activity. Add
`await asyncio.sleep(0.1)` just before the line with `workflow.execute_activity`.

Now run the replayer again. The results from the `replay_workflows` call now indeed contains a
failure! Something like:

WorkflowReplayResults(replay_failures={'e6418672-323c-4868-9de4-ece8f34fec53': NondeterminismError('Workflow activation completion failed: Failure { failure: Some(Failure { message: "Nondeterminism(\\"Timer machine does not handle this event: HistoryEvent(id: 8, Some(ActivityTaskScheduled))\\")", source: "", stack_trace: "", encoded_attributes: None, cause: None, failure_info: Some(ApplicationFailureInfo(ApplicationFailureInfo { r#type: "", non_retryable: false, details: None })) }) }')})

This is telling you that the workflow is not compatible with the existing history. Phew! Glad we
didn't deploy that one.
Empty file added replay/__init__.py
Empty file.
22 changes: 22 additions & 0 deletions replay/replayer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import asyncio

from temporalio.client import Client
from temporalio.worker import Replayer

from replay.worker import JustActivity, JustTimer, TimerThenActivity


async def main():
# Connect client
client = await Client.connect("localhost:7233")

# Fetch the histories of the workflows to be replayed
workflows = client.list_workflows('WorkflowId="replayer-workflow-id"')
histories = workflows.map_histories()
replayer = Replayer(workflows=[JustActivity, JustTimer, TimerThenActivity])
results = await replayer.replay_workflows(histories, raise_on_replay_failure=False)
print(results)


if __name__ == "__main__":
asyncio.run(main())
42 changes: 42 additions & 0 deletions replay/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import asyncio

from temporalio.client import Client

from replay.worker import JustActivity, JustTimer, TimerThenActivity


async def main():
# Connect client
client = await Client.connect("localhost:7233")

# Run a few workflows
# Importantly, normally we would *not* advise re-using the same workflow ID for all of these,
# but we do this to avoid requiring advanced visibility when we want to fetch all the histories
# in the replayer.
result = await client.execute_workflow(
JustActivity.run,
"replayer",
id=f"replayer-workflow-id",
task_queue="replay-sample",
)
print(f"JustActivity Workflow result: {result}")

result = await client.execute_workflow(
JustTimer.run,
"replayer",
id=f"replayer-workflow-id",
task_queue="replay-sample",
)
print(f"JustTimer Workflow result: {result}")

result = await client.execute_workflow(
TimerThenActivity.run,
"replayer",
id=f"replayer-workflow-id",
task_queue="replay-sample",
)
print(f"TimerThenActivity Workflow result: {result}")


if __name__ == "__main__":
asyncio.run(main())
95 changes: 95 additions & 0 deletions replay/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import asyncio
import logging
from dataclasses import dataclass
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker


# While we could use multiple parameters in the activity, Temporal strongly
# encourages using a single dataclass instead which can have fields added to it
# in a backwards-compatible way.
@dataclass
class ComposeGreetingInput:
greeting: str
name: str


# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
activity.logger.info("Running activity with parameter %s" % input)
return f"{input.greeting}, {input.name}!"


# A workflow which just runs an activity
@workflow.defn
class JustActivity:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running just activity workflow with parameter %s" % name)
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)


# A workflow which just runs a timer
@workflow.defn
class JustTimer:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running just timer workflow with parameter %s" % name)
await asyncio.sleep(0.1)
return "Slept"


# A workflow which runs a timer then an activity
@workflow.defn
class TimerThenActivity:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info(
"Running timer then activity workflow with parameter %s" % name
)
await asyncio.sleep(0.1)
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)


interrupt_event = asyncio.Event()


async def main():
# Uncomment the line below to see logging
logging.basicConfig(level=logging.INFO)

# Start client
client = await Client.connect("localhost:7233")

# Run a worker for the workflow
async with Worker(
client,
task_queue="replay-sample",
workflows=[JustActivity, JustTimer, TimerThenActivity],
activities=[compose_greeting],
):
# Wait until interrupted
print("Worker started, ctrl+c to exit")
await interrupt_event.wait()
print("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())

0 comments on commit 8407dca

Please sign in to comment.