diff --git a/CHANGELOG.md b/CHANGELOG.md
index 009625294..f12c5806e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,12 +9,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ### Added

-- `GroqLLMService` and `GrokLLMService` for Groq and Grok API integration, with
-  OpenAI-compatible interface.
+- Added `GroqLLMService`, `GrokLLMService`, and `NimLLMService` for Groq, Grok,
+  and NVIDIA NIM API integration, with an OpenAI-compatible interface.

 - New examples demonstrating function calling with Groq, Grok, Azure OpenAI,
-  and Fireworks: `14f-function-calling-groq.py`, `14g-function-calling-grok.py`,
-  `14h-function-calling-azure.py`, and `14i-function-calling-fireworks.py`.
+  Fireworks, and NVIDIA NIM: `14f-function-calling-groq.py`,
+  `14g-function-calling-grok.py`, `14h-function-calling-azure.py`,
+  `14i-function-calling-fireworks.py`, and `14j-function-calling-nim.py`.

 - In order to obtain the audio stored by the `AudioBufferProcessor` you can
   now also register an `on_audio_data` event handler. The `on_audio_data` handler
diff --git a/README.md b/README.md
index c38b84422..65174b561 100644
--- a/README.md
+++ b/README.md
@@ -58,7 +58,7 @@ Available options include:
 | Category | Services | Install Command Example |
 | ------------------- | -------- | ------------------------------------- |
 | Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/api-reference/services/stt/assemblyai), [Azure](https://docs.pipecat.ai/api-reference/services/stt/azure), [Deepgram](https://docs.pipecat.ai/api-reference/services/stt/deepgram), [Gladia](https://docs.pipecat.ai/api-reference/services/stt/gladia), [Whisper](https://docs.pipecat.ai/api-reference/services/stt/whisper) | `pip install "pipecat-ai[deepgram]"` |
-| LLMs | [Anthropic](https://docs.pipecat.ai/api-reference/services/llm/anthropic), [Azure](https://docs.pipecat.ai/api-reference/services/llm/azure), [Fireworks AI](https://docs.pipecat.ai/api-reference/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/api-reference/services/llm/gemini), [Grok](https://docs.pipecat.ai/api-reference/services/llm/grok), [Groq](https://docs.pipecat.ai/api-reference/services/llm/groq) [Ollama](https://docs.pipecat.ai/api-reference/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/api-reference/services/llm/openai), [Together AI](https://docs.pipecat.ai/api-reference/services/llm/together) | `pip install "pipecat-ai[openai]"` |
+| LLMs | [Anthropic](https://docs.pipecat.ai/api-reference/services/llm/anthropic), [Azure](https://docs.pipecat.ai/api-reference/services/llm/azure), [Fireworks AI](https://docs.pipecat.ai/api-reference/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/api-reference/services/llm/gemini), [Grok](https://docs.pipecat.ai/api-reference/services/llm/grok), [Groq](https://docs.pipecat.ai/api-reference/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/api-reference/services/llm/nim), [Ollama](https://docs.pipecat.ai/api-reference/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/api-reference/services/llm/openai), [Together AI](https://docs.pipecat.ai/api-reference/services/llm/together) | `pip install "pipecat-ai[openai]"` |
 | Text-to-Speech | [AWS](https://docs.pipecat.ai/api-reference/services/tts/aws), [Azure](https://docs.pipecat.ai/api-reference/services/tts/azure), [Cartesia](https://docs.pipecat.ai/api-reference/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/api-reference/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/api-reference/services/tts/elevenlabs), [Google](https://docs.pipecat.ai/api-reference/services/tts/google), [LMNT](https://docs.pipecat.ai/api-reference/services/tts/lmnt), [OpenAI](https://docs.pipecat.ai/api-reference/services/tts/openai), [PlayHT](https://docs.pipecat.ai/api-reference/services/tts/playht), [Rime](https://docs.pipecat.ai/api-reference/services/tts/rime), [XTTS](https://docs.pipecat.ai/api-reference/services/tts/xtts) | `pip install "pipecat-ai[cartesia]"` |
 | Speech-to-Speech | [OpenAI Realtime](https://docs.pipecat.ai/api-reference/services/s2s/openai) | `pip install "pipecat-ai[openai]"` |
 | Transport | [Daily (WebRTC)](https://docs.pipecat.ai/api-reference/services/transport/daily), WebSocket, Local | `pip install "pipecat-ai[daily]"` |
diff --git a/examples/foundational/14j-function-calling-nim.py b/examples/foundational/14j-function-calling-nim.py
new file mode 100644
index 000000000..68bb3b52e
--- /dev/null
+++ b/examples/foundational/14j-function-calling-nim.py
@@ -0,0 +1,140 @@
+#
+# Copyright (c) 2024, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+import os
+import sys
+
+import aiohttp
+from dotenv import load_dotenv
+from loguru import logger
+from openai.types.chat import ChatCompletionToolParam
+from runner import configure
+
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.services.cartesia import CartesiaTTSService
+from pipecat.services.nim import NimLLMService
+from pipecat.services.openai import OpenAILLMContext
+from pipecat.transports.services.daily import DailyParams, DailyTransport
+
+load_dotenv(override=True)
+
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+
+async def start_fetch_weather(function_name, llm, context):
+    # Note: we can't push a frame to the LLM here. The bot could interrupt
+    # itself and/or cause overlapping audio glitches.
+    # Open question for Aleix and Chad: what is the right way to trigger
+    # speech now, with the new queues/async/sync refactors?
+    # await llm.push_frame(TextFrame("Let me check on that."))
+    logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
+
+
+async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
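+    # The dict passed to result_callback is what the LLM receives as the tool result
+    # and then verbalizes. A real implementation would presumably fetch live data
+    # instead, e.g. with aiohttp against a hypothetical endpoint:
+    #
+    #   async with aiohttp.ClientSession() as session:
+    #       async with session.get("https://api.example.com/weather", params=args) as resp:
+    #           await result_callback(await resp.json())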
+    await result_callback({"conditions": "nice", "temperature": "75"})
+
+
+async def main():
+    async with aiohttp.ClientSession() as session:
+        (room_url, token) = await configure(session)
+
+        transport = DailyTransport(
+            room_url,
+            token,
+            "Respond bot",
+            DailyParams(
+                audio_out_enabled=True,
+                transcription_enabled=True,
+                vad_enabled=True,
+                vad_analyzer=SileroVADAnalyzer(),
+            ),
+        )
+
+        tts = CartesiaTTSService(
+            api_key=os.getenv("CARTESIA_API_KEY"),
+            voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",  # British Lady
+            # text_filter=MarkdownTextFilter(),
+        )
+
+        llm = NimLLMService(
+            api_key=os.getenv("NVIDIA_API_KEY"), model="meta/llama-3.1-405b-instruct"
+        )
+        # Register a function_name of None to get all functions
+        # sent to the same callback with an additional function_name parameter.
+        llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
+
+        tools = [
+            ChatCompletionToolParam(
+                type="function",
+                function={
+                    "name": "get_current_weather",
+                    "description": "Get the current weather",
+                    "parameters": {
+                        "type": "object",
+                        "properties": {
+                            "location": {
+                                "type": "string",
+                                "description": "The city and state, e.g. San Francisco, CA",
+                            },
+                            "format": {
+                                "type": "string",
+                                "enum": ["celsius", "fahrenheit"],
+                                "description": "The temperature unit to use. Infer this from the user's location.",
+                            },
+                        },
+                        "required": ["location", "format"],
+                    },
+                },
+            )
+        ]
+        messages = [
+            {
+                "role": "system",
+                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
+            },
+        ]
+
+        context = OpenAILLMContext(messages, tools)
+        context_aggregator = llm.create_context_aggregator(context)
+
+        pipeline = Pipeline(
+            [
+                transport.input(),
+                context_aggregator.user(),
+                llm,
+                tts,
+                transport.output(),
+                context_aggregator.assistant(),
+            ]
+        )
+
+        task = PipelineTask(
+            pipeline,
+            PipelineParams(
+                allow_interruptions=True,
+                enable_metrics=True,
+                enable_usage_metrics=True,
+            ),
+        )
+
+        @transport.event_handler("on_first_participant_joined")
+        async def on_first_participant_joined(transport, participant):
+            await transport.capture_participant_transcription(participant["id"])
+            # Kick off the conversation.
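+            # Queueing the aggregator's context frame sends the current context through
+            # the LLM, so the bot speaks first once the participant has joined.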
+            await task.queue_frames([context_aggregator.user().get_context_frame()])
+
+        runner = PipelineRunner()
+
+        await runner.run(task)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/pyproject.toml b/pyproject.toml
index 321d3a9bb..d487a7dd7 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -59,6 +59,7 @@ livekit = [ "livekit~=0.17.5", "livekit-api~=0.7.1", "tenacity~=8.5.0" ]
 lmnt = [ "lmnt~=1.1.4" ]
 local = [ "pyaudio~=0.2.14" ]
 moondream = [ "einops~=0.8.0", "timm~=1.0.8", "transformers~=4.44.0" ]
+nim = [ "openai~=1.50.2" ]
 noisereduce = [ "noisereduce~=3.0.3" ]
 openai = [ "openai~=1.50.2", "websockets~=13.1", "python-deepcompare~=1.0.1" ]
 openpipe = [ "openpipe~=4.24.0" ]
diff --git a/src/pipecat/services/nim.py b/src/pipecat/services/nim.py
new file mode 100644
index 000000000..0ce0171c9
--- /dev/null
+++ b/src/pipecat/services/nim.py
@@ -0,0 +1,105 @@
+#
+# Copyright (c) 2024, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+
+from pipecat.metrics.metrics import LLMTokenUsage
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.services.openai import OpenAILLMService
+
+
+class NimLLMService(OpenAILLMService):
+    """A service for interacting with NVIDIA's NIM (NVIDIA Inference Microservice) API.
+
+    This service extends OpenAILLMService to work with NVIDIA's NIM API while maintaining
+    compatibility with the OpenAI-style interface. It specifically handles the difference
+    in token usage reporting between NIM (incremental) and OpenAI (final summary).
+
+    Args:
+        api_key (str): The API key for accessing NVIDIA's NIM API
+        base_url (str, optional): The base URL for NIM API. Defaults to "https://integrate.api.nvidia.com/v1"
+        model (str, optional): The model identifier to use. Defaults to "nvidia/llama-3.1-nemotron-70b-instruct"
+        **kwargs: Additional keyword arguments passed to OpenAILLMService
+
+    Example:
+        ```python
+        service = NimLLMService(
+            api_key="your-api-key",
+            model="nvidia/llama-3.1-nemotron-70b-instruct"
+        )
+        ```
+    """
+
+    def __init__(
+        self,
+        *,
+        api_key: str,
+        base_url: str = "https://integrate.api.nvidia.com/v1",
+        model: str = "nvidia/llama-3.1-nemotron-70b-instruct",
+        **kwargs,
+    ):
+        super().__init__(api_key=api_key, base_url=base_url, model=model, **kwargs)
+        # Counters for accumulating token usage metrics
+        self._prompt_tokens = 0
+        self._completion_tokens = 0
+        self._total_tokens = 0
+        self._has_reported_prompt_tokens = False
+        self._is_processing = False
+
+    async def _process_context(self, context: OpenAILLMContext):
+        """Process a context through the LLM and accumulate token usage metrics.
+
+        This method overrides the parent class implementation to handle NVIDIA's
+        incremental token reporting style, accumulating the counts and reporting
+        them once at the end of processing.
+
+        Args:
+            context (OpenAILLMContext): The context to process, containing messages
+                and other information needed for the LLM interaction.
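+
+        Note:
+            Illustrative flow (assumed numbers): if NIM streams usage as
+            prompt_tokens=25 and then running completion_tokens of 8, 16, and 24,
+            start_llm_usage_metrics() accumulates those values and this method
+            reports a single LLMTokenUsage(prompt_tokens=25, completion_tokens=24,
+            total_tokens=49) once the stream has finished.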
+        """
+        # Reset all counters and flags at the start of processing
+        self._prompt_tokens = 0
+        self._completion_tokens = 0
+        self._total_tokens = 0
+        self._has_reported_prompt_tokens = False
+        self._is_processing = True
+
+        try:
+            await super()._process_context(context)
+        finally:
+            self._is_processing = False
+            # Report final accumulated token usage at the end of processing
+            if self._prompt_tokens > 0 or self._completion_tokens > 0:
+                self._total_tokens = self._prompt_tokens + self._completion_tokens
+                tokens = LLMTokenUsage(
+                    prompt_tokens=self._prompt_tokens,
+                    completion_tokens=self._completion_tokens,
+                    total_tokens=self._total_tokens,
+                )
+                await super().start_llm_usage_metrics(tokens)
+
+    async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
+        """Accumulate token usage metrics during processing.
+
+        This method intercepts the incremental token updates from NVIDIA's API
+        and accumulates them instead of passing each update to the metrics system.
+        The final accumulated totals are reported at the end of processing.
+
+        Args:
+            tokens (LLMTokenUsage): The token usage metrics for the current chunk
+                of processing, containing prompt_tokens and completion_tokens counts.
+        """
+        # Only accumulate metrics during active processing
+        if not self._is_processing:
+            return
+
+        # Record prompt tokens the first time we see them
+        if not self._has_reported_prompt_tokens and tokens.prompt_tokens > 0:
+            self._prompt_tokens = tokens.prompt_tokens
+            self._has_reported_prompt_tokens = True
+
+        # Update completion tokens count if it has increased
+        if tokens.completion_tokens > self._completion_tokens:
+            self._completion_tokens = tokens.completion_tokens