Skip to content

Commit

Permalink
Merge branch 'main' into pw/add-single-aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
PatrykWyzgowski committed Jul 19, 2024
2 parents 2918ba5 + ea687f8 commit 0b7e50a
Show file tree
Hide file tree
Showing 18 changed files with 708 additions and 46 deletions.
Binary file added docs/assets/otel_handler_jeager.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
126 changes: 126 additions & 0 deletions docs/how-to/trace_runs_with_otel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# How-To: Trace runs with OpenTelemetry

db-ally provides you a way to track execution of the query processing using
[OpenTelemetry](https://opentelemetry.io/) standard. As db-ally is a library, it only depends on the
[OpenTelemtry API](https://opentelemetry.io/docs/specs/otel/overview/#api). For projects that use db-ally, include
[OpenTelemetry SDK](https://opentelemetry.io/docs/specs/otel/overview/#sdk) or perform
[Auto Instrumentation](https://opentelemetry.io/docs/zero-code/python/).


## Step-by-step guide

1. [Python OpenTelemetry SDK](https://opentelemetry-python.readthedocs.io/en/latest/sdk/index.html) must be installed:

```bash
pip install opentelemetry-sdk
```

2. To capture the traces, you can use [Jeager](https://www.jaegertracing.io/). An open-source software for telemetry
data. The recommended option is to start with Docker. You can run:

```bash
docker run --network host --rm --name jeager -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 jaegertracing/all-in-one
```

For simplicity we are using `--network host`, however, do not use this settings in production deployments and
expose only ports that are needed.

3. Import required OpenTelemetry SDKs and db-ally OTel Handler:

```python
from dbally.audit.event_handlers.otel_event_handler import OtelEventHandler
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
```

4. Setup the OTel exporter in your project:

```python
exporeter = OTLPSpanExporter("http://localhost:4317", insecure=True)
provider = TracerProvider(resource=Resource({"service.name": "db-ally"}))
processor = BatchSpanProcessor(exporeter)
provider.add_span_processor(processor)
handler = OtelEventHandler(provider)
```

Using Resource you can add a name for your service. OTLPSpanExporter is used to export telemetry data using gRPC or
HTTP to desired location. We mark it as insecure, as demo does not use TLS. To efficently send data over network,
we should use BatchSpanProcessor to batch exports of telemetry data. Finally, we setup the db-ally handler.

5. Use handler with collection:

```python
df = pd.DataFrame({
"name": ["Alice", "Bob", "Charlie", "David", "Eve"],
"city": ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"],
})
llm = LiteLLM(model_name="gpt-4o")
collection = dbally.create_collection("clients", llm=llm, event_handlers=[handler], nl_responder=NLResponder(llm))
collection.add(ClientView, lambda: ClientView(df))
```

6. Ask your questions:

```python
result = await collection.ask("What clients are from Huston?", return_natural_response=True)
print(result)
```

7. Explore your traces in observability platform (Jeager in our case):

![Example trace in Jeager UI](../assets/otel_handler_jeager.png)


## Full code example

```python
import asyncio
import pandas as pd
import dbally
from dbally import DataFrameBaseView
from dbally.audit.event_handlers.otel_event_handler import OtelEventHandler
from dbally.nl_responder.nl_responder import NLResponder
from dbally.views import decorators
from dbally.llms import LiteLLM
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
class ClientView(DataFrameBaseView):
@decorators.view_filter()
def filter_by_city(self, city: str):
return self.df['city'] == city
async def main():
exporeter = OTLPSpanExporter("http://localhost:4317", insecure=True)
provider = TracerProvider(resource=Resource({"service.name": "db-ally"}))
processor = BatchSpanProcessor(exporeter)
provider.add_span_processor(processor)
handler = OtelEventHandler(provider)
df = pd.DataFrame({
"name": ["Alice", "Bob", "Charlie", "David", "Eve"],
"city": ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"],
})
llm = LiteLLM(model_name="gpt-4o")
collection = dbally.create_collection("clients", llm=llm, event_handlers=[handler], nl_responder=NLResponder(llm))
collection.add(ClientView, lambda: ClientView(df))
result = await collection.ask("What clients are from Huston?", return_natural_response=True)
print(result)
if __name__ == '__main__':
asyncio.run(main())
```
3 changes: 3 additions & 0 deletions docs/reference/event_handlers/otel_handler.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# OtelEventHandler

::: dbally.audit.OtelEventHandler
2 changes: 2 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ nav:
- how-to/update_similarity_indexes.md
- how-to/visualize_views.md
- how-to/log_runs_to_langsmith.md
- how-to/trace_runs_with_otel.md
- how-to/create_custom_event_handler.md
- how-to/openai_assistants_integration.md
- API Reference:
Expand All @@ -54,6 +55,7 @@ nav:
- reference/event_handlers/index.md
- reference/event_handlers/cli_handler.md
- reference/event_handlers/langsmith_handler.md
- reference/event_handlers/otel_handler.md
- View Selection:
- reference/view_selection/index.md
- reference/view_selection/llm_view_selector.md
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ install_requires =
tabulate>=0.9.0
click~=8.1.7
numpy>=1.24.0
opentelemetry-api>=1.0.0

[options.extras_require]
litellm =
Expand Down
5 changes: 3 additions & 2 deletions src/dbally/audit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
except ImportError:
pass

from .event_handlers.otel_event_handler import OtelEventHandler
from .event_tracker import EventTracker
from .events import Event, LLMEvent, RequestEnd, RequestStart, SimilarityEvent
from .events import LLMEvent, RequestEnd, RequestStart, SimilarityEvent
from .spans import EventSpan

__all__ = [
"CLIEventHandler",
"LangSmithEventHandler",
"Event",
"OtelEventHandler",
"EventHandler",
"EventTracker",
"EventSpan",
Expand Down
220 changes: 220 additions & 0 deletions src/dbally/audit/event_handlers/otel_event_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
import json
from dataclasses import dataclass
from typing import Any, Callable, Optional

from opentelemetry import trace
from opentelemetry.trace import Span, SpanKind, StatusCode, TracerProvider
from opentelemetry.util.types import AttributeValue

from dbally.audit.event_handlers.base import EventHandler
from dbally.audit.events import Event, LLMEvent, RequestEnd, RequestStart, SimilarityEvent

TRACER_NAME = "db-ally.events"
FORBIDDEN_CONTEXT_KEYS = {"filter_mask"}

TransformFn = Optional[Callable[[Any], Optional[AttributeValue]]]


def _optional_str(value: Optional[any]) -> Optional[str]:
return None if value is None else str(value)


@dataclass
class SpanHandler:
"""Handles span attributes and lifecycle"""

span: Span
record_inputs: bool
record_outputs: bool

def set(self, key: str, value: Optional[Any], transform: TransformFn = None) -> "SpanHandler":
"""
Sets a value as span attribute under given key if the value exists. Optionally one can add transform function to
change value from any to valid OpenTelemetry attribute type.
Args:
key: attribute name
value: attribute value. If None, the value is not set
transform: optional function to transform from Any to valid OTel AttributeValue
Returns:
self, for chaining calls
"""
value = value if transform is None else transform(value)
if value is not None:
self.span.set_attribute(key, value)

return self

def set_input(self, key: str, value: Optional[Any], transform: TransformFn = None) -> "SpanHandler":
"""
Sets a value, that is used as model input, under given key if the value exists. If the class does not record
inputs, then the value is not set. Optionally one can add transform function to change value from any to valid
OpenTelemetry attribute type.
Args:
key: attribute name
value: attribute value. If None, the value is not set. If record_inputs is False, the value is not set.
transform: optional function to transform from Any to valid OTel AttributeValue
Returns:
self, for chaining calls
"""
value = value if transform is None else transform(value)
if value is not None and self.record_inputs:
self.span.set_attribute(key, value)

return self

def set_output(self, key: str, value: Optional[Any], transform: TransformFn = None) -> "SpanHandler":
"""
Sets a value, that is the model output under, given key if the value exists. If the class does not record
inputs, then the value is not set. Optionally one can add transform function to change value from any to valid
OpenTelemetry attribute type.
Args:
key: attribute name
value: attribute value. If None, the value is not set. If record_output is False, the value is not set.
transform: optional function to transform from Any to valid OTel AttributeValue
Returns:
self, for chaining calls
"""
value = value if transform is None else transform(value)
if value is not None and self.record_outputs:
self.span.set_attribute(key, value)

return self

def end_succesfully(self) -> None:
"""Sets status of the span to OK and ends the span with current time"""
self.span.set_status(StatusCode.OK)
self.span.end()


class OtelEventHandler(EventHandler[SpanHandler, SpanHandler]):
"""
This handler emits OpenTelemetry spans for recorded events.
"""

def __init__(
self, provider: Optional[TracerProvider] = None, record_inputs: bool = True, record_outputs: bool = True
) -> None:
"""
Initialize OtelEventHandler. By default, it will try to use globaly configured TracerProvider. Pass it
explicitly if you want custom implementation, or you do not use OTel auto-instrumentation.
To comply with the
[OTel Semantic Conventions](https://opentelemetry.io/docs/specs/semconv/gen-ai/llm-spans/#configuration)
recording of inputs and outputs can be disabled.
Args:
provider: Optional tracer provider. By default global provider is used.
record_inputs: if true (default) all inputs are recorded as span attributes. Depending on usecase it maybe
turned off, to save resources and improve performance.
record_outputs: if true (default) all outputs are recorded as span attributes. Depending on usecase it
maybe turned off, to save resources and improve performance.
"""
self.record_inputs = record_inputs
self.record_outputs = record_outputs
if provider is None:
self.tracer = trace.get_tracer(TRACER_NAME)
else:
self.tracer = provider.get_tracer(TRACER_NAME)

def _handle_span(self, span: Span) -> SpanHandler:
return SpanHandler(span, self.record_inputs, self.record_outputs)

async def request_start(self, user_request: RequestStart) -> SpanHandler:
"""
Initializes new OTel Span as a parent.
Args:
user_request: The start of the request.
Returns:
span object as a parent for all subsequent events for this request
"""
with self.tracer.start_as_current_span("request", end_on_exit=False, kind=SpanKind.SERVER) as span:
return (
self._handle_span(span)
.set("db-ally.user.collection", user_request.collection_name)
.set_input("db-ally.user.question", user_request.question)
)

async def event_start(self, event: Event, request_context: SpanHandler) -> SpanHandler:
"""
Starts a new event in a system as a span. Uses request span as a parent.
Args:
event: Event to register
request_context: Parent span for this event
Returns:
span object capturing start of execution for this event
Raises:
ValueError: it is thrown when unknown event type is passed as argument
"""
if isinstance(event, LLMEvent):
with self._new_child_span(request_context, "llm") as span:
return (
self._handle_span(span)
.set("db-ally.llm.type", event.type)
.set_input("db-ally.llm.prompts", json.dumps(event.prompt))
)

if isinstance(event, SimilarityEvent):
with self._new_child_span(request_context, "similarity") as span:
return (
self._handle_span(span)
.set("db-ally.similarity.store", event.store)
.set("db-ally.similarity.fetcher", event.fetcher)
.set_input("db-ally.similarity.input", event.input_value)
)

raise ValueError(f"Unsuported event: {type(event)}")

async def event_end(self, event: Optional[Event], request_context: SpanHandler, event_context: SpanHandler) -> None:
"""
Finalizes execution of the event, ending a span for this event.
Args:
event: optional event information
request_context: parent span
event_context: event span
"""

if isinstance(event, LLMEvent):
event_context.set("db-ally.llm.response-tokes", event.completion_tokens).set_output(
"db-ally.llm.response", event.response
)

if isinstance(event, SimilarityEvent) and self.record_outputs:
event_context.set("db-ally.similarity.output", event.output_value)

event_context.end_succesfully()

async def request_end(self, output: RequestEnd, request_context: SpanHandler) -> None:
"""
Finalizes entire request, ending the span for this request.
Args:
output: output generated for this request
request_context: span to be closed
"""
request_context.set_output("db-ally.result.textual", output.result.textual_response).set(
"db-ally.result.execution-time", output.result.execution_time
).set("db-ally.result.execution-time-view", output.result.execution_time_view).set(
"db-ally.result.view-name", output.result.view_name
)

for key, value in output.result.context.items():
if key not in FORBIDDEN_CONTEXT_KEYS:
request_context.set(f"db-ally.result.context.{key}", value, transform=_optional_str)

request_context.end_succesfully()

def _new_child_span(self, parent: SpanHandler, name: str):
context = trace.set_span_in_context(parent.span)
return self.tracer.start_as_current_span(name, context=context, end_on_exit=False, kind=SpanKind.CLIENT)
Loading

0 comments on commit 0b7e50a

Please sign in to comment.