Skip to content

Commit

Permalink
fix: Performance optimizations and value error when streaming in lang…
Browse files Browse the repository at this point in the history
…fuse (#798)

* Solves issue with usage stats when streaming is enabled on the OpenAIGenerator

* Root span should be closed when the pipeline run is complete

* Added documentation

* Moved flushing execution to the last span in the context and improved the documentation to give examples of flushing properly manually

* Fixed linting issues

* make use of monkeypatch

* improving code
  • Loading branch information
Redna authored Jun 13, 2024
1 parent 575e209 commit bf5c641
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 4 deletions.
2 changes: 1 addition & 1 deletion integrations/langfuse/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ classifiers = [
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = [
"haystack-ai",
"haystack-ai>=2.1.0",
"langfuse"
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,31 @@ class LangfuseConnector:
In addition, you need to set the `HAYSTACK_CONTENT_TRACING_ENABLED` environment variable to `true` in order to
enable Haystack tracing in your pipeline.
Lastly, you may disable flushing the data after each component by setting the `HAYSTACK_LANGFUSE_ENFORCE_FLUSH`
environment variable to `false`. By default, the data is flushed after each component and blocks the thread until
the data is sent to Langfuse. **Caution**: Disabling this feature may result in data loss if the program crashes
before the data is sent to Langfuse. Make sure to call langfuse.flush() explicitly before the program exits.
E.g. by using tracer.actual_tracer.flush():
```python
from haystack.tracing import tracer
try:
# your code here
finally:
tracer.actual_tracer.flush()
```
or in FastAPI by defining a shutdown event handler:
```python
from haystack.tracing import tracer
# ...
@app.on_event("shutdown")
async def shutdown_event():
tracer.actual_tracer.flush()
```
Here is an example of how to use it:
```python
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import contextlib
import os
from typing import Any, Dict, Iterator, Optional, Union

from haystack.dataclasses import ChatMessage
Expand All @@ -7,6 +8,8 @@

import langfuse

HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR = "HAYSTACK_LANGFUSE_ENFORCE_FLUSH"


class LangfuseSpan(Span):
"""
Expand Down Expand Up @@ -93,6 +96,7 @@ def __init__(self, tracer: "langfuse.Langfuse", name: str = "Haystack", public:
self._context: list[LangfuseSpan] = []
self._name = name
self._public = public
self.enforce_flush = os.getenv(HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR, "true").lower() == "true"

@contextlib.contextmanager
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
Expand Down Expand Up @@ -121,12 +125,12 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I
# Haystack returns one meta dict for each message, but the 'usage' value
# is always the same, let's just pick the first item
m = meta[0]
span._span.update(usage=m.get("usage"), model=m.get("model"))
span._span.update(usage=m.get("usage") or None, model=m.get("model"))
elif tags.get("haystack.component.type") == "OpenAIChatGenerator":
replies = span._data.get("haystack.component.output", {}).get("replies")
if replies:
meta = replies[0].meta
span._span.update(usage=meta.get("usage"), model=meta.get("model"))
span._span.update(usage=meta.get("usage") or None, model=meta.get("model"))

pipeline_input = tags.get("haystack.pipeline.input_data", None)
if pipeline_input:
Expand All @@ -137,6 +141,15 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I

span.raw_span().end()
self._context.pop()

if len(self._context) == 1:
# The root span has to be a trace, which needs to be removed from the context after the pipeline run
self._context.pop()

if self.enforce_flush:
self.flush()

def flush(self):
    """
    Send all pending tracing data to Langfuse, blocking until delivery completes.

    Invoked automatically after each trace unless flushing was disabled via the
    HAYSTACK_LANGFUSE_ENFORCE_FLUSH environment variable; may also be called
    manually (e.g. from a shutdown hook) to avoid data loss on exit.
    """
    backend = self._tracer
    backend.flush()

def current_span(self) -> Span:
Expand Down
35 changes: 34 additions & 1 deletion integrations/langfuse/tests/test_tracer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from unittest.mock import Mock, MagicMock, patch

from haystack_integrations.tracing.langfuse.tracer import LangfuseTracer
Expand Down Expand Up @@ -38,7 +39,9 @@ def test_create_new_span(self):
assert span.raw_span().operation_name == "operation_name"
assert span.raw_span().metadata == {"tag1": "value1", "tag2": "value2"}

assert len(tracer._context) == 1, "The trace span should have been popped, leaving root span in the context"
assert (
len(tracer._context) == 0
), "The trace span should have been popped, and the root span is closed as well"

# check that update method is called on the span instance with the provided key value pairs
def test_update_span_with_pipeline_input_output_data(self):
Expand Down Expand Up @@ -79,3 +82,33 @@ def end(self):

with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.output_data": "bye"}) as span:
assert span.raw_span()._data["metadata"] == {"haystack.pipeline.output_data": "bye"}

def test_update_span_gets_flushed_by_default(self):
    """With no env override, the tracer flushes to the backend after a trace ends."""
    mock_backend = Mock()
    langfuse_tracer = LangfuseTracer(tracer=mock_backend, name="Haystack", public=False)

    # The span itself is irrelevant here; we only care about the flush side effect.
    with langfuse_tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}):
        pass

    mock_backend.flush.assert_called_once()

def test_update_span_flush_disable(self, monkeypatch):
    """
    With HAYSTACK_LANGFUSE_ENFORCE_FLUSH=false the tracer must not flush
    automatically when a trace ends.

    The env var is read in LangfuseTracer.__init__, so setting it via
    monkeypatch *before* construction is sufficient. The previous local
    re-import of LangfuseTracer was a no-op (the module was already
    imported at the top of the file) and has been removed.
    """
    monkeypatch.setenv("HAYSTACK_LANGFUSE_ENFORCE_FLUSH", "false")
    tracer_mock = Mock()

    tracer = LangfuseTracer(tracer=tracer_mock, name="Haystack", public=False)
    with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}):
        pass

    tracer_mock.flush.assert_not_called()

def test_context_is_empty_after_tracing(self):
    """After a completed trace, no spans (including the root) remain in the context."""
    backend = Mock()
    langfuse_tracer = LangfuseTracer(tracer=backend, name="Haystack", public=False)

    with langfuse_tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}):
        pass

    assert len(langfuse_tracer._context) == 0

0 comments on commit bf5c641

Please sign in to comment.