Skip to content

Commit

Permalink
introduce PipelineParams
Browse files Browse the repository at this point in the history
  • Loading branch information
aconchillo committed May 28, 2024
1 parent 650a2b4 commit 7a4cccf
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 11 deletions.
4 changes: 2 additions & 2 deletions examples/foundational/07-interruptible.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.elevenlabs import ElevenLabsTTSService
Expand Down Expand Up @@ -74,7 +74,7 @@ async def main(room_url: str, token):
tma_out # Assistant spoken responses
])

task = PipelineTask(pipeline, allow_interruptions=True)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
Expand Down
4 changes: 2 additions & 2 deletions examples/foundational/07a-interruptible-anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.elevenlabs import ElevenLabsTTSService
Expand Down Expand Up @@ -77,7 +77,7 @@ async def main(room_url: str, token):
tma_out # Assistant spoken responses
])

task = PipelineTask(pipeline, allow_interruptions=True)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
Expand Down
4 changes: 2 additions & 2 deletions examples/foundational/14-wake-phrase.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pipecat.processors.filters.wake_check_filter import WakeCheckFilter
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.elevenlabs import ElevenLabsTTSService
Expand Down Expand Up @@ -77,7 +77,7 @@ async def main(room_url: str, token):
tma_out # Assistant spoken responses
])

task = PipelineTask(pipeline, allow_interruptions=True)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
Expand Down
4 changes: 2 additions & 2 deletions examples/simple-chatbot/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.frames.frames import (
AudioRawFrame,
Expand Down Expand Up @@ -149,7 +149,7 @@ async def main(room_url: str, token):
assistant_response,
])

task = PipelineTask(pipeline, allow_interruptions=True)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
await task.queue_frame(quiet_frame)

@transport.event_handler("on_first_participant_joined")
Expand Down
12 changes: 9 additions & 3 deletions src/pipecat/pipeline/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@

from typing import AsyncIterable, Iterable

from pydantic import BaseModel

from pipecat.frames.frames import CancelFrame, EndFrame, ErrorFrame, Frame, StartFrame, StopTaskFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.utils import obj_count, obj_id

from loguru import logger


class PipelineParams(BaseModel):
allow_interruptions: bool = False


class Source(FrameProcessor):

def __init__(self, up_queue: asyncio.Queue):
Expand All @@ -31,12 +37,12 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):

class PipelineTask:

def __init__(self, pipeline: FrameProcessor, allow_interruptions=False):
def __init__(self, pipeline: FrameProcessor, params: PipelineParams = PipelineParams()):
self.id: int = obj_id()
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"

self._pipeline = pipeline
self._allow_interruptions = allow_interruptions
self._params = params

self._down_queue = asyncio.Queue()
self._up_queue = asyncio.Queue()
Expand Down Expand Up @@ -77,7 +83,7 @@ async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):

async def _process_down_queue(self):
await self._source.process_frame(
StartFrame(allow_interruptions=self._allow_interruptions), FrameDirection.DOWNSTREAM)
StartFrame(allow_interruptions=self._params.allow_interruptions), FrameDirection.DOWNSTREAM)
running = True
should_cleanup = True
while running:
Expand Down

0 comments on commit 7a4cccf

Please sign in to comment.