Skip to content

Commit

Permalink
Fixes up the async example to use the tracker
Browse files Browse the repository at this point in the history
But only if the server is running
  • Loading branch information
elijahbenizzy committed Jun 25, 2024
1 parent f6782c8 commit 24bfb6d
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 8 deletions.
6 changes: 6 additions & 0 deletions examples/async/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ You should get the following result:
{"pipeline":{"computation1":false,"computation2":true}}
```

## Tracking

This uses the async tracker if the [ui](https://hamilton.dagworks.io/en/latest/concepts/ui/)
is running on port 8241 -- see [fastapi_example.py](fastapi_example.py) for the code.
If it is not running it will proceed anyway without tracking.


## How it works

Expand Down
10 changes: 5 additions & 5 deletions examples/async/async_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ def bar(request_raw: dict) -> str:
return request_raw.get("bar", "baz")


async def computation1(foo: str, some_data: dict) -> bool:
await asyncio.sleep(1)
return False


async def some_data() -> dict:
async with aiohttp.ClientSession() as session:
async with session.get("http://httpbin.org/get") as resp:
return await resp.json()


async def computation1(foo: str, some_data: dict) -> bool:
await asyncio.sleep(1)
return False


async def computation2(bar: str) -> bool:
await asyncio.sleep(1)
return True
Expand Down
54 changes: 51 additions & 3 deletions examples/async/fastapi_example.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,61 @@
import logging
from contextlib import asynccontextmanager

import aiohttp
import async_module
import fastapi
from aiohttp import client_exceptions
from hamilton_sdk import adapters

from hamilton import base
from hamilton.experimental import h_async

app = fastapi.FastAPI()
logger = logging.getLogger(__name__)

# can instantiate a driver once for the life of the app:
dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult())
dr = None


async def _tracking_server_running():
"""Quickly tells if the tracking server is up and running"""
async with aiohttp.ClientSession() as session:
try:
async with session.get("http://localhost:8241/api/v1/ping") as response:
if response.status == 200:
return True
else:
return False
except client_exceptions.ClientConnectionError:
return False


@asynccontextmanager
async def lifespan(app: fastapi.FastAPI):
"""Fast API lifespan context manager for setting up the driver and tracking adapters
This has to be done async as there are initializers
"""
global dr
is_server_running = await _tracking_server_running()
if not is_server_running:
logger.warning(
"Tracking server is not running, skipping telemetry. To run the telemetry server, run hamilton ui. "
"Note you must have a project with ID 1 if it is running -- if not, you can change the project "
"ID in this file or create a new one from the UI"
)
tracking_adapters = []
if is_server_running:
tracker_async = adapters.AsyncHamiltonTracker(
project_id=1,
username="elijah",
dag_name="async_tracker",
)
tracking_adapters = [tracker_async]
dr = (
await h_async.Builder().with_modules(async_module).with_adapters(*tracking_adapters).build()
)
yield


app = fastapi.FastAPI(lifespan=lifespan)


@app.post("/execute")
Expand Down

0 comments on commit 24bfb6d

Please sign in to comment.