
Possible optimisation for BaseInputTransport ThreadPoolExecutor? #910

Open
ovistoica opened this issue Dec 25, 2024 · 0 comments

ovistoica commented Dec 25, 2024

Description

While having a conversation with Claude to understand this project better, we got onto this subject while looking at BaseInputTransport.

Conversation

User: Instruction: Given that this class has a thread pool executor, explain how the task from the event loop runs in this thread pool executor

Selected text:

async def _vad_analyze(self, audio_frames: bytes) -> VADState:
        state = VADState.QUIET
        vad_analyzer = self.vad_analyzer()
        if vad_analyzer:
            state = await self.get_event_loop().run_in_executor(
                self._executor, vad_analyzer.analyze_audio, audio_frames
            )
        return state

AI: This code demonstrates how to run CPU-intensive work in a thread pool without blocking the event loop. Here's how it works:

  1. Thread Pool Setup:
self._executor = ThreadPoolExecutor(max_workers=5)
  2. Execution Flow:
await self.get_event_loop().run_in_executor(
    self._executor,           # Thread pool to use
    vad_analyzer.analyze_audio,  # CPU-intensive function
    audio_frames              # Arguments to the function
)

The process:

  1. Event loop receives request to analyze audio
  2. Instead of blocking, delegates work to thread pool
  3. Thread pool executes vad_analyzer.analyze_audio
  4. Event loop continues processing other tasks
  5. When thread completes, result returns to event loop

This pattern is used because:

  • VAD analysis is CPU-intensive
  • Would block event loop if run directly
  • Thread pool allows parallel processing
  • Keeps event loop responsive

The await ensures we get the result before continuing but doesn't block the event loop while waiting.
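
As a minimal, self-contained illustration of the same pattern (the analyze_audio stand-in below is hypothetical, not the project's actual VAD code):

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for vad_analyzer.analyze_audio: a blocking,
# CPU-style call that would stall the event loop if called directly
# from a coroutine.
def analyze_audio(audio_frames: bytes) -> str:
    time.sleep(0.1)  # simulate heavy analysis
    return "SPEAKING" if any(audio_frames) else "QUIET"

async def main() -> None:
    executor = ThreadPoolExecutor(max_workers=5)
    loop = asyncio.get_running_loop()
    # The coroutine suspends at the await while a pool thread does the
    # work, so the event loop stays free to service other tasks.
    state = await loop.run_in_executor(executor, analyze_audio, b"\x01" * 320)
    print(state)  # SPEAKING
    executor.shutdown()

asyncio.run(main())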

User: If I create 10 ThreadPoolExecutors with max threads 5 and my cpu has 8 cores, how will this be handled?

AI: Creating 10 ThreadPoolExecutors with 5 max_workers each means you're potentially creating 50 threads in total, but this doesn't map directly to CPU cores. Here's what happens:

  1. Thread Creation:
  • 10 pools × 5 threads = 50 potential threads
  • Threads are OS-level resources, not bound to CPU cores
  • All 50 threads can exist simultaneously
  2. CPU Core Usage:
  • 8 cores means only 8 threads run truly parallel
  • OS scheduler manages thread-to-core mapping
  • Threads time-share CPU cores through context switching
  3. Performance Impact:
  • Context switching adds overhead
  • Memory usage increases with each thread
  • Too many threads can degrade performance

Better Approach:

# Single pool sized for your CPU (os.cpu_count() can return None, hence the guard)
import os
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=min(32, (os.cpu_count() or 1) + 4))

Rule of thumb:

  • CPU-bound: use cores + 1 threads
  • I/O-bound: use cores × 2 or more threads
  • Avoid multiple pools unless specifically needed

Creating multiple ThreadPoolExecutors is usually unnecessary and potentially harmful to performance.
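
A throwaway sketch that makes the oversubscription visible on any machine (the numbers match the scenario above):

import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Ten 5-worker pools can spawn up to 50 OS threads regardless of how
# many cores exist; the OS scheduler just time-slices them.
pools = [ThreadPoolExecutor(max_workers=5) for _ in range(10)]
futures = [p.submit(time.sleep, 0.5) for p in pools for _ in range(5)]
print("live threads:", threading.active_count())  # ~50 workers + main
for f in futures:
    f.result()
for p in pools:
    p.shutdown()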

User: I see. So in the case of BaseInputTransport, the fact that it creates a ThreadPoolExecutor for each instantiation is bad?

AI: Yes, the current implementation in BaseInputTransport could be improved. Looking at the code:

class BaseInputTransport(FrameProcessor):
    def __init__(self, params: TransportParams, **kwargs):
        super().__init__(**kwargs)
        self._params = params
        self._executor = ThreadPoolExecutor(max_workers=5)  # Created per instance!

Problems:

  1. Each transport instance creates its own thread pool
  2. Resources are wasted if multiple transports exist
  3. No central control over total threads

Better Design:

# At module level or in a shared resource manager
import os
from concurrent.futures import ThreadPoolExecutor

_shared_executor = ThreadPoolExecutor(
    max_workers=min(32, (os.cpu_count() or 1) + 4)
)

class BaseInputTransport(FrameProcessor):
    def __init__(self, params: TransportParams, **kwargs):
        super().__init__(**kwargs)
        self._params = params
        self._executor = _shared_executor  # Use shared pool

Or even better:

class TransportExecutor:
    _instance = None

    @classmethod
    def get_executor(cls):
        if cls._instance is None:
            cls._instance = ThreadPoolExecutor(
                max_workers=min(32, (os.cpu_count() or 1) + 4)
            )
        return cls._instance

This would be a good issue to raise on the project - it's an easy optimization with real benefits.

// End conversation
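
One caveat on the lazy singleton above: get_executor is not thread-safe as written, since two threads racing through the None check could each create a pool. A lock-protected sketch (the lock is my addition; the class name and sizing heuristic come from the conversation):

import os
import threading
from concurrent.futures import ThreadPoolExecutor

class TransportExecutor:
    _instance = None
    _lock = threading.Lock()

    @classmethod
    def get_executor(cls):
        # Double-checked locking: take the lock only on first use, and
        # re-check under it so exactly one pool is ever created.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = ThreadPoolExecutor(
                        max_workers=min(32, (os.cpu_count() or 1) + 4)
                    )
        return cls._instance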

Issue description

Based on this conversation: the BaseInputTransport class creates a new ThreadPoolExecutor for each instantiation. I realize that each pipeline has just one transport input; however, in a big application that makes and receives calls, multiple pipelines can run in parallel, and you can end up with multiple thread pools all doing the same kind of work. Code:

class BaseInputTransport(FrameProcessor):
    def __init__(self, params: TransportParams, **kwargs):
        super().__init__(**kwargs)

        self._params = params

        # New thread pool created for each instantiation
        self._executor = ThreadPoolExecutor(max_workers=5)

        # Task to process incoming audio (VAD) and push audio frames downstream
        # if passthrough is enabled.
        self._audio_task = None
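
For concreteness, here is roughly what I have in mind as a fix: a single class-level pool shared by all instances. This is only a sketch, one option among several, and the import paths are assumed from the pipecat source tree:

import os
from concurrent.futures import ThreadPoolExecutor

from pipecat.processors.frame_processor import FrameProcessor
from pipecat.transports.base_transport import TransportParams

class BaseInputTransport(FrameProcessor):
    # One pool shared by every transport instance, sized to the machine
    # instead of a fixed 5 workers per instance.
    _shared_executor = ThreadPoolExecutor(
        max_workers=min(32, (os.cpu_count() or 1) + 4)
    )

    def __init__(self, params: TransportParams, **kwargs):
        super().__init__(**kwargs)
        self._params = params
        self._executor = BaseInputTransport._shared_executor
        self._audio_task = None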

Please let me know if this issue is relevant. I can open a PR with a fix.
