This example demonstrates how to use the Rewire library for managing the lifecycle of Python functions, both synchronous and asynchronous.
First, let's import the required modules and create a LifecycleModule instance:
from contextlib import asynccontextmanager
import anyio
from rewire import LifecycleModule
lm = LifecycleModule() # create lifecycle manager
Here are two example functions: an asynchronous function, my_async_function(), and a synchronous function, my_sync_function().
async def my_async_function():
    print("My async function ran")
def my_sync_function():
    print("My sync function ran")
To start the functions, pass them to the run method of the lifecycle manager:
lm.run(my_async_function()) # you can pass awaitables
lm.run(my_sync_function) # you can also pass callables (will run in thread)
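The difference between passing an awaitable and passing a callable can be illustrated with a minimal sketch. This is not Rewire's implementation: run_task is a hypothetical helper, and it uses the standard library's asyncio instead of anyio so that it runs without extra dependencies. It awaits awaitables directly and pushes plain callables to a worker thread.

```python
import asyncio
import inspect
import threading


async def run_task(task):
    """Hypothetical helper: await an awaitable, run a plain callable in a thread."""
    if inspect.isawaitable(task):
        return await task
    # asyncio.to_thread keeps a blocking callable off the event loop
    return await asyncio.to_thread(task)


async def my_async_function():
    # runs on the event loop's thread
    return threading.get_ident()


def my_sync_function():
    # runs on a worker thread
    return threading.get_ident()


async def main():
    loop_thread = await run_task(my_async_function())  # coroutine object (awaitable)
    worker_thread = await run_task(my_sync_function)   # plain callable
    return loop_thread, worker_thread


loop_thread, worker_thread = asyncio.run(main())
print(loop_thread != worker_thread)
```

Running the synchronous function in a thread means a blocking call inside it does not stall other asynchronous tasks.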
You can define stop callbacks that will be executed when the lifecycle manager is stopped:
async def my_function_with_stop_callback():
    print("Running function with stop callback")
    async def stop():
        print("Stop callback called")
    def stop_sync():
        print("Sync stop callback called")
    lm.on_stop(stop) # stop hooks are added after start
    lm.on_stop(stop_sync)
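If stop hooks can be either coroutine functions or plain functions, as the names stop and stop_sync suggest, whatever invokes them has to handle both kinds. The sketch below shows one common way to do that with the standard library. call_hooks is a hypothetical helper, and the registration-order, on-the-event-loop behaviour shown here is an assumption, not something the Rewire documentation states.

```python
import asyncio
import inspect

calls = []


async def stop():
    calls.append("async stop")


def stop_sync():
    calls.append("sync stop")


async def call_hooks(hooks):
    """Hypothetical helper: call each hook, awaiting the result if it is awaitable."""
    for hook in hooks:
        result = hook()
        if inspect.isawaitable(result):
            await result


asyncio.run(call_hooks([stop, stop_sync]))
print(calls)
```

Checking the return value with inspect.isawaitable, instead of checking the function type up front, also covers plain functions that return a coroutine.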
Define an asynchronous context manager that is entered before any run() functions start and exited after all stop() callbacks have finished:
@asynccontextmanager
async def my_context_manager():
    print("Context manager entered")
    try:
        yield
    finally:
        print("Context manager exited")
Apply the context manager to the lifecycle manager:
lm.contextmanager(my_context_manager())
Finally, use anyio.run() to start the lifecycle manager:
anyio.run(lm.start, True)
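The ordering described in this example (context manager entered first, then the run functions, then the stop callbacks, then the context manager exit) can be modelled with a toy lifecycle built only on the standard library. The lifecycle function below is hypothetical and is not how Rewire is implemented; it is a sketch of the sequence under the assumption that stop callbacks run while the context is still open.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events = []


@asynccontextmanager
async def my_context_manager():
    events.append("context entered")
    try:
        yield
    finally:
        events.append("context exited")


async def task():
    events.append("task ran")


async def stop():
    events.append("stop called")


async def lifecycle(context_managers, tasks, stop_hooks):
    """Toy model: enter contexts, run tasks, call stop hooks, exit contexts."""
    async with AsyncExitStack() as stack:
        for cm in context_managers:
            await stack.enter_async_context(cm)
        try:
            # run all tasks concurrently and wait for them to finish
            await asyncio.gather(*(t() for t in tasks))
        finally:
            # stop hooks run while the contexts are still open
            for hook in stop_hooks:
                await hook()
    # leaving the `async with` block exits the contexts in reverse order


asyncio.run(lifecycle([my_context_manager()], [task], [stop]))
print(events)
```

In this sketch the recorded order is context entered, task ran, stop called, context exited, which matches the sequence the context manager is meant to wrap.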