Source code for pipecat.processors.filters.function_filter

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Function-based frame filtering for Pipecat pipelines.

This module provides a processor that filters frames based on a custom function,
allowing for flexible frame filtering logic in processing pipelines.
"""

from collections.abc import Awaitable, Callable

from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame, SystemFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

FilterType = Callable[[Frame], Awaitable[bool]]


[docs] class FunctionFilter(FrameProcessor): """A frame processor that filters frames using a custom function. This processor allows frames to pass through based on the result of a user-provided filter function. System and end frames always pass through regardless of the filter result. """
[docs] def __init__( self, filter: FilterType, direction: FrameDirection | None = FrameDirection.DOWNSTREAM, filter_system_frames: bool = False, **kwargs, ): """Initialize the function filter. Args: filter: An async function that takes a Frame and returns True if the frame should pass through, False otherwise. direction: The direction to apply filtering. Only frames moving in this direction will be filtered; frames in the other direction pass through unfiltered. If None, frames in both directions are filtered. Defaults to DOWNSTREAM. filter_system_frames: Whether to filter system frames. Defaults to False. **kwargs: Additional arguments passed to parent class. """ super().__init__(**kwargs) self._filter = filter self._direction = direction self._filter_system_frames = filter_system_frames
# # Frame processor # def _should_passthrough_frame(self, frame, direction): """Check if a frame should pass through without filtering.""" # Always passthrough frames in the wrong direction if self._direction and direction != self._direction: return True # Always passthrough lifecycle frames if isinstance(frame, (StartFrame, EndFrame, CancelFrame)): return True # If not filtering system frames, passthrough all other system frames if not self._filter_system_frames and isinstance(frame, SystemFrame): return True return False
[docs] async def process_frame(self, frame: Frame, direction: FrameDirection): """Process a frame through the filter. Args: frame: The frame to process. direction: The direction the frame is moving in the pipeline. """ await super().process_frame(frame, direction) passthrough = self._should_passthrough_frame(frame, direction) allowed = await self._filter(frame) if passthrough or allowed: await self.push_frame(frame, direction)