Skip to content

Commit

Permalink
Merge pull request #236 from pipecat-ai/aleix/report-only-initial-ttfb
Browse files Browse the repository at this point in the history
report only initial ttfb
  • Loading branch information
aconchillo authored Jun 13, 2024
2 parents 77a3b2e + 1170b30 commit 020b8eb
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Allow specifying frame processors' name through a new `name` constructor
argument.

- Added `report_only_initial_ttfb` to `PipelineParams`. This will make it so
only the initial TTFB metrics after the user stops talking are reported.

### Changed

- `FrameSerializer.deserialize()` can now return `None` in case it is not
Expand Down
3 changes: 2 additions & 1 deletion examples/foundational/07-interruptible.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ async def main(room_url: str, token):

task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True
enable_metrics=True,
report_only_initial_ttfb=True,
))

@transport.event_handler("on_first_participant_joined")
Expand Down
1 change: 1 addition & 0 deletions src/pipecat/frames/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ class StartFrame(SystemFrame):
"""This is the first frame that should be pushed down a pipeline."""
allow_interruptions: bool = False
enable_metrics: bool = False
report_only_initial_ttfb: bool = False


@dataclass
Expand Down
2 changes: 2 additions & 0 deletions src/pipecat/pipeline/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
class PipelineParams(BaseModel):
allow_interruptions: bool = False
enable_metrics: bool = False
report_only_initial_ttfb: bool = False


class Source(FrameProcessor):
Expand Down Expand Up @@ -99,6 +100,7 @@ async def _process_down_queue(self):
start_frame = StartFrame(
allow_interruptions=self._params.allow_interruptions,
enable_metrics=self._params.enable_metrics,
report_only_initial_ttfb=self._params.report_only_initial_ttfb
)
await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM)
await self._source.process_frame(self._initial_metrics_frame(), FrameDirection.DOWNSTREAM)
Expand Down
3 changes: 3 additions & 0 deletions src/pipecat/processors/aggregators/llm_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def role(self):
# S I E I T -> X
# S E T -> X
# S E I T -> X
#
# The following case would not be supported:
#
# S I E T1 I T2 -> X
Expand All @@ -90,6 +91,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
self._seen_start_frame = True
self._seen_end_frame = False
self._seen_interim_results = False
await self.push_frame(frame, direction)
elif isinstance(frame, self._end_frame):
self._seen_end_frame = True
self._seen_start_frame = False
Expand All @@ -102,6 +104,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
# Send the aggregation if we are not aggregating anymore (i.e. no
# more interim results received).
send_aggregation = not self._aggregating
await self.push_frame(frame, direction)
elif isinstance(frame, self._accumulator_frame):
if self._aggregating:
self._aggregation += f" {frame.text}"
Expand Down
6 changes: 5 additions & 1 deletion src/pipecat/processors/aggregators/user_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ def __init__(
# S I T E -> X
# S I E T -> X
# S I E I T -> X
# S E T -> X
# S E I T -> X
#
# The following case would not be supported:
#
Expand All @@ -91,18 +93,20 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
self._seen_start_frame = True
self._seen_end_frame = False
self._seen_interim_results = False
await self.push_frame(frame, direction)
elif isinstance(frame, self._end_frame):
self._seen_end_frame = True
self._seen_start_frame = False

# We might have received the end frame but we might still be
# aggregating (i.e. we have seen interim results but not the final
# text).
self._aggregating = self._seen_interim_results
self._aggregating = self._seen_interim_results or len(self._aggregation) == 0

# Send the aggregation if we are not aggregating anymore (i.e. no
# more interim results received).
send_aggregation = not self._aggregating
await self.push_frame(frame, direction)
elif isinstance(frame, self._accumulator_frame):
if self._aggregating:
self._aggregation += f" {frame.text}"
Expand Down
14 changes: 12 additions & 2 deletions src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from enum import Enum

from pipecat.frames.frames import ErrorFrame, Frame, MetricsFrame, StartFrame
from pipecat.frames.frames import ErrorFrame, Frame, MetricsFrame, StartFrame, UserStoppedSpeakingFrame
from pipecat.utils.utils import obj_count, obj_id

from loguru import logger
Expand All @@ -36,9 +36,11 @@ def __init__(
# Properties
self._allow_interruptions = False
self._enable_metrics = False
self._report_only_initial_ttfb = False

# Metrics
self._start_ttfb_time = 0
self._should_report_ttfb = True

@property
def interruptions_allowed(self):
Expand All @@ -48,12 +50,17 @@ def interruptions_allowed(self):
def metrics_enabled(self):
return self._enable_metrics

@property
def report_only_initial_ttfb(self):
return self._report_only_initial_ttfb

def can_generate_metrics(self) -> bool:
return False

async def start_ttfb_metrics(self):
if self.metrics_enabled:
if self.metrics_enabled and self._should_report_ttfb:
self._start_ttfb_time = time.time()
self._should_report_ttfb = not self._report_only_initial_ttfb

async def stop_ttfb_metrics(self):
if self.metrics_enabled and self._start_ttfb_time > 0:
Expand All @@ -77,6 +84,9 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, StartFrame):
self._allow_interruptions = frame.allow_interruptions
self._enable_metrics = frame.enable_metrics
self._report_only_initial_ttfb = frame.report_only_initial_ttfb
elif isinstance(frame, UserStoppedSpeakingFrame):
self._should_report_ttfb = True

async def push_error(self, error: ErrorFrame):
await self.push_frame(error, FrameDirection.UPSTREAM)
Expand Down
4 changes: 2 additions & 2 deletions src/pipecat/services/ai_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ def __init__(self,
(self._content, self._wave) = self._new_wave()
self._silence_num_frames = 0
# Volume exponential smoothing
self._smoothing_factor = 0.4
self._prev_volume = 1 - self._smoothing_factor
self._smoothing_factor = 0.2
self._prev_volume = 0

@abstractmethod
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
Expand Down
6 changes: 3 additions & 3 deletions src/pipecat/vad/vad_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class VADState(Enum):


class VADParams(BaseModel):
confidence: float = 0.6
confidence: float = 0.7
start_secs: float = 0.2
stop_secs: float = 0.8
min_volume: float = 0.6
Expand All @@ -46,8 +46,8 @@ def __init__(self, sample_rate: int, num_channels: int, params: VADParams):
self._vad_buffer = b""

# Volume exponential smoothing
self._smoothing_factor = 0.4
self._prev_volume = 1 - self._smoothing_factor
self._smoothing_factor = 0.2
self._prev_volume = 0

@property
def sample_rate(self):
Expand Down

0 comments on commit 020b8eb

Please sign in to comment.