Source code for pipecat.utils.frame_queue

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Frame queue utilities for Pipecat pipeline processors."""

import asyncio
from collections.abc import Callable
from typing import Any

from pipecat.frames.frames import Frame, UninterruptibleFrame


[docs] class FrameQueue(asyncio.Queue): """An asyncio.Queue that tracks whether any UninterruptibleFrame is enqueued. Extends ``asyncio.Queue`` and maintains an O(1) ``has_uninterruptible`` flag so interrupt-handling code can decide whether to cancel a task or merely drain non-uninterruptible items without scanning the queue. Items may be raw ``Frame`` objects or tuples whose first element is a ``Frame`` (e.g. ``(frame, direction, callback)``). Pass a ``frame_getter`` callable to extract the frame from each item; the default treats the item itself as the frame. Also exposes a ``reset()`` helper that drains all non-``UninterruptibleFrame`` items while keeping uninterruptible ones in place. """
[docs] def __init__(self, frame_getter: Callable[[Any], Frame] = lambda item: item): """Initialize the FrameQueue. Args: frame_getter: Callable that extracts a ``Frame`` from a queue item. Defaults to the identity function (item is a raw ``Frame``). Pass ``lambda item: item[0]`` when items are ``(frame, direction, callback)`` tuples. """ super().__init__() self._frame_getter = frame_getter self._uninterruptible_count: int = 0
[docs] def has_frame(self, frame_type: type[Frame] | type[UninterruptibleFrame]) -> bool: """Return True if any frame of the given type is in the queue. ``frame_type`` may be ``Frame``, ``UninterruptibleFrame`` (a mixin, not a ``Frame`` subclass), or any concrete frame type. Note: This inspects the internal `_queue` (deque) of asyncio.Queue. This is not part of the public API but is stable in CPython. Args: frame_type: The frame class to check for. Returns: True if at least one enqueued frame is an instance of ``frame_type``. """ for item in self._queue: # pyright: ignore[reportAttributeAccessIssue] if isinstance(self._frame_getter(item), frame_type): return True return False
@property def has_uninterruptible(self) -> bool: """Return True if any UninterruptibleFrame is currently in the queue.""" return self._uninterruptible_count > 0 def _put(self, item: Any) -> None: if isinstance(self._frame_getter(item), UninterruptibleFrame): self._uninterruptible_count += 1 super()._put(item) def _get(self) -> Any: item = super()._get() if isinstance(self._frame_getter(item), UninterruptibleFrame): self._uninterruptible_count -= 1 return item
[docs] def reset(self) -> None: """Remove all non-UninterruptibleFrame items, keeping uninterruptible ones.""" kept: asyncio.Queue = asyncio.Queue() while not self.empty(): item = self.get_nowait() if isinstance(self._frame_getter(item), UninterruptibleFrame): kept.put_nowait(item) self.task_done() while not kept.empty(): item = kept.get_nowait() self.put_nowait(item) kept.task_done()