Skip to content

Commit

Permalink
Add distributed tracing middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
hinthornw committed Oct 10, 2024
1 parent 550b28d commit 6317c25
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 13 deletions.
40 changes: 40 additions & 0 deletions python/langsmith/middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Middleware for making it easier to do distributed tracing."""

Check notice on line 1 in python/langsmith/middleware.py

View workflow job for this annotation

GitHub Actions / benchmark

Benchmark results

......................................... create_5_000_run_trees: Mean +- std dev: 565 ms +- 44 ms ......................................... create_10_000_run_trees: Mean +- std dev: 1.12 sec +- 0.06 sec ......................................... create_20_000_run_trees: Mean +- std dev: 1.12 sec +- 0.06 sec ......................................... dumps_class_nested_py_branch_and_leaf_200x400: Mean +- std dev: 766 us +- 10 us ......................................... dumps_class_nested_py_leaf_50x100: Mean +- std dev: 27.2 ms +- 0.3 ms ......................................... dumps_class_nested_py_leaf_100x200: Mean +- std dev: 111 ms +- 2 ms ......................................... dumps_dataclass_nested_50x100: Mean +- std dev: 27.5 ms +- 0.3 ms ......................................... WARNING: the benchmark result may be unstable * the standard deviation (15.7 ms) is 26% of the mean (60.7 ms) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. dumps_pydantic_nested_50x100: Mean +- std dev: 60.7 ms +- 15.7 ms ......................................... WARNING: the benchmark result may be unstable * the standard deviation (29.6 ms) is 14% of the mean (214 ms) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. dumps_pydanticv1_nested_50x100: Mean +- std dev: 214 ms +- 30 ms

Check notice on line 1 in python/langsmith/middleware.py

View workflow job for this annotation

GitHub Actions / benchmark

Comparison against main

+-----------------------------------------------+---------+-----------------------+ | Benchmark | main | changes | +===============================================+=========+=======================+ | dumps_dataclass_nested_50x100 | 27.9 ms | 27.5 ms: 1.02x faster | +-----------------------------------------------+---------+-----------------------+ | dumps_class_nested_py_leaf_100x200 | 113 ms | 111 ms: 1.02x faster | +-----------------------------------------------+---------+-----------------------+ | dumps_class_nested_py_leaf_50x100 | 27.3 ms | 27.2 ms: 1.00x faster | +-----------------------------------------------+---------+-----------------------+ | dumps_class_nested_py_branch_and_leaf_200x400 | 762 us | 766 us: 1.01x slower | +-----------------------------------------------+---------+-----------------------+ | Geometric mean | (ref) | 1.01x faster | +-----------------------------------------------+---------+-----------------------+ Benchmark hidden because not significant (5): dumps_pydantic_nested_50x100, dumps_pydanticv1_nested_50x100, create_5_000_run_trees, create_10_000_run_trees, create_20_000_run_trees


class TracingMiddleware:
"""Middleware for propagating distributed tracing context using LangSmith.
This middleware checks for the 'langsmith-trace' header and propagates the
tracing context if present. It does not start new traces by default.
It is designed to work with ASGI applications.
Attributes:
app: The ASGI application being wrapped.
"""

def __init__(self, app):
"""Initialize the middleware."""
from langsmith.run_helpers import tracing_context # type: ignore

self._with_headers = tracing_context
self.app = app

async def __call__(self, scope: dict, receive, send):
"""Handle incoming requests and propagate tracing context if applicable.
Args:
scope: A dict containing ASGI connection scope.
receive: An awaitable callable for receiving ASGI events.
send: An awaitable callable for sending ASGI events.
If the request is HTTP and contains the 'langsmith-trace' header,
it propagates the tracing context before calling the wrapped application.
Otherwise, it calls the application directly without modifying the context.
"""
if scope["type"] == "http" and "headers" in scope:
headers = dict(scope["headers"])
if b"langsmith-trace" in headers:
with self._with_headers(parent=headers):
await self.app(scope, receive, send)
return
await self.app(scope, receive, send)
43 changes: 32 additions & 11 deletions python/langsmith/run_trees.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

import json
import logging
import sys
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union, cast
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union, cast
from uuid import UUID, uuid4

try:
Expand All @@ -26,7 +27,11 @@
logger = logging.getLogger(__name__)

LANGSMITH_PREFIX = "langsmith-"
LANGSMITH_DOTTED_ORDER = f"{LANGSMITH_PREFIX}trace"
LANGSMITH_DOTTED_ORDER = sys.intern(f"{LANGSMITH_PREFIX}trace")
LANGSMITH_DOTTED_ORDER_BYTES = LANGSMITH_DOTTED_ORDER.encode("utf-8")
LANGSMITH_METADATA = sys.intern(f"{LANGSMITH_PREFIX}metadata")
LANGSMITH_TAGS = sys.intern(f"{LANGSMITH_PREFIX}tags")
LANGSMITH_PROJECT = sys.intern(f"{LANGSMITH_PREFIX}project")
_CLIENT: Optional[Client] = None
_LOCK = threading.Lock() # Keeping around for a while for backwards compat

Expand Down Expand Up @@ -332,9 +337,9 @@ def from_dotted_order(
RunTree: The new span.
"""
headers = {
f"{LANGSMITH_DOTTED_ORDER}": dotted_order,
LANGSMITH_DOTTED_ORDER: dotted_order,
}
return cast(RunTree, cls.from_headers(headers, **kwargs))
return cast(RunTree, cls.from_headers(headers, **kwargs)) # type: ignore[arg-type]

