Skip to content

Commit

Permalink
make model_name and set_model_name built in to all AIService types fo…
Browse files Browse the repository at this point in the history
…r consistency and metrics usage
  • Loading branch information
mattieruth committed Sep 20, 2024
1 parent 16b3341 commit 7048b86
Show file tree
Hide file tree
Showing 21 changed files with 150 additions and 125 deletions.
5 changes: 3 additions & 2 deletions examples/foundational/06-listen-and-respond.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
elif isinstance(d, ProcessingMetricsData):
print(f"!!! MetricsFrame: {frame}, processing: {d.value}")
elif isinstance(d, LLMUsageMetricsData):
tokens = d.value
print(
f"!!! MetricsFrame: {frame}, tokens: {
d.prompt_tokens}, characters: {
d.completion_tokens}")
tokens.prompt_tokens}, characters: {
tokens.completion_tokens}")
elif isinstance(d, TTSUsageMetricsData):
print(f"!!! MetricsFrame: {frame}, characters: {d.value}")
await self.push_frame(frame, direction)
Expand Down
15 changes: 6 additions & 9 deletions src/pipecat/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,28 @@

class MetricsData(BaseModel):
processor: str
model: Optional[str] = None


class TTFBMetricsData(MetricsData):
value: float
model: Optional[str] = None


class ProcessingMetricsData(MetricsData):
value: float
model: Optional[str] = None


class LLMUsageMetricsData(MetricsData):
model: str
class LLMTokenUsage(BaseModel):
prompt_tokens: int
completion_tokens: int
total_tokens: int
cache_read_input_tokens: Optional[int] = None
cache_creation_input_tokens: Optional[int] = None


class CacheUsageMetricsData(LLMUsageMetricsData):
cache_read_input_tokens: int
cache_creation_input_tokens: int
class LLMUsageMetricsData(MetricsData):
value: LLMTokenUsage


class TTSUsageMetricsData(MetricsData):
processor: str
model: Optional[str] = None
value: int
82 changes: 56 additions & 26 deletions src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame)
from pipecat.metrics.metrics import LLMUsageMetricsData, ProcessingMetricsData, TTFBMetricsData, TTSUsageMetricsData
from pipecat.metrics.metrics import (
LLMTokenUsage,
LLMUsageMetricsData,
MetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData)
from pipecat.utils.utils import obj_count, obj_id

from loguru import logger
Expand All @@ -32,47 +38,68 @@ class FrameDirection(Enum):

class FrameProcessorMetrics:
def __init__(self, name: str):
self._name = name
self._core_metrics_data = MetricsData(processor=name)
self._start_ttfb_time = 0
self._start_processing_time = 0
self._should_report_ttfb = True

def _processor_name(self):
return self._core_metrics_data.processor

def _model_name(self):
return self._core_metrics_data.model

def set_core_metrics_data(self, data: MetricsData):
self._core_metrics_data = data

async def start_ttfb_metrics(self, report_only_initial_ttfb):
if self._should_report_ttfb:
self._start_ttfb_time = time.time()
self._should_report_ttfb = not report_only_initial_ttfb

async def stop_ttfb_metrics(self, model: str | None = None):
async def stop_ttfb_metrics(self):
if self._start_ttfb_time == 0:
return None

value = time.time() - self._start_ttfb_time
logger.debug(f"{self._name} TTFB: {value}")
ttfb = TTFBMetricsData(processor=self._name, value=value, model=model)
logger.debug(f"{self._processor_name()} TTFB: {value}")
ttfb = TTFBMetricsData(
processor=self._processor_name(),
value=value,
model=self._model_name())
self._start_ttfb_time = 0
return MetricsFrame(data=[ttfb])

async def start_processing_metrics(self):
self._start_processing_time = time.time()

async def stop_processing_metrics(self, model: str | None = None):
async def stop_processing_metrics(self):
if self._start_processing_time == 0:
return None

value = time.time() - self._start_processing_time
logger.debug(f"{self._name} processing time: {value}")
processing = ProcessingMetricsData(processor=self._name, value=value, model=model)
logger.debug(f"{self._processor_name()} processing time: {value}")
processing = ProcessingMetricsData(
processor=self._processor_name(), value=value, model=self._model_name())
self._start_processing_time = 0
return MetricsFrame(data=[processing])

