Source code for pipecat.services.azure.image

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Azure OpenAI image generation service implementation.

This module provides integration with Azure's OpenAI image generation API
using REST endpoints for creating images from text prompts.
"""

import asyncio
import io
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field

import aiohttp
from PIL import Image

from pipecat.frames.frames import ErrorFrame, Frame, URLImageRawFrame
from pipecat.services.image_service import ImageGenService
from pipecat.services.settings import NOT_GIVEN, ImageGenSettings, _NotGiven


[docs] @dataclass class AzureImageGenSettings(ImageGenSettings): """Settings for the Azure image generation service. Parameters: model: Azure image generation model identifier. image_size: Target size for generated images. """ image_size: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
[docs] class AzureImageGenServiceREST(ImageGenService): """Azure OpenAI REST-based image generation service. Provides image generation using Azure's OpenAI service via REST API. Supports asynchronous image generation with polling for completion and automatic image download and processing. """ Settings = AzureImageGenSettings _settings: Settings
[docs] def __init__( self, *, image_size: str | None = None, api_key: str, endpoint: str, model: str | None = None, aiohttp_session: aiohttp.ClientSession, api_version="2023-06-01-preview", settings: Settings | None = None, ): """Initialize the AzureImageGenServiceREST. Args: image_size: Size specification for generated images (e.g., "1024x1024"). .. deprecated:: 0.0.105 Use ``settings=AzureImageGenServiceREST.Settings(image_size=...)`` instead. api_key: Azure OpenAI API key for authentication. endpoint: Azure OpenAI endpoint URL. model: The image generation model to use. .. deprecated:: 0.0.105 Use ``settings=AzureImageGenServiceREST.Settings(model=...)`` instead. aiohttp_session: Shared aiohttp session for HTTP requests. api_version: Azure API version string. Defaults to "2023-06-01-preview". settings: Runtime-updatable settings. When provided alongside deprecated parameters, ``settings`` values take precedence. """ # 1. Initialize default_settings with hardcoded defaults default_settings = self.Settings( model=None, image_size=None, ) # 2. Apply direct init arg overrides (deprecated) if model is not None: self._warn_init_param_moved_to_settings("model", "model") default_settings.model = model if image_size is not None: self._warn_init_param_moved_to_settings("image_size", "image_size") default_settings.image_size = image_size # 4. Apply settings delta (canonical API, always wins) if settings is not None: default_settings.apply_update(settings) super().__init__(settings=default_settings) self._api_key = api_key self._azure_endpoint = endpoint self._api_version = api_version self._aiohttp_session = aiohttp_session
[docs] async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]: """Generate an image from a text prompt using Azure OpenAI. Args: prompt: The text prompt describing the desired image. Yields: URLImageRawFrame containing the generated image data, or ErrorFrame if generation fails. """ url = f"{self._azure_endpoint}openai/images/generations:submit?api-version={self._api_version}" headers = {"api-key": self._api_key, "Content-Type": "application/json"} body = { "prompt": prompt, "n": 1, } if self._settings.image_size is not None: body["size"] = self._settings.image_size async with self._aiohttp_session.post(url, headers=headers, json=body) as submission: # We never get past this line, because this header isn't # defined on a 429 response, but something is eating our # exceptions! operation_location = submission.headers["operation-location"] status = "" attempts_left = 120 json_response = None while status != "succeeded": attempts_left -= 1 if attempts_left == 0: yield ErrorFrame("Image generation timed out") return await asyncio.sleep(1) response = await self._aiohttp_session.get(operation_location, headers=headers) json_response = await response.json() status = json_response["status"] image_url = json_response["result"]["data"][0]["url"] if json_response else None if not image_url: yield ErrorFrame("Image generation failed") return # Load the image from the url async with self._aiohttp_session.get(image_url) as response: image_stream = io.BytesIO(await response.content.read()) image = Image.open(image_stream) frame = URLImageRawFrame( url=image_url, image=image.tobytes(), size=image.size, format=image.format ) yield frame