Skip to content

Commit

Permalink
Pydocs: add `public` trace flag
Browse files Browse the repository at this point in the history
  • Loading branch information
vblagoje committed Apr 30, 2024
1 parent 72c08c2 commit ff5b8e0
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,70 @@

@component
class LangfuseConnector:
    """
    LangfuseConnector connects Haystack LLM framework with Langfuse in order to enable the tracing of operations
    and data flow within various components of a pipeline.

    Simply add this component to your pipeline, DO NOT connect it to any other component. The LangfuseConnector will
    automatically trace the operations and data flow within the pipeline.

    Here is an example of how to use it:

    ```python
    import os

    os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"

    from haystack import Pipeline
    from haystack.components.builders import DynamicChatPromptBuilder
    from haystack.components.generators.chat import OpenAIChatGenerator
    from haystack.dataclasses import ChatMessage
    from haystack_integrations.components.connectors.langfuse import LangfuseConnector

    if __name__ == "__main__":
        pipe = Pipeline()
        pipe.add_component("tracer", LangfuseConnector("Chat example"))
        pipe.add_component("prompt_builder", DynamicChatPromptBuilder())
        pipe.add_component("llm", OpenAIChatGenerator(model="gpt-3.5-turbo"))
        pipe.connect("prompt_builder.prompt", "llm.messages")

        messages = [
            ChatMessage.from_system("Always respond in German even if some input data is in other languages."),
            ChatMessage.from_user("Tell me about {{location}}"),
        ]

        response = pipe.run(
            data={"prompt_builder": {"template_variables": {"location": "Berlin"}, "prompt_source": messages}}
        )
        print(response["llm"]["replies"][0])
        print(response["tracer"]["trace_url"])
    ```
    """

    def __init__(self, name: str, public: bool = False):
        """
        Initialize the LangfuseConnector component.

        :param name: The name of the pipeline or component. This name will be used to identify the tracing run on the
            Langfuse dashboard.
        :param public: Whether the tracing data should be public or private. If set to `True`, the tracing data will be
            publicly accessible to anyone with the tracing URL. If set to `False`, the tracing data will be private and
            only accessible to the Langfuse account owner. Default is `False`.
        """
        self.name = name
        # The tracer wraps the Langfuse client; `public` is forwarded so the root
        # trace is created with the requested visibility.
        self.tracer = LangfuseTracer(tracer=Langfuse(), name=name, public=public)
        # Register this tracer with Haystack's global tracing facility so all
        # pipeline operations are traced automatically.
        tracing.enable_tracing(self.tracer)

    @component.output_types(name=str, trace_url=str)
    def run(self):
        """
        Runs the LangfuseConnector component.

        :returns: A dictionary with the following keys:
            - `name`: The name of the tracing component.
            - `trace_url`: The URL to the tracing data.
        """
        return {"name": self.name, "trace_url": self.tracer.get_trace_url()}
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ def to_openai_format(self, m: ChatMessage) -> Dict[str, Any]:


class LangfuseTracer(Tracer):
def __init__(self, tracer: "langfuse.Langfuse", name: str = "Haystack", public: bool = False) -> None:
    """
    Initialize the LangfuseTracer.

    :param tracer: The Langfuse client instance used to create traces and spans.
    :param name: Name used for the root trace shown on the Langfuse dashboard. Default is "Haystack".
    :param public: Whether traces created by this tracer are publicly accessible via their URL.
        Default is `False`.
    """
    self._tracer = tracer
    # Stack of currently open spans; the root element (created lazily in
    # current_span) has to be a Langfuse trace.
    self._context: list[LangfuseSpan] = []
    self._name = name
    self._public = public

@contextlib.contextmanager
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
Expand Down Expand Up @@ -105,7 +106,7 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I
def current_span(self) -> Span:
    """
    Return the currently active span, creating the root trace lazily.

    If no span has been opened yet, a root element is pushed onto the context
    stack first; the root has to be a Langfuse trace, created with this
    tracer's name and `public` visibility flag.
    """
    if not self._context:
        # The root span has to be a trace; forward `public` so the trace's
        # URL visibility matches what the user configured on the connector.
        self._context.append(LangfuseSpan(self._tracer.trace(name=self._name, public=self._public)))
    return self._context[-1]

def get_trace_url(self) -> str:
Expand Down
25 changes: 11 additions & 14 deletions integrations/langfuse/tests/test_tracing.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import os

# don't remove (or move) this env var setting from here, it's needed to turn tracing on
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"

from urllib.parse import urlparse

import pytest
Expand All @@ -21,7 +25,7 @@
def test_tracing_integration():

pipe = Pipeline()
pipe.add_component("tracer", LangfuseConnector("Chat example"))
pipe.add_component("tracer", LangfuseConnector(name="Chat example", public=True)) # public so anyone can verify run
pipe.add_component("prompt_builder", DynamicChatPromptBuilder())
pipe.add_component("llm", OpenAIChatGenerator(model="gpt-3.5-turbo"))

Expand All @@ -37,19 +41,12 @@ def test_tracing_integration():
)
assert "Berlin" in response["llm"]["replies"][0].content
assert response["tracer"]["trace_url"]

url = "https://cloud.langfuse.com/api/public/traces/"
trace_url = response["tracer"]["trace_url"]
parsed_url = urlparse(trace_url)
# trace id is the last part of the path (after the last '/')
uuid = os.path.basename(parsed_url.path)

try:
# GET request with Basic Authentication on the Langfuse API
response = requests.get(
url + uuid, auth=HTTPBasicAuth(os.environ.get("LANGFUSE_PUBLIC_KEY"), os.environ.get("LANGFUSE_SECRET_KEY"))
)

assert response.status_code == 200, f"Failed to retrieve data from Langfuse API: {response.status_code}"
# should be able to access the trace data because we set LangfuseConnector to public=True
response = requests.get(trace_url)
assert (
response.status_code == 200
), f"Failed to retrieve tracing data from {trace_url} got: {response.status_code}"
except requests.exceptions.RequestException as e:
assert False, f"Failed to retrieve data from Langfuse API: {e}"
assert False, f"Failed to retrieve tracing data from Langfuse: {e}"

0 comments on commit ff5b8e0

Please sign in to comment.