Merge pull request #772 from pipecat-ai/mb/nim-llm
Add a NIM LLM service
Showing 5 changed files with 252 additions and 5 deletions.
@@ -0,0 +1,140 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import os
import sys

import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.nim import NimLLMService
from pipecat.services.openai import OpenAILLMContext
from pipecat.transports.services.daily import DailyParams, DailyTransport

load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def start_fetch_weather(function_name, llm, context):
    # note: we can't push a frame to the LLM here. the bot
    # can interrupt itself and/or cause audio overlapping glitches.
    # possible question for Aleix and Chad about what the right way
    # to trigger speech is, now, with the new queues/async/sync refactors.
    # await llm.push_frame(TextFrame("Let me check on that."))
    logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")


async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
    await result_callback({"conditions": "nice", "temperature": "75"})


async def main():
    async with aiohttp.ClientSession() as session:
        (room_url, token) = await configure(session)

        transport = DailyTransport(
            room_url,
            token,
            "Respond bot",
            DailyParams(
                audio_out_enabled=True,
                transcription_enabled=True,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
            ),
        )

        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",  # British Lady
            # text_filter=MarkdownTextFilter(),
        )

        llm = NimLLMService(
            api_key=os.getenv("NVIDIA_API_KEY"), model="meta/llama-3.1-405b-instruct"
        )
        # Register a function_name of None to get all functions
        # sent to the same callback with an additional function_name parameter.
        llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)

        tools = [
            ChatCompletionToolParam(
                type="function",
                function={
                    "name": "get_current_weather",
                    "description": "Get the current weather",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "location": {
                                "type": "string",
                                "description": "The city and state, e.g. San Francisco, CA",
                            },
                            "format": {
                                "type": "string",
                                "enum": ["celsius", "fahrenheit"],
                                "description": "The temperature unit to use. Infer this from the users location.",
                            },
                        },
                        "required": ["location", "format"],
                    },
                },
            )
        ]
        messages = [
            {
                "role": "system",
                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
            },
        ]

        context = OpenAILLMContext(messages, tools)
        context_aggregator = llm.create_context_aggregator(context)

        pipeline = Pipeline(
            [
                transport.input(),
                context_aggregator.user(),
                llm,
                tts,
                transport.output(),
                context_aggregator.assistant(),
            ]
        )

        task = PipelineTask(
            pipeline,
            PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
                enable_usage_metrics=True,
            ),
        )

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            await transport.capture_participant_transcription(participant["id"])
            # Kick off the conversation.
            await task.queue_frames([context_aggregator.user().get_context_frame()])

        runner = PipelineRunner()

        await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())
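A note on the function-calling path above: because the example registers `fetch_weather_from_api` with a `function_name` of `None`, every tool call is routed to that single callback, which here ignores `args` and returns canned data. The sketch below is a hedged illustration only (not part of this commit) of how such a catch-all handler could branch on `function_name` and read the arguments defined in the tool schema; `fetch_forecast` is a hypothetical helper and stays commented out.

```python
# Hedged sketch, not part of this commit. It reuses the handler signature shown
# above; `fetch_forecast` is a hypothetical helper and is left commented out.
async def handle_any_function(function_name, tool_call_id, args, llm, context, result_callback):
    if function_name == "get_current_weather":
        location = args.get("location", "unknown")
        unit = args.get("format", "fahrenheit")
        # forecast = await fetch_forecast(location, unit)  # hypothetical external lookup
        await result_callback(
            {"location": location, "conditions": "sunny", "temperature": "75", "unit": unit}
        )
    else:
        # Returning an error payload lets the model explain the failure in its reply.
        await result_callback({"error": f"Unknown function: {function_name}"})


# Registered the same way as fetch_weather_from_api above:
# llm.register_function(None, handle_any_function, start_callback=start_fetch_weather)
```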
@@ -0,0 +1,105 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#


from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService


class NimLLMService(OpenAILLMService):
    """A service for interacting with NVIDIA's NIM (NVIDIA Inference Microservice) API.

    This service extends OpenAILLMService to work with NVIDIA's NIM API while maintaining
    compatibility with the OpenAI-style interface. It specifically handles the difference
    in token usage reporting between NIM (incremental) and OpenAI (final summary).

    Args:
        api_key (str): The API key for accessing NVIDIA's NIM API
        base_url (str, optional): The base URL for the NIM API. Defaults to "https://integrate.api.nvidia.com/v1"
        model (str, optional): The model identifier to use. Defaults to "nvidia/llama-3.1-nemotron-70b-instruct"
        **kwargs: Additional keyword arguments passed to OpenAILLMService

    Example:
        ```python
        service = NimLLMService(
            api_key="your-api-key",
            model="nvidia/llama-3.1-nemotron-70b-instruct"
        )
        ```
    """

    def __init__(
        self,
        *,
        api_key: str,
        base_url: str = "https://integrate.api.nvidia.com/v1",
        model: str = "nvidia/llama-3.1-nemotron-70b-instruct",
        **kwargs,
    ):
        super().__init__(api_key=api_key, base_url=base_url, model=model, **kwargs)
        # Counters for accumulating token usage metrics
        self._prompt_tokens = 0
        self._completion_tokens = 0
        self._total_tokens = 0
        self._has_reported_prompt_tokens = False
        self._is_processing = False

    async def _process_context(self, context: OpenAILLMContext):
        """Process a context through the LLM and accumulate token usage metrics.

        This method overrides the parent class implementation to handle NVIDIA's
        incremental token reporting style, accumulating the counts and reporting
        them once at the end of processing.

        Args:
            context (OpenAILLMContext): The context to process, containing messages
                and other information needed for the LLM interaction.
        """
        # Reset all counters and flags at the start of processing
        self._prompt_tokens = 0
        self._completion_tokens = 0
        self._total_tokens = 0
        self._has_reported_prompt_tokens = False
        self._is_processing = True

        try:
            await super()._process_context(context)
        finally:
            self._is_processing = False
            # Report final accumulated token usage at the end of processing
            if self._prompt_tokens > 0 or self._completion_tokens > 0:
                self._total_tokens = self._prompt_tokens + self._completion_tokens
                tokens = LLMTokenUsage(
                    prompt_tokens=self._prompt_tokens,
                    completion_tokens=self._completion_tokens,
                    total_tokens=self._total_tokens,
                )
                await super().start_llm_usage_metrics(tokens)

    async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
        """Accumulate token usage metrics during processing.

        This method intercepts the incremental token updates from NVIDIA's API
        and accumulates them instead of passing each update to the metrics system.
        The final accumulated totals are reported at the end of processing.

        Args:
            tokens (LLMTokenUsage): The token usage metrics for the current chunk
                of processing, containing prompt_tokens and completion_tokens counts.
        """
        # Only accumulate metrics during active processing
        if not self._is_processing:
            return

        # Record prompt tokens the first time we see them
        if not self._has_reported_prompt_tokens and tokens.prompt_tokens > 0:
            self._prompt_tokens = tokens.prompt_tokens
            self._has_reported_prompt_tokens = True

        # Update completion tokens count if it has increased
        if tokens.completion_tokens > self._completion_tokens:
            self._completion_tokens = tokens.completion_tokens
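To make the docstrings above concrete: NIM reports usage incrementally while a response streams (the completion count keeps growing chunk by chunk), whereas the metrics pipeline expects one final summary per turn. The standalone sketch below mirrors the accumulation logic with plain integers; the per-chunk numbers are invented illustration data, not output from the service.

```python
# Standalone illustration of the accumulation strategy used by NimLLMService.
# The (prompt_tokens, completion_tokens) pairs are made-up example data standing
# in for the incremental usage a NIM model might report while streaming.
chunks = [(25, 3), (25, 9), (25, 17), (25, 24)]

prompt_tokens = 0
completion_tokens = 0
has_reported_prompt_tokens = False

for p, c in chunks:
    # Record prompt tokens the first time a non-zero count appears.
    if not has_reported_prompt_tokens and p > 0:
        prompt_tokens = p
        has_reported_prompt_tokens = True
    # Completion counts are cumulative, so keep only the largest value seen.
    if c > completion_tokens:
        completion_tokens = c

# One final report, matching the single summary OpenAI-style metrics consumers expect.
print(prompt_tokens, completion_tokens, prompt_tokens + completion_tokens)  # -> 25 24 49
```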