From 320e4f2a8f3236421a5be5e309817e75a3729f9e Mon Sep 17 00:00:00 2001 From: Bohan Qu Date: Wed, 30 Oct 2024 19:24:31 +0800 Subject: [PATCH 1/3] feat: add support for ttft --- .../tracing/langfuse/tracer.py | 10 ++- integrations/langfuse/tests/test_tracer.py | 85 ++++++++++++------- 2 files changed, 62 insertions(+), 33 deletions(-) diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index 94064a0d1..c1c216281 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -1,5 +1,6 @@ import contextlib import os +from datetime import datetime from typing import Any, Dict, Iterator, Optional, Union from haystack.components.generators.openai_utils import _convert_message_to_openai_format @@ -148,7 +149,14 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I replies = span._data.get("haystack.component.output", {}).get("replies") if replies: meta = replies[0].meta - span._span.update(usage=meta.get("usage") or None, model=meta.get("model")) + completion_start_time = meta.get("completion_start_time") + if completion_start_time: + completion_start_time = datetime.fromisoformat(completion_start_time) + span._span.update( + usage=meta.get("usage") or None, + model=meta.get("model"), + completion_start_time=completion_start_time, + ) pipeline_input = tags.get("haystack.pipeline.input_data", None) if pipeline_input: diff --git a/integrations/langfuse/tests/test_tracer.py b/integrations/langfuse/tests/test_tracer.py index c6bf4acdf..004424f56 100644 --- a/integrations/langfuse/tests/test_tracer.py +++ b/integrations/langfuse/tests/test_tracer.py @@ -1,9 +1,43 @@ -import os +import datetime from unittest.mock import MagicMock, Mock, patch +from haystack.dataclasses import ChatMessage from haystack_integrations.tracing.langfuse.tracer import LangfuseTracer +class MockSpan: + def __init__(self): + self._data = {} + self._span = self + self.operation_name = "operation_name" + + def raw_span(self): + return self + + def span(self, name=None): + # assert correct operation name passed to the span + assert name == "operation_name" + return self + + def update(self, **kwargs): + self._data.update(kwargs) + + def generation(self, name=None): + return self + + def end(self): + pass + + +class MockTracer: + + def trace(self, name, **kwargs): + return MockSpan() + + def flush(self): + pass + + class TestLangfuseTracer: # LangfuseTracer can be initialized with a Langfuse instance, a name and a boolean value for public. @@ -45,37 +79,6 @@ def test_create_new_span(self): # check that update method is called on the span instance with the provided key value pairs def test_update_span_with_pipeline_input_output_data(self): - class MockTracer: - - def trace(self, name, **kwargs): - return MockSpan() - - def flush(self): - pass - - class MockSpan: - def __init__(self): - self._data = {} - self._span = self - self.operation_name = "operation_name" - - def raw_span(self): - return self - - def span(self, name=None): - # assert correct operation name passed to the span - assert name == "operation_name" - return self - - def update(self, **kwargs): - self._data.update(kwargs) - - def generation(self, name=None): - return self - - def end(self): - pass - tracer = LangfuseTracer(tracer=MockTracer(), name="Haystack", public=False) with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span: assert span.raw_span()._data["metadata"] == {"haystack.pipeline.input_data": "hello"} @@ -83,6 +86,24 @@ def end(self): with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.output_data": "bye"}) as span: assert span.raw_span()._data["metadata"] == {"haystack.pipeline.output_data": "bye"} + def test_trace_generation(self): + tracer = LangfuseTracer(tracer=MockTracer(), name="Haystack", public=False) + tags = { + "haystack.component.type": "OpenAIChatGenerator", + "haystack.component.output": { + "replies": [ + ChatMessage.from_assistant( + "", meta={"completion_start_time": "2021-07-27T16:02:08.012345", "model": "test_model"} + ) + ] + }, + } + with tracer.trace(operation_name="operation_name", tags=tags) as span: + ... + assert span.raw_span()._data["usage"] is None + assert span.raw_span()._data["model"] == "test_model" + assert span.raw_span()._data["completion_start_time"] == datetime.datetime(2021, 7, 27, 16, 2, 8, 12345) + def test_update_span_gets_flushed_by_default(self): tracer_mock = Mock() From e96d05866a9a49fcbbf485ac1c4f86c080cee5a3 Mon Sep 17 00:00:00 2001 From: Bohan Qu Date: Fri, 8 Nov 2024 20:11:26 +0800 Subject: [PATCH 2/3] chore: skip ttft logging if completion start time is invalid --- .../tracing/langfuse/tracer.py | 10 +++++++++- integrations/langfuse/tests/test_tracer.py | 18 ++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index c1c216281..3e9dc9c6e 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -1,4 +1,5 @@ import contextlib +import logging import os from datetime import datetime from typing import Any, Dict, Iterator, Optional, Union @@ -10,6 +11,9 @@ import langfuse + +logger = logging.getLogger(__name__) + HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR = "HAYSTACK_LANGFUSE_ENFORCE_FLUSH" _SUPPORTED_GENERATORS = [ "AzureOpenAIGenerator", @@ -151,7 +155,11 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I meta = replies[0].meta completion_start_time = meta.get("completion_start_time") if completion_start_time: - completion_start_time = datetime.fromisoformat(completion_start_time) + try: + completion_start_time = datetime.fromisoformat(completion_start_time) + except ValueError: + logger.error(f"Failed to parse completion_start_time: {completion_start_time}") + completion_start_time = None span._span.update( usage=meta.get("usage") or None, model=meta.get("model"), diff --git a/integrations/langfuse/tests/test_tracer.py b/integrations/langfuse/tests/test_tracer.py index 004424f56..65f1ec8fd 100644 --- a/integrations/langfuse/tests/test_tracer.py +++ b/integrations/langfuse/tests/test_tracer.py @@ -104,6 +104,24 @@ def test_trace_generation(self): assert span.raw_span()._data["model"] == "test_model" assert span.raw_span()._data["completion_start_time"] == datetime.datetime(2021, 7, 27, 16, 2, 8, 12345) + def test_trace_generation_invalid_start_time(self): + tracer = LangfuseTracer(tracer=MockTracer(), name="Haystack", public=False) + tags = { + "haystack.component.type": "OpenAIChatGenerator", + "haystack.component.output": { + "replies": [ + ChatMessage.from_assistant( + "", meta={"completion_start_time": "foobar", "model": "test_model"} + ) + ] + }, + } + with tracer.trace(operation_name="operation_name", tags=tags) as span: + ... + assert span.raw_span()._data["usage"] is None + assert span.raw_span()._data["model"] == "test_model" + assert span.raw_span()._data["completion_start_time"] is None + def test_update_span_gets_flushed_by_default(self): tracer_mock = Mock() From 0ab5cc735165baf6d1764c413f7f333d6f8cf767 Mon Sep 17 00:00:00 2001 From: Bohan Qu Date: Sat, 9 Nov 2024 11:27:50 +0800 Subject: [PATCH 3/3] chore: addressing lint issues --- .../src/haystack_integrations/tracing/langfuse/tracer.py | 1 - integrations/langfuse/tests/test_tracer.py | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index 3e9dc9c6e..c9c8a354e 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -11,7 +11,6 @@ import langfuse - logger = logging.getLogger(__name__) HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR = "HAYSTACK_LANGFUSE_ENFORCE_FLUSH" diff --git a/integrations/langfuse/tests/test_tracer.py b/integrations/langfuse/tests/test_tracer.py index 65f1ec8fd..9ee8e5dc4 100644 --- a/integrations/langfuse/tests/test_tracer.py +++ b/integrations/langfuse/tests/test_tracer.py @@ -110,9 +110,7 @@ def test_trace_generation_invalid_start_time(self): "haystack.component.type": "OpenAIChatGenerator", "haystack.component.output": { "replies": [ - ChatMessage.from_assistant( - "", meta={"completion_start_time": "foobar", "model": "test_model"} - ) + ChatMessage.from_assistant("", meta={"completion_start_time": "foobar", "model": "test_model"}), ] }, }