diff --git a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py
index 1b0f4eda2..89e512ab8 100644
--- a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py
+++ b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py
@@ -6,11 +6,70 @@
 
 @component
 class LangfuseConnector:
-    def __init__(self, name: str):
+    """
+    LangfuseConnector connects the Haystack LLM framework with Langfuse to enable tracing of operations
+    and data flow within the components of a pipeline.
+
+    Simply add this component to your pipeline; do not connect it to any other component. The LangfuseConnector
+    will automatically trace the operations and data flow within the pipeline.
+
+    Here is an example of how to use it:
+
+    ```python
+    import os
+
+    os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"
+
+    from haystack import Pipeline
+    from haystack.components.builders import DynamicChatPromptBuilder
+    from haystack.components.generators.chat import OpenAIChatGenerator
+    from haystack.dataclasses import ChatMessage
+    from haystack_integrations.components.connectors.langfuse import LangfuseConnector
+
+    if __name__ == "__main__":
+
+        pipe = Pipeline()
+        pipe.add_component("tracer", LangfuseConnector("Chat example"))
+        pipe.add_component("prompt_builder", DynamicChatPromptBuilder())
+        pipe.add_component("llm", OpenAIChatGenerator(model="gpt-3.5-turbo"))
+
+        pipe.connect("prompt_builder.prompt", "llm.messages")
+
+        messages = [
+            ChatMessage.from_system("Always respond in German even if some input data is in other languages."),
+            ChatMessage.from_user("Tell me about {{location}}"),
+        ]
+
+        response = pipe.run(
+            data={"prompt_builder": {"template_variables": {"location": "Berlin"}, "prompt_source": messages}}
+        )
+        print(response["llm"]["replies"][0])
+        print(response["tracer"]["trace_url"])
+    ```
+
+    """
+
+    def __init__(self, name: str, public: bool = False):
+        """
+        Initialize the LangfuseConnector component.
+
+        :param name: The name of the pipeline or component. This name is used to identify the tracing run on the
+            Langfuse dashboard.
+        :param public: Whether the tracing data should be public or private. If set to `True`, the tracing data is
+            publicly accessible to anyone with the tracing URL. If set to `False`, the tracing data is private and
+            only accessible to the Langfuse account owner. Default is `False`.
+        """
         self.name = name
-        self.tracer = LangfuseTracer(Langfuse(), name)
+        self.tracer = LangfuseTracer(tracer=Langfuse(), name=name, public=public)
         tracing.enable_tracing(self.tracer)
 
     @component.output_types(name=str, trace_url=str)
     def run(self):
+        """
+        Runs the LangfuseConnector component.
+
+        :returns: A dictionary with the following keys:
+            - `name`: The name of the tracing component.
+            - `trace_url`: The URL to the tracing data.
+        """
         return {"name": self.name, "trace_url": self.tracer.get_trace_url()}
diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py
index f138e4912..857081274 100644
--- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py
+++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py
@@ -58,10 +58,11 @@ def to_openai_format(self, m: ChatMessage) -> Dict[str, Any]:
 
 
 class LangfuseTracer(Tracer):
-    def __init__(self, tracer: "langfuse.Langfuse", name: str = "Haystack") -> None:
+    def __init__(self, tracer: "langfuse.Langfuse", name: str = "Haystack", public: bool = False) -> None:
         self._tracer = tracer
         self._context: list[LangfuseSpan] = []
         self._name = name
+        self._public = public
 
     @contextlib.contextmanager
     def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
@@ -105,7 +106,7 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
     def current_span(self) -> Span:
         if not self._context:
             # The root span has to be a trace
-            self._context.append(LangfuseSpan(self._tracer.trace(name=self._name)))
+            self._context.append(LangfuseSpan(self._tracer.trace(name=self._name, public=self._public)))
         return self._context[-1]
 
     def get_trace_url(self) -> str:
diff --git a/integrations/langfuse/tests/test_tracing.py b/integrations/langfuse/tests/test_tracing.py
index 50f6e89c1..3d61d2cce 100644
--- a/integrations/langfuse/tests/test_tracing.py
+++ b/integrations/langfuse/tests/test_tracing.py
@@ -1,4 +1,8 @@
 import os
+
+# don't remove (or move) this env var setting from here, it's needed to turn tracing on
+os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"
+
 from urllib.parse import urlparse
 
 import pytest
@@ -21,7 +25,7 @@ def test_tracing_integration():
 
     pipe = Pipeline()
-    pipe.add_component("tracer", LangfuseConnector("Chat example"))
+    pipe.add_component("tracer", LangfuseConnector(name="Chat example", public=True))  # public so anyone can verify the run
     pipe.add_component("prompt_builder", DynamicChatPromptBuilder())
     pipe.add_component("llm", OpenAIChatGenerator(model="gpt-3.5-turbo"))
 
@@ -37,19 +41,12 @@
     )
     assert "Berlin" in response["llm"]["replies"][0].content
     assert response["tracer"]["trace_url"]
-
-    url = "https://cloud.langfuse.com/api/public/traces/"
     trace_url = response["tracer"]["trace_url"]
-    parsed_url = urlparse(trace_url)
-    # trace id is the last part of the path (after the last '/')
-    uuid = os.path.basename(parsed_url.path)
-
     try:
-        # GET request with Basic Authentication on the Langfuse API
-        response = requests.get(
-            url + uuid, auth=HTTPBasicAuth(os.environ.get("LANGFUSE_PUBLIC_KEY"), os.environ.get("LANGFUSE_SECRET_KEY"))
-        )
-
-        assert response.status_code == 200, f"Failed to retrieve data from Langfuse API: {response.status_code}"
+        # should be able to access the trace data because we set LangfuseConnector to public=True
+        response = requests.get(trace_url)
+        assert (
+            response.status_code == 200
+        ), f"Failed to retrieve tracing data from {trace_url}, got: {response.status_code}"
     except requests.exceptions.RequestException as e:
-        assert False, f"Failed to retrieve data from Langfuse API: {e}"
+        assert False, f"Failed to retrieve tracing data from Langfuse: {e}"
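For reviewers, a minimal sketch (not part of the diff) of how the new `public` flag propagates: `LangfuseConnector(public=...)` hands the value to `LangfuseTracer`, which forwards it to `Langfuse.trace(...)` when the lazily created root trace is opened. The import path for `LangfuseTracer` and the reliance on environment variables for the Langfuse keys are assumptions based on the file layout above, not something the diff states.

```python
# Sketch only (not part of the diff): how public=True flows from the connector to the root trace.
# Assumes LANGFUSE_SECRET_KEY / LANGFUSE_PUBLIC_KEY are set so Langfuse() can authenticate,
# and that LangfuseTracer is importable from haystack_integrations.tracing.langfuse.
from langfuse import Langfuse

from haystack_integrations.tracing.langfuse import LangfuseTracer

# LangfuseConnector(name="Chat example", public=True) builds its tracer like this:
tracer = LangfuseTracer(tracer=Langfuse(), name="Chat example", public=True)

# The root span is created lazily on first use; public=True is forwarded to
# Langfuse.trace(...), so anyone holding the trace URL can view the run.
root_span = tracer.current_span()
print(tracer.get_trace_url())
```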