diff --git a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py index b380fd0f24..cfe1503176 100644 --- a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py +++ b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py @@ -20,11 +20,12 @@ class LangfuseConnector: In addition, you need to set the `HAYSTACK_CONTENT_TRACING_ENABLED` environment variable to `true` in order to enable Haystack tracing in your pipeline. - Lastly, you may disable flushing the data after each component by setting the `HAYSTACK_LANGFUSE_ENFORCE_FLUSH` environent - variable to `false`. By default, the data is flushed after each component and blocks the thread until the data is sent to - Langfuse. **Caution**: Disabling this feature may result in data loss if the program crashes before the data is sent to Langfuse. - Make sure you will call langfuse.flush() explicitly before the program exits. e.g. by using tracer.actual_tracer.flush(): - + Lastly, you may disable flushing the data after each component by setting the `HAYSTACK_LANGFUSE_ENFORCE_FLUSH` + environment variable to `false`. By default, the data is flushed after each component and blocks the thread until + the data is sent to Langfuse. **Caution**: Disabling this feature may result in data loss if the program crashes + before the data is sent to Langfuse. Make sure you will call langfuse.flush() explicitly before the program exits. + E.g. by using tracer.actual_tracer.flush(): + ```python from haystack.tracing import tracer @@ -38,7 +39,7 @@ class LangfuseConnector: from haystack.tracing import tracer # ... 
- + @app.on_event("shutdown") async def shutdown_event(): tracer.actual_tracer.flush() diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index 745993eaa5..d8de1d5049 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -8,8 +8,6 @@ import langfuse -from threading import Thread - HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR = "HAYSTACK_LANGFUSE_ENFORCE_FLUSH" class LangfuseSpan(Span): @@ -132,7 +130,7 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I replies = span._data.get("haystack.component.output", {}).get("replies") if replies: meta = replies[0].meta - usage = meta.get("usage") if meta.get("usage") else None # empty dict will cause langfuse to throw an error - happens when streaming + usage = meta.get("usage") if meta.get("usage") else None span._span.update(usage=usage, model=meta.get("model")) pipeline_input = tags.get("haystack.pipeline.input_data", None) @@ -148,7 +146,7 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I if len(self._context) == 1: # The root span has to be a trace, which need to be removed from the context after the pipeline run self._context.pop() - + if self.enforce_flush: self.flush()