-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathhello_activity_multiprocess.py
82 lines (69 loc) · 2.85 KB
/
hello_activity_multiprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import asyncio
import multiprocessing
import os
import time
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import SharedStateManager, Worker
@dataclass
class ComposeGreetingInput:
greeting: str
name: str
@activity.defn
def compose_greeting(input: ComposeGreetingInput) -> str:
# We'll wait for 3 seconds, heartbeating in between (like all long-running
# activities should do), then return the greeting
for _ in range(0, 3):
print(f"Heartbeating activity on PID {os.getpid()}")
activity.heartbeat()
time.sleep(1)
return f"{input.greeting}, {input.name}!"
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
# Always set a heartbeat timeout for long-running activities
heartbeat_timeout=timedelta(seconds=2),
)
async def main():
# Start client
client = await Client.connect("localhost:7233")
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-multiprocess-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
# Synchronous activities are not allowed unless we provide some kind of
# executor. Here we are giving a process pool executor which means the
# activity will actually run in a separate process. This same executor
# could be passed to multiple workers if desired.
activity_executor=ProcessPoolExecutor(5),
# Since we are using an executor that is not a thread pool executor,
# Temporal needs some kind of manager to share state such as
# cancellation info and heartbeat info between the host and the
# activity. Therefore, we must provide a shared_state_manager here. A
# helper is provided to create it from a multiprocessing manager.
shared_state_manager=SharedStateManager.create_from_multiprocessing(
multiprocessing.Manager()
),
):
# While the worker is running, use the client to run the workflow and
# print out its result. Note, in many production setups, the client
# would be in a completely separate process from the worker.
result = await client.execute_workflow(
GreetingWorkflow.run,
"World",
id="hello-activity-multiprocess-workflow-id",
task_queue="hello-activity-multiprocess-task-queue",
)
print(f"Result on PID {os.getpid()}: {result}")
if __name__ == "__main__":
asyncio.run(main())