#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Idle frame processor for timeout-based callback execution."""
import asyncio
from collections.abc import Awaitable, Callable
from pipecat.frames.frames import Frame, StartFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
[docs]
class IdleFrameProcessor(FrameProcessor):
"""Monitors frame activity and triggers callbacks on timeout.
This processor waits to receive any frame or specific frame types within a
given timeout period. If the timeout is reached before receiving the expected
frames, the provided callback will be executed.
"""
[docs]
def __init__(
self,
*,
callback: Callable[["IdleFrameProcessor"], Awaitable[None]],
timeout: float,
types: list[type] | None = None,
**kwargs,
):
"""Initialize the idle frame processor.
Args:
callback: Async callback function to execute on timeout. Receives
this processor instance as an argument.
timeout: Timeout duration in seconds before triggering the callback.
types: Optional list of frame types to monitor. If None, monitors
all frames.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(**kwargs)
self._callback = callback
self._timeout = timeout
self._types = types or []
self._idle_task = None
[docs]
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and manage idle timeout monitoring.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
self._create_idle_task()
await self.push_frame(frame, direction)
# If we are not waiting for any specific frame set the event, otherwise
# check if we have received one of the desired frames.
if not self._types:
self._idle_event.set()
else:
for t in self._types:
if isinstance(frame, t):
self._idle_event.set()
[docs]
async def cleanup(self):
"""Clean up resources and cancel pending tasks."""
if self._idle_task:
await self.cancel_task(self._idle_task)
def _create_idle_task(self):
"""Create and start the idle monitoring task."""
if not self._idle_task:
self._idle_event = asyncio.Event()
self._idle_task = self.create_task(self._idle_task_handler())
async def _idle_task_handler(self):
"""Handle idle timeout monitoring and callback execution."""
while True:
try:
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
self._idle_event.clear()
except TimeoutError:
await self._callback(self)