Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding langfuse for openai #847

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dot-env.template
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,8 @@ SIMLI_FACE_ID=...

# Krisp
KRISP_MODEL_PATH=...

# Langfuse
LANGFUSE_HOST=...
LANGFUSE_PUBLIC_KEY=...
LANGFUSE_SECRET_KEY=...
77 changes: 77 additions & 0 deletions examples/foundational/28-langfuse-llm-tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import aiohttp
import os
import sys

# Langfuse tracing helpers: `observe` decorates the coroutine to trace,
# `langfuse_context` lets us attach metadata/tags to the active trace.
from langfuse.decorators import observe, langfuse_context

from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport

from runner import configure

from loguru import logger

from dotenv import load_dotenv

# Load API keys (OpenAI, Cartesia, Langfuse, Daily) from a local .env file,
# overriding any values already present in the environment.
load_dotenv(override=True)

# Replace loguru's default handler with a DEBUG-level stderr handler.
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")

@observe()
async def main():
    """Speak a single LLM-generated greeting into a Daily room, traced by Langfuse.

    The ``@observe()`` decorator traces this coroutine; the current trace is
    tagged with the Daily room URL so the run can be found in the Langfuse UI.
    """
    async with aiohttp.ClientSession() as http_session:
        (room_url, _) = await configure(http_session)

        # Attach room metadata and tags to the active Langfuse trace.
        langfuse_context.update_current_trace(
            # session_id=, # fill in with session_id
            # user_id=user_id, # fill in with user_id/participant
            metadata={"room_url": room_url},
            tags=["langfuse", "llm", "daily"],
        )

        transport = DailyTransport(
            room_url, None, "Say One Thing From an LLM", DailyParams(audio_out_enabled=True)
        )

        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",  # British Lady
        )

        messages = [
            {
                "role": "system",
                "content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world.",
            }
        ]

        task = PipelineTask(Pipeline([llm, tts, transport.output()]))
        runner = PipelineRunner()

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            # Queue the prompt followed by an EndFrame so the pipeline exits
            # once the single utterance has been spoken.
            await task.queue_frames([LLMMessagesFrame(messages), EndFrame()])

        await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ gstreamer = [ "pygobject~=3.48.2" ]
fireworks = [ "openai~=1.57.2" ]
krisp = [ "pipecat-ai-krisp~=0.3.0" ]
langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ]
langfuse = [ "langfuse~=2.54.1" ]
livekit = [ "livekit~=0.17.5", "livekit-api~=0.7.1", "tenacity~=8.5.0" ]
lmnt = [ "lmnt~=1.1.4" ]
local = [ "pyaudio~=0.2.14" ]
Expand Down
24 changes: 24 additions & 0 deletions src/pipecat/services/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,29 @@
raise Exception(f"Missing module: {e}")


# Langfuse is an optional dependency used to trace LLM calls. When installed,
# `AsyncOpenAI` is Langfuse's instrumented drop-in OpenAI client and `observe`
# records spans for decorated coroutines. When missing, fall back to a no-op
# `observe` so the rest of this module works unchanged.
try:
    from langfuse.openai import AsyncOpenAI
    from langfuse.decorators import observe
except ModuleNotFoundError as e:
    logger.warning(f"Langfuse is not installed. Exception: {e}")
    logger.warning(
        "Langfuse integration is optional. To enable it, install the Langfuse package with "
        "`pip install pipecat-ai[langfuse]` and set the `LANGFUSE_HOST`, "
        "`LANGFUSE_PUBLIC_KEY`, and `LANGFUSE_SECRET_KEY` environment variables."
    )

    import functools

    # No-op replacement for langfuse's `observe`. Like the real decorator it
    # must support both bare (`@observe`) and called (`@observe(...)`) usage;
    # the previous fallback only handled the called form, so applying it bare
    # (as `get_chat_completions` below does) replaced the method with the
    # inner decorator instead of a wrapped coroutine.
    def observe(*args, **kwargs):
        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*func_args, **func_kwargs):
                logger.debug("Using dummy observe decorator.")
                return await func(*func_args, **func_kwargs)

            return wrapper

        # Bare usage: `observe` was called directly with the target function.
        if len(args) == 1 and callable(args[0]) and not kwargs:
            return decorator(args[0])
        return decorator

ValidVoice = Literal["alloy", "echo", "fable", "onyx", "nova", "shimmer"]

VALID_VOICES: Dict[str, ValidVoice] = {
Expand Down Expand Up @@ -144,6 +167,7 @@ def create_client(self, api_key=None, base_url=None, **kwargs):
def can_generate_metrics(self) -> bool:
return True

@observe
async def get_chat_completions(
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
) -> AsyncStream[ChatCompletionChunk]:
Expand Down