@classmethod
def from_runnable_config(
Expand Down Expand Up @@ -402,7 +407,9 @@ def from_runnable_config(
return None

@classmethod
def from_headers(cls, headers: Dict[str, str], **kwargs: Any) -> Optional[RunTree]:
def from_headers(
cls, headers: Mapping[Union[str, bytes], Union[str, bytes]], **kwargs: Any
) -> Optional[RunTree]:
"""Create a new 'parent' span from the provided headers.
Extracts parent span information from the headers and creates a new span.
Expand All @@ -415,9 +422,14 @@ def from_headers(cls, headers: Dict[str, str], **kwargs: Any) -> Optional[RunTre
"""
init_args = kwargs.copy()

langsmith_trace = headers.get(f"{LANGSMITH_DOTTED_ORDER}")
langsmith_trace = cast(Optional[str], headers.get(LANGSMITH_DOTTED_ORDER))
if not langsmith_trace:
return # type: ignore[return-value]
langsmith_trace_bytes = cast(
Optional[bytes], headers.get(LANGSMITH_DOTTED_ORDER_BYTES)
)
if not langsmith_trace_bytes:
return # type: ignore[return-value]
langsmith_trace = langsmith_trace_bytes.decode("utf-8")

parent_dotted_order = langsmith_trace.strip()
parsed_dotted_order = _parse_dotted_order(parent_dotted_order)
Expand All @@ -436,7 +448,7 @@ def from_headers(cls, headers: Dict[str, str], **kwargs: Any) -> Optional[RunTre
init_args["run_type"] = init_args.get("run_type") or "chain"
init_args["name"] = init_args.get("name") or "parent"

baggage = _Baggage.from_header(headers.get("baggage"))
baggage = _Baggage.from_headers(headers)
if baggage.metadata or baggage.tags:
init_args["extra"] = init_args.setdefault("extra", {})
init_args["extra"]["metadata"] = init_args["extra"].setdefault(
Expand Down Expand Up @@ -490,17 +502,26 @@ def from_header(cls, header_value: Optional[str]) -> _Baggage:
try:
for item in header_value.split(","):
key, value = item.split("=", 1)
if key == f"{LANGSMITH_PREFIX}metadata":
if key == LANGSMITH_METADATA:
metadata = json.loads(urllib.parse.unquote(value))
elif key == f"{LANGSMITH_PREFIX}tags":
elif key == LANGSMITH_TAGS:
tags = urllib.parse.unquote(value).split(",")
elif key == f"{LANGSMITH_PREFIX}project":
elif key == LANGSMITH_PROJECT:
project_name = urllib.parse.unquote(value)
except Exception as e:
logger.warning(f"Error parsing baggage header: {e}")

return cls(metadata=metadata, tags=tags, project_name=project_name)

@classmethod
def from_headers(cls, headers: Mapping[Union[str, bytes], Any]) -> _Baggage:
if "baggage" in headers:
return cls.from_header(headers["baggage"])
elif b"baggage" in headers:
return cls.from_header(cast(bytes, headers[b"baggage"]).decode("utf-8"))
else:
return cls.from_header(None)

def to_header(self) -> str:
"""Return the Baggage object as a header value."""
items = []
Expand Down
4 changes: 2 additions & 2 deletions python/tests/integration_tests/fake_server.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from fastapi import FastAPI, Request

from langsmith import traceable
from langsmith.middleware import TracingMiddleware
from langsmith.run_helpers import get_current_run_tree, trace, tracing_context

fake_app = FastAPI()
fake_app.add_middleware(TracingMiddleware)


@traceable
Expand Down Expand Up @@ -47,13 +49,11 @@ async def fake_route(request: Request):
with trace(
"Trace",
project_name="Definitely-not-your-grandpas-project",
parent=request.headers,
):
fake_function()
fake_function_two(
"foo",
langsmith_extra={
"parent": request.headers,
"project_name": "Definitely-not-your-grandpas-project",
},
)
Expand Down

0 comments on commit 6317c25

Please sign in to comment.