base_text_aggregator

Base text aggregator interface for Pipecat text processing.

This module defines the abstract base class for text aggregators that accumulate and process text tokens, typically used by TTS services to determine when aggregated text should be sent for speech synthesis.

class pipecat.utils.text.base_text_aggregator.AggregationType(*values)[source]

Bases: StrEnum

Built-in aggregation strings.

SENTENCE = 'sentence'
TOKEN = 'token'
WORD = 'word'
class pipecat.utils.text.base_text_aggregator.Aggregation(text: str, type: str)[source]

Bases: object

Data class representing aggregated text and its type.

An Aggregation object is created whenever a stream of text is aggregated by a text aggregator. It contains the aggregated text and a type indicating the nature of the aggregation.

Parameters:
  • text – The aggregated text content.

  • type – The type of aggregation the text represents (e.g., ‘sentence’, ‘word’, ‘token’, ‘my_custom_aggregation’).

text: str
type: str
class pipecat.utils.text.base_text_aggregator.BaseTextAggregator(*, aggregation_type: AggregationType = AggregationType.SENTENCE)[source]

Bases: ABC

Base class for text aggregators in the Pipecat framework.

Text aggregators are usually used by the TTS service to aggregate LLM tokens and decide when the aggregated text should be pushed to the TTS service.

Text aggregators can also be used to manipulate text while it’s being aggregated (e.g. reasoning blocks can be removed).

Subclasses must implement all abstract methods to define specific aggregation logic, text manipulation behavior, and state management for interruptions.

__init__(*, aggregation_type: AggregationType = AggregationType.SENTENCE)[source]

Initialize the base text aggregator.

Parameters:

aggregation_type – The aggregation strategy to use. SENTENCE buffers text until sentence boundaries are detected, TOKEN passes text through immediately, and WORD buffers until word boundaries.

property aggregation_type: AggregationType

Get the aggregation type for this aggregator.

Returns:

The aggregation type.

abstract property text: Aggregation

Get the currently aggregated text.

Subclasses must implement this property to return the text that has been accumulated so far in their internal buffer or storage.

Returns:

The text that has been accumulated so far.

abstractmethod async aggregate(text: str) AsyncIterator[Aggregation][source]

Aggregate the specified text and yield completed aggregations.

This method processes the input text character-by-character internally and yields Aggregation objects as they complete.

Subclasses should implement their specific logic for:

  • How to process text character-by-character

  • When to consider the aggregated text ready for processing

  • What criteria determine text completion (e.g., sentence boundaries)

  • When a completion occurs, yield an Aggregation object containing the aggregated text (stripped of leading/trailing whitespace) and its type

Parameters:

text – The text to be aggregated.

Yields:

Aggregation objects as they complete. Each Aggregation consists of the aggregated text (stripped of leading/trailing whitespace) and a string indicating the type of aggregation (e.g., ‘sentence’, ‘word’, ‘token’, ‘my_custom_aggregation’).

abstractmethod async flush() Aggregation | None[source]

Flush any pending aggregation.

This method is called at the end of a stream (e.g., when receiving LLMFullResponseEndFrame) to return any text that was buffered.

Returns:

An Aggregation object if there is pending text, or None if there is no pending text.

abstractmethod async handle_interruption()[source]

Handle interruptions in the text aggregation process.

When an interruption occurs it is possible that we might want to discard the aggregated text or do some internal modifications to the aggregated text.

Subclasses should implement this method to define how they respond to interruptions, such as clearing buffers, resetting state, or preserving partial content.

abstractmethod async reset()[source]

Clear the internally aggregated text and reset to initial state.

Subclasses should implement this method to return the aggregator to its initial state, discarding any previously accumulated text content and resetting any internal tracking variables.