Source code for pipecat.processors.aggregators.sentence

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Text sentence aggregation processor for Pipecat.

This module provides a frame processor that accumulates text frames into
complete sentences, only outputting when a sentence-ending pattern is detected.
"""

from pipecat.frames.frames import EndFrame, Frame, InterimTranscriptionFrame, TextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.string import match_endofsentence


[docs] class SentenceAggregator(FrameProcessor): """Aggregates text frames into complete sentences. This processor accumulates incoming text frames until a sentence-ending pattern is detected, then outputs the complete sentence as a single frame. Useful for ensuring downstream processors receive coherent, complete sentences rather than fragmented text. Frame input/output:: TextFrame("Hello,") -> None TextFrame(" world.") -> TextFrame("Hello, world.") """
[docs] def __init__(self): """Initialize the sentence aggregator. Sets up internal state for accumulating text frames into complete sentences. """ super().__init__() self._aggregation = ""
[docs] async def process_frame(self, frame: Frame, direction: FrameDirection): """Process incoming frames and aggregate text into complete sentences. Args: frame: The incoming frame to process. direction: The direction of frame flow in the pipeline. """ await super().process_frame(frame, direction) # We ignore interim description at this point. if isinstance(frame, InterimTranscriptionFrame): return if isinstance(frame, TextFrame): self._aggregation += frame.text if match_endofsentence(self._aggregation): await self.push_frame(TextFrame(self._aggregation)) self._aggregation = "" elif isinstance(frame, EndFrame): if self._aggregation: await self.push_frame(TextFrame(self._aggregation)) await self.push_frame(frame) else: await self.push_frame(frame, direction)