diff --git a/integrations/langfuse/pyproject.toml b/integrations/langfuse/pyproject.toml
index beff61c35..504d816ae 100644
--- a/integrations/langfuse/pyproject.toml
+++ b/integrations/langfuse/pyproject.toml
@@ -25,7 +25,7 @@ classifiers = [
   "Programming Language :: Python :: Implementation :: PyPy",
 ]
 dependencies = [
-  "haystack-ai",
+  "haystack-ai>=2.1.0",
   "langfuse"
 ]
 
diff --git a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py
index bb2d22954..cfe150317 100644
--- a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py
+++ b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py
@@ -20,6 +20,31 @@ class LangfuseConnector:
     In addition, you need to set the `HAYSTACK_CONTENT_TRACING_ENABLED` environment variable to `true` in order to
     enable Haystack tracing in your pipeline.
 
+    Lastly, you may disable flushing the data after each component by setting the `HAYSTACK_LANGFUSE_ENFORCE_FLUSH`
+    environment variable to `false`. By default, the data is flushed after each component, which blocks the thread
+    until the data is sent to Langfuse. **Caution**: Disabling this feature may result in data loss if the program
+    crashes before the data is sent to Langfuse. If you disable it, make sure to call langfuse.flush() explicitly
+    before the program exits, e.g. by using tracer.actual_tracer.flush():
+
+    ```python
+    from haystack.tracing import tracer
+
+    try:
+        # your code here
+    finally:
+        tracer.actual_tracer.flush()
+    ```
+
+    or, in FastAPI, by defining a shutdown event handler:
+    ```python
+    from haystack.tracing import tracer
+
+    # ...
+
+    @app.on_event("shutdown")
+    async def shutdown_event():
+        tracer.actual_tracer.flush()
+    ```
+
     Here is an example of how to use it:
 
     ```python
diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py
index f4786b9bd..9261b8f70 100644
--- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py
+++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py
@@ -1,4 +1,5 @@
 import contextlib
+import os
 from typing import Any, Dict, Iterator, Optional, Union
 
 from haystack.dataclasses import ChatMessage
@@ -7,6 +8,8 @@
 
 import langfuse
 
+HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR = "HAYSTACK_LANGFUSE_ENFORCE_FLUSH"
+
 
 class LangfuseSpan(Span):
@@ -93,6 +96,7 @@ def __init__(self, tracer: "langfuse.Langfuse", name: str = "Haystack", public:
         self._context: list[LangfuseSpan] = []
         self._name = name
         self._public = public
+        self.enforce_flush = os.getenv(HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR, "true").lower() == "true"
 
     @contextlib.contextmanager
     def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
@@ -121,12 +125,12 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I
                 # Haystack returns one meta dict for each message, but the 'usage' value
                 # is always the same, let's just pick the first item
                 m = meta[0]
-                span._span.update(usage=m.get("usage"), model=m.get("model"))
+                span._span.update(usage=m.get("usage") or None, model=m.get("model"))
         elif tags.get("haystack.component.type") == "OpenAIChatGenerator":
             replies = span._data.get("haystack.component.output", {}).get("replies")
             if replies:
                 meta = replies[0].meta
-                span._span.update(usage=meta.get("usage"), model=meta.get("model"))
+                span._span.update(usage=meta.get("usage") or None, model=meta.get("model"))
 
         pipeline_input = tags.get("haystack.pipeline.input_data", None)
         if pipeline_input:
@@ -137,6 +141,15 @@
         span.raw_span().end()
         self._context.pop()
+
+        if len(self._context) == 1:
+            # The root span has to be a trace, which needs to be removed from the context after the pipeline run
+            self._context.pop()
+
+            if self.enforce_flush:
+                self.flush()
+
+    def flush(self):
         self._tracer.flush()
 
     def current_span(self) -> Span:
diff --git a/integrations/langfuse/tests/test_tracer.py b/integrations/langfuse/tests/test_tracer.py
index 35a24c4b8..241581a72 100644
--- a/integrations/langfuse/tests/test_tracer.py
+++ b/integrations/langfuse/tests/test_tracer.py
@@ -1,3 +1,4 @@
+import os
 from unittest.mock import Mock, MagicMock, patch
 
 from haystack_integrations.tracing.langfuse.tracer import LangfuseTracer
@@ -38,7 +39,9 @@ def test_create_new_span(self):
 
         assert span.raw_span().operation_name == "operation_name"
         assert span.raw_span().metadata == {"tag1": "value1", "tag2": "value2"}
-        assert len(tracer._context) == 1, "The trace span should have been popped, leaving root span in the context"
+        assert (
+            len(tracer._context) == 0
+        ), "The trace span should have been popped, and the root span is closed as well"
 
     # check that update method is called on the span instance with the provided key value pairs
     def test_update_span_with_pipeline_input_output_data(self):
@@ -79,3 +82,33 @@ def end(self):
 
         with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.output_data": "bye"}) as span:
             assert span.raw_span()._data["metadata"] == {"haystack.pipeline.output_data": "bye"}
{"haystack.pipeline.output_data": "bye"} + + def test_update_span_gets_flushed_by_default(self): + tracer_mock = Mock() + + tracer = LangfuseTracer(tracer=tracer_mock, name="Haystack", public=False) + with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span: + pass + + tracer_mock.flush.assert_called_once() + + def test_update_span_flush_disable(self, monkeypatch): + monkeypatch.setenv("HAYSTACK_LANGFUSE_ENFORCE_FLUSH", "false") + tracer_mock = Mock() + + from haystack_integrations.tracing.langfuse.tracer import LangfuseTracer + + tracer = LangfuseTracer(tracer=tracer_mock, name="Haystack", public=False) + with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span: + pass + + tracer_mock.flush.assert_not_called() + + def test_context_is_empty_after_tracing(self): + tracer_mock = Mock() + + tracer = LangfuseTracer(tracer=tracer_mock, name="Haystack", public=False) + with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span: + pass + + assert tracer._context == []