async def start_llm_usage_metrics(self, usage_params: LLMUsageMetricsData):
logger.debug(f"{self._name} prompt tokens: {
usage_params.prompt_tokens}, completion tokens: {usage_params.completion_tokens}")
return MetricsFrame(data=[usage_params])
async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
logger.debug(f"{self._processor_name()} prompt tokens: {
tokens.prompt_tokens}, completion tokens: {tokens.completion_tokens}")
value = LLMUsageMetricsData(
processor=self._processor_name(),
model=self._model_name(),
value=tokens)
return MetricsFrame(data=[value])

async def start_tts_usage_metrics(self, usage_params: TTSUsageMetricsData):
logger.debug(f"{self._name} usage characters: {usage_params.value}")
return MetricsFrame(data=[usage_params])
async def start_tts_usage_metrics(self, text: str):
characters = TTSUsageMetricsData(
processor=self._processor_name(),
model=self._model_name(),
value=len(text))
logger.debug(f"{self._processor_name()} usage characters: {characters.value}")
return MetricsFrame(data=[characters])


class FrameProcessor:
Expand Down Expand Up @@ -131,41 +158,44 @@ def report_only_initial_ttfb(self):
def can_generate_metrics(self) -> bool:
return False

def set_core_metrics_data(self, data: MetricsData):
self._metrics.set_core_metrics_data(data)

async def start_ttfb_metrics(self):
if self.can_generate_metrics() and self.metrics_enabled:
await self._metrics.start_ttfb_metrics(self._report_only_initial_ttfb)

async def stop_ttfb_metrics(self, model: str | None = None):
async def stop_ttfb_metrics(self):
if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_ttfb_metrics(model)
frame = await self._metrics.stop_ttfb_metrics()
if frame:
await self.push_frame(frame)

async def start_processing_metrics(self):
if self.can_generate_metrics() and self.metrics_enabled:
await self._metrics.start_processing_metrics()

async def stop_processing_metrics(self, model: str | None = None):
async def stop_processing_metrics(self):
if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_processing_metrics(model)
frame = await self._metrics.stop_processing_metrics()
if frame:
await self.push_frame(frame)

async def start_llm_usage_metrics(self, usage_params: LLMUsageMetricsData):
async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
if self.can_generate_metrics() and self.usage_metrics_enabled:
frame = await self._metrics.start_llm_usage_metrics(usage_params)
frame = await self._metrics.start_llm_usage_metrics(tokens)
if frame:
await self.push_frame(frame)

async def start_tts_usage_metrics(self, usage_params: TTSUsageMetricsData):
async def start_tts_usage_metrics(self, text: str):
if self.can_generate_metrics() and self.usage_metrics_enabled:
frame = await self._metrics.start_tts_usage_metrics(usage_params)
frame = await self._metrics.start_tts_usage_metrics(text)
if frame:
await self.push_frame(frame)

async def stop_all_metrics(self, model: str | None = None):
await self.stop_ttfb_metrics(model)
await self.stop_processing_metrics(model)
async def stop_all_metrics(self):
await self.stop_ttfb_metrics()
await self.stop_processing_metrics()

async def cleanup(self):
pass
Expand Down
15 changes: 10 additions & 5 deletions src/pipecat/services/ai_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from abc import abstractmethod
from typing import AsyncGenerator, List, Optional, Tuple

from attr import has
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
Expand All @@ -33,6 +32,7 @@
UserImageRequestFrame,
VisionImageRawFrame
)
from pipecat.metrics.metrics import MetricsData
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transcriptions.language import Language
from pipecat.utils.audio import calculate_audio_volume
Expand All @@ -47,6 +47,11 @@
class AIService(FrameProcessor):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.model_name: str = ""

def set_model_name(self, model: str):
self.model_name = model
self.set_core_metrics_data(MetricsData(processor=self.name, model=self.model_name))

async def start(self, frame: StartFrame):
pass
Expand Down Expand Up @@ -159,7 +164,7 @@ def sample_rate(self) -> int:

@abstractmethod
async def set_model(self, model: str):
pass
self.set_model_name(model)

