Skip to content

Commit

Permalink
Gevent sample (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Oct 20, 2023
1 parent 2cb0fdd commit c587d20
Show file tree
Hide file tree
Showing 13 changed files with 530 additions and 2 deletions.
10 changes: 9 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
strategy:
fail-fast: true
matrix:
python: ["3.7", "3.10"]
python: ["3.7", "3.11"]
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
Expand All @@ -32,3 +32,11 @@ jobs:
- run: poe test -s -o log_cli_level=DEBUG
- run: poe test -s -o log_cli_level=DEBUG --workflow-environment time-skipping

# On non-3.7, run gevent test
- name: Gevent test
if: ${{ matrix.python != '3.7' }}
run: |
poetry install --with gevent
poetry run python gevent_async/test/run_combined.py
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
* [gevent_async](gevent_async) - Combine gevent and Temporal.
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
Expand Down
28 changes: 28 additions & 0 deletions gevent_async/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Gevent Sample

This sample shows how to run Temporal in an environment that gevent has patched.

Gevent is built to patch Python libraries to attempt to seamlessly convert threaded code into coroutine-based code.
However, it is well known within the gevent community that it does not work well with `asyncio`, which is the modern
Python approach to coroutines. Temporal leverages `asyncio` which means by default it is incompatible with gevent. Users
are encouraged to abandon gevent in favor of more modern approaches where they can but it is not always possible.

This sample shows how to use a customized gevent executor to run `asyncio` Temporal clients, workers, activities, and
workflows.

For this sample, the optional `gevent` dependency group must be included. To include, run:

poetry install --with gevent

To run the sample, first see [README.md](../README.md) for prerequisites such as having a localhost Temporal server
running. Then, run the following from this directory to start the worker:

poetry run python worker.py

This will start the worker. The worker has a workflow and two activities, one `asyncio` based and one gevent based. Now
in another terminal, run the following from this directory to execute the workflow:

poetry run python starter.py

The workflow should run and complete with the hello result. Note on the worker terminal there will be logs of the
workflow and activity executions.
Empty file added gevent_async/__init__.py
Empty file.
25 changes: 25 additions & 0 deletions gevent_async/activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from dataclasses import dataclass

import gevent
from temporalio import activity


@dataclass
class ComposeGreetingInput:
greeting: str
name: str


@activity.defn
async def compose_greeting_async(input: ComposeGreetingInput) -> str:
activity.logger.info(f"Running async activity with parameter {input}")
return f"{input.greeting}, {input.name}!"


@activity.defn
def compose_greeting_sync(input: ComposeGreetingInput) -> str:
activity.logger.info(
f"Running sync activity with parameter {input}, "
f"in greenlet: {gevent.getcurrent()}"
)
return f"{input.greeting}, {input.name}!"
41 changes: 41 additions & 0 deletions gevent_async/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import functools
from concurrent.futures import Future
from typing import Callable, TypeVar

from gevent import threadpool
from typing_extensions import ParamSpec

T = TypeVar("T")
P = ParamSpec("P")


class GeventExecutor(threadpool.ThreadPoolExecutor):
def submit(
self, fn: Callable[P, T], *args: P.args, **kwargs: P.kwargs
) -> Future[T]:
# Gevent's returned futures do not map well to Python futures, so we
# must translate. We can't just use set_result/set_exception because
# done callbacks are not always called in gevent's case and it doesn't
# seem to support cancel, so we instead wrap the caller function.
python_fut: Future[T] = Future()

@functools.wraps(fn)
def wrapper(*w_args: P.args, **w_kwargs: P.kwargs) -> None:
try:
result = fn(*w_args, **w_kwargs)
# Swallow InvalidStateError in case Python future was cancelled
try:
python_fut.set_result(result)
except:
pass
except Exception as exc:
# Swallow InvalidStateError in case Python future was cancelled
try:
python_fut.set_exception(exc)
except:
pass

# Submit our wrapper to gevent
super().submit(wrapper, *args, **kwargs)
# Return Python future to user
return python_fut
40 changes: 40 additions & 0 deletions gevent_async/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Init gevent
from gevent import monkey

monkey.patch_all()

import asyncio
import logging

from temporalio.client import Client

from gevent_async import workflow
from gevent_async.executor import GeventExecutor


def main():
logging.basicConfig(level=logging.INFO)

# Create single-worker gevent executor and run asyncio.run(async_main()) in
# it, waiting for result. This executor cannot be used for anything else in
# Temporal, it is just a single thread for running asyncio.
with GeventExecutor(max_workers=1) as executor:
executor.submit(asyncio.run, async_main()).result()


async def async_main():
# Connect client
client = await Client.connect("localhost:7233")

# Run workflow
result = await client.execute_workflow(
workflow.GreetingWorkflow.run,
"Temporal",
id="gevent_async-workflow-id",
task_queue="gevent_async-task-queue",
)
logging.info(f"Workflow result: {result}")


if __name__ == "__main__":
main()
Empty file added gevent_async/test/__init__.py
Empty file.
56 changes: 56 additions & 0 deletions gevent_async/test/run_combined.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Init gevent
from gevent import monkey

monkey.patch_all()

import asyncio
import logging

from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from gevent_async import activity, workflow
from gevent_async.executor import GeventExecutor

# This basically combines ../worker.py and ../starter.py for use by CI to
# confirm this works in all environments


def main():
logging.basicConfig(level=logging.INFO)
with GeventExecutor(max_workers=1) as executor:
executor.submit(asyncio.run, async_main()).result()


async def async_main():
logging.info("Starting local server")
async with await WorkflowEnvironment.start_local() as env:
logging.info("Starting worker")
with GeventExecutor(max_workers=200) as executor:
async with Worker(
env.client,
task_queue="gevent_async-task-queue",
workflows=[workflow.GreetingWorkflow],
activities=[
activity.compose_greeting_async,
activity.compose_greeting_sync,
],
activity_executor=executor,
workflow_task_executor=executor,
max_concurrent_activities=100,
max_concurrent_workflow_tasks=100,
):
logging.info("Running workflow")
result = await env.client.execute_workflow(
workflow.GreetingWorkflow.run,
"Temporal",
id="gevent_async-workflow-id",
task_queue="gevent_async-task-queue",
)
if result != "Hello, Temporal!":
raise RuntimeError(f"Unexpected result: {result}")
logging.info(f"Workflow complete, result: {result}")


if __name__ == "__main__":
main()
76 changes: 76 additions & 0 deletions gevent_async/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Init gevent
from gevent import monkey

monkey.patch_all()

import asyncio
import logging
import signal

import gevent
from temporalio.client import Client
from temporalio.worker import Worker

from gevent_async import activity, workflow
from gevent_async.executor import GeventExecutor


def main():
logging.basicConfig(level=logging.INFO)

# Create single-worker gevent executor and run asyncio.run(async_main()) in
# it, waiting for result. This executor cannot be used for anything else in
# Temporal, it is just a single thread for running asyncio. This means that
# inside of async_main we must create another executor specifically for
# executing activity and workflow tasks.
with GeventExecutor(max_workers=1) as executor:
executor.submit(asyncio.run, async_main()).result()


async def async_main():
# Create ctrl+c handler. We do this by telling gevent on SIGINT to set the
# asyncio event. But asyncio calls are not thread safe, so we have to invoke
# it via call_soon_threadsafe.
interrupt_event = asyncio.Event()
gevent.signal_handler(
signal.SIGINT,
asyncio.get_running_loop().call_soon_threadsafe,
interrupt_event.set,
)

# Connect client
client = await Client.connect("localhost:7233")

# Create an executor for use by Temporal. This cannot be the outer one
# running this async main. The max_workers here needs to have enough room to
# support the max concurrent activities/workflows settings.
with GeventExecutor(max_workers=200) as executor:

# Run a worker for the workflow and activities
async with Worker(
client,
task_queue="gevent_async-task-queue",
workflows=[workflow.GreetingWorkflow],
activities=[
activity.compose_greeting_async,
activity.compose_greeting_sync,
],
# Set the executor for activities (only used for non-async
# activities) and workflow tasks
activity_executor=executor,
workflow_task_executor=executor,
# Set the max concurrent activities/workflows. These are the same as
# the defaults, but this makes it clear that the 100 + 100 = 200 for
# max_workers settings.
max_concurrent_activities=100,
max_concurrent_workflow_tasks=100,
):

# Wait until interrupted
logging.info("Worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
main()
34 changes: 34 additions & 0 deletions gevent_async/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from gevent_async.activity import (
ComposeGreetingInput,
compose_greeting_async,
compose_greeting_sync,
)


@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running workflow with parameter %s" % name)

# Run an async and a sync activity
async_res = await workflow.execute_activity(
compose_greeting_async,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)
sync_res = await workflow.execute_activity(
compose_greeting_sync,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)

# Confirm the same, return one
if async_res != sync_res:
raise ValueError("Results are not the same")
return sync_res
Loading

0 comments on commit c587d20

Please sign in to comment.