Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Performance optimizations and value error when streaming in langfuse #798

Merged
merged 9 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integrations/langfuse/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ classifiers = [
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = [
"haystack-ai",
"haystack-ai>=2.1.0",
masci marked this conversation as resolved.
Show resolved Hide resolved
"langfuse"
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,31 @@ class LangfuseConnector:
In addition, you need to set the `HAYSTACK_CONTENT_TRACING_ENABLED` environment variable to `true` in order to
enable Haystack tracing in your pipeline.

Lastly, you may disable flushing the data after each component by setting the `HAYSTACK_LANGFUSE_ENFORCE_FLUSH`
environment variable to `false`. By default, the data is flushed after each component and blocks the thread until
the data is sent to Langfuse. **Caution**: Disabling this feature may result in data loss if the program crashes
before the data is sent to Langfuse. Make sure to call langfuse.flush() explicitly before the program exits.
E.g. by using tracer.actual_tracer.flush():

```python
from haystack.tracing import tracer

try:
# your code here
finally:
tracer.actual_tracer.flush()
```
or in FastAPI by defining a shutdown event handler:
```python
from haystack.tracing import tracer

# ...

@app.on_event("shutdown")
async def shutdown_event():
tracer.actual_tracer.flush()
```

Here is an example of how to use it:

```python
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import contextlib
import os
from typing import Any, Dict, Iterator, Optional, Union

from haystack.dataclasses import ChatMessage
Expand All @@ -7,6 +8,8 @@

import langfuse

HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR = "HAYSTACK_LANGFUSE_ENFORCE_FLUSH"


class LangfuseSpan(Span):
"""
Expand Down Expand Up @@ -93,6 +96,7 @@ def __init__(self, tracer: "langfuse.Langfuse", name: str = "Haystack", public:
self._context: list[LangfuseSpan] = []
self._name = name
self._public = public
self.enforce_flush = os.getenv(HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR, "true").lower() == "true"

@contextlib.contextmanager
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
Expand Down Expand Up @@ -121,12 +125,14 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I
# Haystack returns one meta dict for each message, but the 'usage' value
# is always the same, let's just pick the first item
m = meta[0]
span._span.update(usage=m.get("usage"), model=m.get("model"))
usage = m.get("usage") if m.get("usage") else None
span._span.update(usage=usage, model=m.get("model"))
masci marked this conversation as resolved.
Show resolved Hide resolved
elif tags.get("haystack.component.type") == "OpenAIChatGenerator":
replies = span._data.get("haystack.component.output", {}).get("replies")
if replies:
meta = replies[0].meta
span._span.update(usage=meta.get("usage"), model=meta.get("model"))
usage = meta.get("usage") if meta.get("usage") else None
span._span.update(usage=usage, model=meta.get("model"))
masci marked this conversation as resolved.
Show resolved Hide resolved

pipeline_input = tags.get("haystack.pipeline.input_data", None)
if pipeline_input:
Expand All @@ -137,6 +143,15 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I

span.raw_span().end()
self._context.pop()

if len(self._context) == 1:
# The root span has to be a trace, which needs to be removed from the context after the pipeline run
self._context.pop()
masci marked this conversation as resolved.
Show resolved Hide resolved

if self.enforce_flush:
self.flush()

def flush(self):
self._tracer.flush()

def current_span(self) -> Span:
Expand Down
35 changes: 34 additions & 1 deletion integrations/langfuse/tests/test_tracer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from unittest.mock import Mock, MagicMock, patch

from haystack_integrations.tracing.langfuse.tracer import LangfuseTracer
Expand Down Expand Up @@ -38,7 +39,9 @@ def test_create_new_span(self):
assert span.raw_span().operation_name == "operation_name"
assert span.raw_span().metadata == {"tag1": "value1", "tag2": "value2"}

assert len(tracer._context) == 1, "The trace span should have been popped, leaving root span in the context"
assert (
len(tracer._context) == 0
), "The trace span should have been popped, and the root span is closed as well"

# check that update method is called on the span instance with the provided key value pairs
def test_update_span_with_pipeline_input_output_data(self):
Expand Down Expand Up @@ -79,3 +82,33 @@ def end(self):

with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.output_data": "bye"}) as span:
assert span.raw_span()._data["metadata"] == {"haystack.pipeline.output_data": "bye"}

def test_update_span_gets_flushed_by_default(self):
tracer_mock = Mock()

tracer = LangfuseTracer(tracer=tracer_mock, name="Haystack", public=False)
with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span:
pass

tracer_mock.flush.assert_called_once()

def test_update_span_flush_disable(self):
os.environ["HAYSTACK_LANGFUSE_ENFORCE_FLUSH"] = "false"
masci marked this conversation as resolved.
Show resolved Hide resolved
tracer_mock = Mock()

from haystack_integrations.tracing.langfuse.tracer import LangfuseTracer

tracer = LangfuseTracer(tracer=tracer_mock, name="Haystack", public=False)
with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span:
pass

tracer_mock.flush.assert_not_called()

def test_context_is_empty_after_tracing(self):
tracer_mock = Mock()

tracer = LangfuseTracer(tracer=tracer_mock, name="Haystack", public=False)
with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span:
pass

assert tracer._context == []