@abstractmethod
async def set_voice(self, voice: str):
Expand Down Expand Up @@ -368,7 +373,7 @@ def __init__(self, **kwargs):

@abstractmethod
async def set_model(self, model: str):
pass
self.set_model_name(model)

@abstractmethod
async def set_language(self, language: Language):
Expand Down Expand Up @@ -483,7 +488,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
await self.push_frame(frame, direction)
await self.start_processing_metrics()
await self.process_generator(self.run_image_gen(frame.text))
await self.stop_processing_metrics(self._model if hasattr(self, "_model") else None)
await self.stop_processing_metrics()
else:
await self.push_frame(frame, direction)

Expand All @@ -505,6 +510,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, VisionImageRawFrame):
await self.start_processing_metrics()
await self.process_generator(self.run_vision(frame))
await self.stop_processing_metrics(self._model if hasattr(self, "_model") else None)
await self.stop_processing_metrics()
else:
await self.push_frame(frame, direction)
16 changes: 7 additions & 9 deletions src/pipecat/services/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
FunctionCallInProgressFrame,
StartInterruptionFrame
)
from pipecat.metrics.metrics import CacheUsageMetricsData
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService
from pipecat.processors.aggregators.openai_llm_context import (
Expand Down Expand Up @@ -85,7 +85,7 @@ def __init__(
**kwargs):
super().__init__(**kwargs)
self._client = AsyncAnthropic(api_key=api_key)
self._model = model
self.set_model_name(model)
self._max_tokens = max_tokens
self._enable_prompt_caching_beta = enable_prompt_caching_beta

Expand Down Expand Up @@ -138,11 +138,11 @@ async def _process_context(self, context: OpenAILLMContext):
tools=context.tools or [],
system=context.system,
messages=messages,
model=self._model,
model=self.model_name,
max_tokens=self._max_tokens,
stream=True)

await self.stop_ttfb_metrics(self._model)
await self.stop_ttfb_metrics()

# Function calling
tool_use_block = None
Expand Down Expand Up @@ -206,7 +206,7 @@ async def _process_context(self, context: OpenAILLMContext):
except Exception as e:
logger.exception(f"{self} exception: {e}")
finally:
await self.stop_processing_metrics(self._model)
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
comp_tokens = completion_tokens if not use_completion_tokens_estimate else completion_tokens_estimate
await self._report_usage_metrics(
Expand All @@ -232,7 +232,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
context = AnthropicLLMContext.from_image_frame(frame)
elif isinstance(frame, LLMModelUpdateFrame):
logger.debug(f"Switching LLM model to: [{frame.model}]")
self._model = frame.model
self.set_model_name(frame.model)
elif isinstance(frame, LLMEnablePromptCachingFrame):
logger.debug(f"Setting enable prompt caching to: [{frame.enable}]")
self._enable_prompt_caching_beta = frame.enable
Expand All @@ -252,9 +252,7 @@ async def _report_usage_metrics(
cache_creation_input_tokens: int,
cache_read_input_tokens: int):
if prompt_tokens or completion_tokens or cache_creation_input_tokens or cache_read_input_tokens:
tokens = CacheUsageMetricsData(
processor=self.name,
model=self._model,
tokens = LLMTokenUsage(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
cache_creation_input_tokens=cache_creation_input_tokens,
Expand Down
4 changes: 2 additions & 2 deletions src/pipecat/services/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
result = await asyncio.to_thread(self._speech_synthesizer.speak_ssml, (ssml))

if result.reason == ResultReason.SynthesizingAudioCompleted:
await self.start_tts_usage_metrics(TTSUsageMetricsData(processor=self.name, value=len(text)))
await self.start_tts_usage_metrics(text)
await self.stop_ttfb_metrics()
await self.push_frame(TTSStartedFrame())
# Azure always sends a 44-byte header. Strip it off.
Expand Down Expand Up @@ -192,7 +192,7 @@ def __init__(
self._api_key = api_key
self._azure_endpoint = endpoint
self._api_version = api_version
self._model = model
self.set_model_name(model)
self._image_size = image_size
self._aiohttp_session = aiohttp_session

Expand Down
Loading

0 comments on commit 7048b86

Please sign in to comment.