From 005f9a7f3b558279ddd483879bc6b7f3c0acc723 Mon Sep 17 00:00:00 2001 From: Redna Date: Sat, 8 Jun 2024 20:09:07 +0200 Subject: [PATCH 1/7] Solves issue with usage stats when streaming is enabled on the OpenAIGenerator --- .../haystack_integrations/tracing/langfuse/tracer.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index f4786b9bd..316aba4c2 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -121,12 +121,14 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I # Haystack returns one meta dict for each message, but the 'usage' value # is always the same, let's just pick the first item m = meta[0] - span._span.update(usage=m.get("usage"), model=m.get("model")) + usage = m.get("usage") if m.get("usage") else None + span._span.update(usage=usage, model=m.get("model")) elif tags.get("haystack.component.type") == "OpenAIChatGenerator": replies = span._data.get("haystack.component.output", {}).get("replies") if replies: meta = replies[0].meta - span._span.update(usage=meta.get("usage"), model=meta.get("model")) + usage = meta.get("usage") if meta.get("usage") else None # empty dict will cause langfuse to throw an error - happens when streaming + span._span.update(usage=usage, model=meta.get("model")) pipeline_input = tags.get("haystack.pipeline.input_data", None) if pipeline_input: @@ -137,6 +139,12 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I span.raw_span().end() self._context.pop() + + if span_name == "haystack.pipeline.run": + # The root span has to be a trace, which need to be removed from the context after the pipeline run + self._context.pop() + + self._tracer.flush() def current_span(self) 
-> Span: From 96f9ea235d8cdfdf0ad6dd3b2f619c135766bbb3 Mon Sep 17 00:00:00 2001 From: Redna Date: Sun, 9 Jun 2024 19:59:20 +0200 Subject: [PATCH 2/7] Root span should be closed when the pipeline run is complete --- integrations/langfuse/pyproject.toml | 2 +- .../connectors/langfuse/langfuse_connector.py | 4 +++ .../tracing/langfuse/tracer.py | 9 +++-- integrations/langfuse/tests/test_tracer.py | 33 ++++++++++++++++++- 4 files changed, 43 insertions(+), 5 deletions(-) diff --git a/integrations/langfuse/pyproject.toml b/integrations/langfuse/pyproject.toml index beff61c35..504d816ae 100644 --- a/integrations/langfuse/pyproject.toml +++ b/integrations/langfuse/pyproject.toml @@ -25,7 +25,7 @@ classifiers = [ "Programming Language :: Python :: Implementation :: PyPy", ] dependencies = [ - "haystack-ai", + "haystack-ai>=2.1.0", "langfuse" ] diff --git a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py index bb2d22954..84e878268 100644 --- a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py +++ b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py @@ -20,6 +20,10 @@ class LangfuseConnector: In addition, you need to set the `HAYSTACK_CONTENT_TRACING_ENABLED` environment variable to `true` in order to enable Haystack tracing in your pipeline. + Lastly, you may disable flushing the data after each component by setting the `HAYSTACK_LANGFUSE_ENFORCE_FLUSH` environent + variable to `false`. By default, the data is flushed after each component and blocks the thread until the data is sent to + Langfuse. 
+ Here is an example of how to use it: ```python diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index 316aba4c2..e9f51fd4d 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -1,4 +1,5 @@ import contextlib +import os from typing import Any, Dict, Iterator, Optional, Union from haystack.dataclasses import ChatMessage @@ -7,6 +8,7 @@ import langfuse +HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR = "HAYSTACK_LANGFUSE_ENFORCE_FLUSH" class LangfuseSpan(Span): """ @@ -93,6 +95,7 @@ def __init__(self, tracer: "langfuse.Langfuse", name: str = "Haystack", public: self._context: list[LangfuseSpan] = [] self._name = name self._public = public + self.enforce_flush = os.getenv(HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR, "true").lower() == "true" @contextlib.contextmanager def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]: @@ -140,12 +143,12 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I span.raw_span().end() self._context.pop() - if span_name == "haystack.pipeline.run": + if len(self._context) == 1: # The root span has to be a trace, which need to be removed from the context after the pipeline run self._context.pop() - - self._tracer.flush() + if self.enforce_flush: + self._tracer.flush() def current_span(self) -> Span: """ diff --git a/integrations/langfuse/tests/test_tracer.py b/integrations/langfuse/tests/test_tracer.py index 35a24c4b8..24cc43463 100644 --- a/integrations/langfuse/tests/test_tracer.py +++ b/integrations/langfuse/tests/test_tracer.py @@ -1,3 +1,4 @@ +import os from unittest.mock import Mock, MagicMock, patch from haystack_integrations.tracing.langfuse.tracer import LangfuseTracer @@ -38,7 +39,7 @@ def test_create_new_span(self): assert 
span.raw_span().operation_name == "operation_name" assert span.raw_span().metadata == {"tag1": "value1", "tag2": "value2"} - assert len(tracer._context) == 1, "The trace span should have been popped, leaving root span in the context" + assert len(tracer._context) == 0, "The trace span should have been popped, and the root span is closed as well" # check that update method is called on the span instance with the provided key value pairs def test_update_span_with_pipeline_input_output_data(self): @@ -79,3 +80,33 @@ def end(self): with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.output_data": "bye"}) as span: assert span.raw_span()._data["metadata"] == {"haystack.pipeline.output_data": "bye"} + + def test_update_span_gets_flushed_by_default(self): + tracer_mock = Mock() + + tracer = LangfuseTracer(tracer=tracer_mock, name="Haystack", public=False) + with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span: + pass + + tracer_mock.flush.assert_called_once() + + def test_update_span_flush_disable(self): + os.environ["HAYSTACK_LANGFUSE_ENFORCE_FLUSH"] = "false" + tracer_mock = Mock() + + from haystack_integrations.tracing.langfuse.tracer import LangfuseTracer + + tracer = LangfuseTracer(tracer=tracer_mock, name="Haystack", public=False) + with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span: + pass + + tracer_mock.flush.assert_not_called() + + def test_context_is_empty_after_tracing(self): + tracer_mock = Mock() + + tracer = LangfuseTracer(tracer=tracer_mock, name="Haystack", public=False) + with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span: + pass + + assert tracer._context == [] \ No newline at end of file From 629c1ac944b4b494b0f51810e7ff9f3276b591bc Mon Sep 17 00:00:00 2001 From: Redna Date: Mon, 10 Jun 2024 12:31:01 +0200 Subject: [PATCH 3/7] Added documentation --- 
.../connectors/langfuse/langfuse_connector.py | 15 ++++++++++++++- .../tracing/langfuse/tracer.py | 2 ++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py index 84e878268..e611b6dbc 100644 --- a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py +++ b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py @@ -22,7 +22,20 @@ class LangfuseConnector: Lastly, you may disable flushing the data after each component by setting the `HAYSTACK_LANGFUSE_ENFORCE_FLUSH` environent variable to `false`. By default, the data is flushed after each component and blocks the thread until the data is sent to - Langfuse. + Langfuse. **Caution**: Disabling this feature may result in data loss if the program crashes before the data is sent to Langfuse. + Make sure you will call langfuse.flush() explicitly before the program exits. e.g. 
by using: + ```python + try: + # your code here + finally: + langfuse.flush() + ``` + or in FastAPI by defining a shutdown event handler: + ```python + @app.on_event("shutdown") + async def shutdown_event(): + langfuse.flush() + ``` Here is an example of how to use it: diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index e9f51fd4d..3a366a39e 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -8,6 +8,8 @@ import langfuse +from threading import Thread + HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR = "HAYSTACK_LANGFUSE_ENFORCE_FLUSH" class LangfuseSpan(Span): From c9ed7959c61b27a5d017018dda4cbec0c6fae401 Mon Sep 17 00:00:00 2001 From: Redna Date: Mon, 10 Jun 2024 12:44:05 +0200 Subject: [PATCH 4/7] Moved flushing execution to the last span in the context and improved the documentation to give examples of flushing properly manually --- .../connectors/langfuse/langfuse_connector.py | 13 ++++++++++--- .../tracing/langfuse/tracer.py | 7 +++++-- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py index e611b6dbc..b380fd0f2 100644 --- a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py +++ b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py @@ -23,18 +23,25 @@ class LangfuseConnector: Lastly, you may disable flushing the data after each component by setting the `HAYSTACK_LANGFUSE_ENFORCE_FLUSH` environent variable to `false`. 
By default, the data is flushed after each component and blocks the thread until the data is sent to Langfuse. **Caution**: Disabling this feature may result in data loss if the program crashes before the data is sent to Langfuse. - Make sure you will call langfuse.flush() explicitly before the program exits. e.g. by using: + Make sure you will call langfuse.flush() explicitly before the program exits. e.g. by using tracer.actual_tracer.flush(): + ```python + from haystack.tracing import tracer + try: # your code here finally: - langfuse.flush() + tracer.actual_tracer.flush() ``` or in FastAPI by defining a shutdown event handler: ```python + from haystack.tracing import tracer + + # ... + @app.on_event("shutdown") async def shutdown_event(): - langfuse.flush() + tracer.actual_tracer.flush() ``` Here is an example of how to use it: diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index 3a366a39e..745993eaa 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -148,9 +148,12 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I if len(self._context) == 1: # The root span has to be a trace, which need to be removed from the context after the pipeline run self._context.pop() + + if self.enforce_flush: + self.flush() - if self.enforce_flush: - self._tracer.flush() + def flush(self): + self._tracer.flush() def current_span(self) -> Span: """ From 97b4c041263702bbc05ad5d727c34978c13ba5c1 Mon Sep 17 00:00:00 2001 From: Redna Date: Mon, 10 Jun 2024 13:09:45 +0200 Subject: [PATCH 5/7] Fixed linting issues --- .../connectors/langfuse/langfuse_connector.py | 13 +++++++------ .../tracing/langfuse/tracer.py | 7 +++---- integrations/langfuse/tests/test_tracer.py | 10 ++++++---- 3 files changed, 16 insertions(+), 14 
deletions(-) diff --git a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py index b380fd0f2..cfe150317 100644 --- a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py +++ b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py @@ -20,11 +20,12 @@ class LangfuseConnector: In addition, you need to set the `HAYSTACK_CONTENT_TRACING_ENABLED` environment variable to `true` in order to enable Haystack tracing in your pipeline. - Lastly, you may disable flushing the data after each component by setting the `HAYSTACK_LANGFUSE_ENFORCE_FLUSH` environent - variable to `false`. By default, the data is flushed after each component and blocks the thread until the data is sent to - Langfuse. **Caution**: Disabling this feature may result in data loss if the program crashes before the data is sent to Langfuse. - Make sure you will call langfuse.flush() explicitly before the program exits. e.g. by using tracer.actual_tracer.flush(): - + Lastly, you may disable flushing the data after each component by setting the `HAYSTACK_LANGFUSE_ENFORCE_FLUSH` + environment variable to `false`. By default, the data is flushed after each component and blocks the thread until + the data is sent to Langfuse. **Caution**: Disabling this feature may result in data loss if the program crashes + before the data is sent to Langfuse. Make sure you will call langfuse.flush() explicitly before the program exits. + E.g. by using tracer.actual_tracer.flush(): + ```python from haystack.tracing import tracer @@ -38,7 +39,7 @@ class LangfuseConnector: from haystack.tracing import tracer # ... 
- + @app.on_event("shutdown") async def shutdown_event(): tracer.actual_tracer.flush() diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index 745993eaa..6f2c7e432 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -8,10 +8,9 @@ import langfuse -from threading import Thread - HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR = "HAYSTACK_LANGFUSE_ENFORCE_FLUSH" + class LangfuseSpan(Span): """ Internal class representing a bridge between the Haystack span tracing API and Langfuse. @@ -132,7 +131,7 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I replies = span._data.get("haystack.component.output", {}).get("replies") if replies: meta = replies[0].meta - usage = meta.get("usage") if meta.get("usage") else None # empty dict will cause langfuse to throw an error - happens when streaming + usage = meta.get("usage") if meta.get("usage") else None span._span.update(usage=usage, model=meta.get("model")) pipeline_input = tags.get("haystack.pipeline.input_data", None) @@ -148,7 +147,7 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I if len(self._context) == 1: # The root span has to be a trace, which need to be removed from the context after the pipeline run self._context.pop() - + if self.enforce_flush: self.flush() diff --git a/integrations/langfuse/tests/test_tracer.py b/integrations/langfuse/tests/test_tracer.py index 24cc43463..1023f3f17 100644 --- a/integrations/langfuse/tests/test_tracer.py +++ b/integrations/langfuse/tests/test_tracer.py @@ -39,7 +39,9 @@ def test_create_new_span(self): assert span.raw_span().operation_name == "operation_name" assert span.raw_span().metadata == {"tag1": "value1", "tag2": "value2"} - assert len(tracer._context) == 0, "The trace span should have 
been popped, and the root span is closed as well" + assert ( + len(tracer._context) == 0 + ), "The trace span should have been popped, and the root span is closed as well" # check that update method is called on the span instance with the provided key value pairs def test_update_span_with_pipeline_input_output_data(self): @@ -89,13 +91,13 @@ def test_update_span_gets_flushed_by_default(self): pass tracer_mock.flush.assert_called_once() - + def test_update_span_flush_disable(self): os.environ["HAYSTACK_LANGFUSE_ENFORCE_FLUSH"] = "false" tracer_mock = Mock() from haystack_integrations.tracing.langfuse.tracer import LangfuseTracer - + tracer = LangfuseTracer(tracer=tracer_mock, name="Haystack", public=False) with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span: pass @@ -109,4 +111,4 @@ def test_context_is_empty_after_tracing(self): with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span: pass - assert tracer._context == [] \ No newline at end of file + assert tracer._context == [] From b6690ea25dda03917a7b1c94edba2573a96d9c4e Mon Sep 17 00:00:00 2001 From: Redna Date: Wed, 12 Jun 2024 22:05:32 +0200 Subject: [PATCH 6/7] make use of monkeypatch --- integrations/langfuse/tests/test_tracer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integrations/langfuse/tests/test_tracer.py b/integrations/langfuse/tests/test_tracer.py index 1023f3f17..241581a72 100644 --- a/integrations/langfuse/tests/test_tracer.py +++ b/integrations/langfuse/tests/test_tracer.py @@ -92,8 +92,8 @@ def test_update_span_gets_flushed_by_default(self): tracer_mock.flush.assert_called_once() - def test_update_span_flush_disable(self): - os.environ["HAYSTACK_LANGFUSE_ENFORCE_FLUSH"] = "false" + def test_update_span_flush_disable(self, monkeypatch): + monkeypatch.setenv("HAYSTACK_LANGFUSE_ENFORCE_FLUSH", "false") tracer_mock = Mock() from 
haystack_integrations.tracing.langfuse.tracer import LangfuseTracer From 70afb75c3268ba48921d780fcb588cbe8f2f40aa Mon Sep 17 00:00:00 2001 From: Redna Date: Thu, 13 Jun 2024 12:14:01 +0200 Subject: [PATCH 7/7] improving code --- .../src/haystack_integrations/tracing/langfuse/tracer.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index 6f2c7e432..9261b8f70 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -125,14 +125,12 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I # Haystack returns one meta dict for each message, but the 'usage' value # is always the same, let's just pick the first item m = meta[0] - usage = m.get("usage") if m.get("usage") else None - span._span.update(usage=usage, model=m.get("model")) + span._span.update(usage=m.get("usage") or None, model=m.get("model")) elif tags.get("haystack.component.type") == "OpenAIChatGenerator": replies = span._data.get("haystack.component.output", {}).get("replies") if replies: meta = replies[0].meta - usage = meta.get("usage") if meta.get("usage") else None - span._span.update(usage=usage, model=meta.get("model")) + span._span.update(usage=meta.get("usage") or None, model=meta.get("model")) pipeline_input = tags.get("haystack.pipeline.input_data", None) if pipeline_input: