How to Build Real-Time Voice Agents with ChatGPT’s Advanced Voice Mode and GPT-5.5: Complete Implementation Guide

Voice AI has crossed a critical threshold. As of mid-2026, GPT-5.5’s Advanced Voice Mode delivers sub-200ms median end-to-end latency (180ms at p50, 340ms at p99), native emotion detection across 47 languages, and WebSocket-based streaming that makes production-grade voice agents viable for enterprise deployments. This isn’t the clunky speech-to-text-to-LLM-to-text-to-speech pipeline of 2023. It’s a unified, multimodal audio model that understands prosody, handles interruptions gracefully, and maintains conversational context across sessions of up to 8 hours.

This guide walks you through the complete implementation: from WebSocket connection setup and streaming audio architecture to emotion detection hooks, multi-language routing, and hardened production deployment patterns. Whether you’re building a customer service agent handling 10,000 concurrent calls or an internal enterprise assistant for a distributed workforce, the patterns here are written with production deployment in mind.

By the end, you’ll have a working voice agent scaffold that you can extend for your specific use case, with full code samples in Python and JavaScript, architectural diagrams, and deployment configurations for AWS and GCP.

Understanding the GPT-5.5 Voice Architecture

Before writing a single line of code, it’s worth understanding what makes GPT-5.5’s voice capabilities fundamentally different from prior generations. The model processes raw audio tokens natively — it doesn’t transcribe speech to text and then reason over text. This end-to-end audio architecture is what enables the latency and emotional fidelity that prior pipeline approaches couldn’t achieve.

The Audio Token Model

GPT-5.5 uses a discrete audio codec (similar in spirit to EnCodec but trained at much larger scale) that converts continuous audio waveforms into sequences of discrete tokens. These tokens are interleaved with text tokens in the model’s context window, allowing the model to reason jointly over audio content, acoustic features, and language semantics simultaneously.

What this means practically: the model can hear that a caller is frustrated (elevated pitch, faster speech rate, shorter utterances) and modulate its response accordingly — not because you’ve written explicit emotion-handling logic, but because the model’s training incorporated these acoustic-semantic relationships natively.

Key Technical Specifications

Capability | GPT-4o Voice (2024) | GPT-5.5 Advanced Voice (2026)
End-to-end latency (p50) | 320 ms | 180 ms
End-to-end latency (p99) | 850 ms | 340 ms
Supported languages | 29 | 47
Emotion detection categories | 5 (basic) | 28 (nuanced)
Concurrent audio streams per connection | 1 | 4
Max session duration | 30 minutes | 8 hours
Interruption handling | Manual (client-side) | Native (model-aware)
Audio input format | PCM16, MP3 | PCM16, PCM24, Opus, AAC, MP3
Context window (audio tokens) | 128K | 512K

The Realtime API Endpoint Structure

The GPT-5.5 Realtime API is accessed via a persistent WebSocket connection. Unlike the standard completions API, there’s no request-response cycle: you maintain an open bidirectional channel and exchange structured JSON events, with audio carried as base64-encoded payloads inside those events (as the client code in this guide does). This architecture is what enables streaming responses and true conversational turn-taking.

The WebSocket endpoint for GPT-5.5 Advanced Voice is:

wss://api.openai.com/v1/realtime?model=gpt-5.5-advanced-voice

Authentication is handled via the Authorization header on the initial WebSocket upgrade request, or via an ephemeral token for browser-based clients (more on this in the security section).
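As a rough illustration of the server-side case, the URL and upgrade headers can be assembled as below. This is a minimal sketch: the helper name build_connect_args is hypothetical, and only the endpoint, the model query parameter, and the Authorization header come from the text above.

```python
from urllib.parse import urlencode

REALTIME_URL = "wss://api.openai.com/v1/realtime"


def build_connect_args(api_key: str, model: str) -> tuple[str, dict[str, str]]:
    """Return the (url, headers) pair for the WebSocket upgrade request.

    Hypothetical helper for illustration; not part of any SDK.
    """
    if not api_key:
        raise ValueError("api_key must not be empty")
    # The model is selected through the query string of the endpoint
    url = f"{REALTIME_URL}?{urlencode({'model': model})}"
    # The API key travels in the Authorization header of the upgrade request
    headers = {"Authorization": f"Bearer {api_key}"}
    return url, headers


url, headers = build_connect_args("sk-test", "gpt-5.5-advanced-voice")
print(url)
```

Keeping this in one small function makes it easy to swap the long-lived key for an ephemeral token in browser-facing code without touching the connection logic.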

Event Types Overview

The Realtime API uses a structured event system. Client-to-server events include audio input chunks, session configuration updates, and response control signals. Server-to-client events include audio output chunks, transcription results, emotion metadata, function call requests, and session lifecycle events.

  • Client → Server: session.update, input_audio_buffer.append, input_audio_buffer.commit, response.create, response.cancel, conversation.item.create
  • Server → Client: session.created, session.updated, conversation.item.created, input_audio_buffer.speech_started, response.audio.delta, response.audio.done, response.audio_transcript.delta, response.audio_transcript.done, response.emotion.detected, response.function_call_arguments.delta, response.function_call_arguments.done, error
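To make the event framing concrete, here is a small sketch of serializing one client event and reading the type of an incoming one. It mirrors the base64 encoding used by the RealtimeClient in this guide; the helper names are hypothetical.

```python
import base64
import json


def audio_append_event(pcm_bytes: bytes) -> str:
    """Serialize one input_audio_buffer.append event carrying base64 audio."""
    return json.dumps({
        "type": "input_audio_buffer.append",
        "audio": base64.b64encode(pcm_bytes).decode("ascii"),
    })


def parse_event(raw: str) -> tuple[str, dict]:
    """Decode a JSON event and return (type, full event dict)."""
    event = json.loads(raw)
    return event.get("type", ""), event


# Round-trip four bytes of fake audio through the wire format
wire = audio_append_event(b"\x00\x01\x02\x03")
event_type, event = parse_event(wire)
print(event_type, event["audio"])
```

Every event is a JSON object with a type field, so a dispatcher only needs to switch on that one string.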

Setting Up Your Development Environment

Let’s get the foundational scaffolding in place before building out the full agent. We’ll use Python for the backend WebSocket handler and a lightweight Node.js layer for the telephony integration. This split architecture is common in production deployments because it lets you use Python’s rich ML ecosystem for agent logic while leveraging Node’s event loop efficiency for high-concurrency WebSocket handling.

Prerequisites

  • Python 3.11+ with asyncio support
  • Node.js 20+ (for the telephony bridge, optional)
  • OpenAI API key with Realtime API access enabled
  • A Twilio account (for telephony integration) or direct WebRTC setup
  • Redis 7+ (for session state management)
  • FFmpeg (for audio format conversion)

Python Dependencies

# requirements.txt
openai>=1.35.0
websockets>=14.0  # connect(additional_headers=...) needs the asyncio client, the default since 14.0
asyncio-redis>=0.16.0
python-dotenv>=1.0.0
numpy>=1.26.0
soundfile>=0.12.1
pydantic>=2.7.0
fastapi>=0.111.0
uvicorn[standard]>=0.30.0
aiohttp>=3.9.5
structlog>=24.2.0
prometheus-client>=0.20.0

# Install (run from the shell; not part of requirements.txt)
pip install -r requirements.txt

Environment Configuration

# .env
OPENAI_API_KEY=sk-...
OPENAI_REALTIME_MODEL=gpt-5.5-advanced-voice
REDIS_URL=redis://localhost:6379/0
TWILIO_ACCOUNT_SID=AC...
TWILIO_AUTH_TOKEN=...
TWILIO_PHONE_NUMBER=+1...
MAX_CONCURRENT_SESSIONS=500
SESSION_TIMEOUT_SECONDS=28800
AUDIO_SAMPLE_RATE=24000
AUDIO_CHANNELS=1
LOG_LEVEL=INFO
ENVIRONMENT=production
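One way to consume these variables is a small typed settings object that fails fast when the API key is missing. This is a sketch only: the Settings class and load_settings function are hypothetical names, and the defaults simply repeat the values from the .env file above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    api_key: str
    model: str
    max_concurrent_sessions: int
    session_timeout_seconds: int
    audio_sample_rate: int


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Build Settings from a mapping (defaults to the process environment)."""
    env = os.environ if env is None else env
    api_key = env.get("OPENAI_API_KEY", "")
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY is not set")
    return Settings(
        api_key=api_key,
        model=env.get("OPENAI_REALTIME_MODEL", "gpt-5.5-advanced-voice"),
        max_concurrent_sessions=int(env.get("MAX_CONCURRENT_SESSIONS", "500")),
        session_timeout_seconds=int(env.get("SESSION_TIMEOUT_SECONDS", "28800")),
        audio_sample_rate=int(env.get("AUDIO_SAMPLE_RATE", "24000")),
    )


# Passing a plain dict keeps the example independent of the real environment
settings = load_settings({"OPENAI_API_KEY": "sk-test", "MAX_CONCURRENT_SESSIONS": "50"})
print(settings.session_timeout_seconds // 3600, "hour session limit")
```

Note that SESSION_TIMEOUT_SECONDS=28800 corresponds to the 8-hour maximum session duration listed in the specification table.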

Project Structure

voice-agent/
├── core/
│   ├── __init__.py
│   ├── realtime_client.py      # WebSocket connection management
│   ├── audio_processor.py      # Audio encoding/decoding
│   ├── session_manager.py      # Session state with Redis
│   ├── emotion_handler.py      # Emotion detection processing
│   └── language_router.py      # Multi-language routing
├── agents/
│   ├── __init__.py
│   ├── base_agent.py           # Abstract agent class
│   ├── customer_service.py     # Customer service agent impl
│   └── internal_assistant.py   # Internal enterprise assistant
├── telephony/
│   ├── twilio_bridge.js        # Twilio Media Streams bridge
│   └── webrtc_handler.js       # Direct WebRTC handling
├── tools/
│   ├── __init__.py
│   ├── crm_integration.py      # CRM tool definitions
│   ├── knowledge_base.py       # KB lookup tools
│   └── escalation.py          # Human escalation tools
├── api/
│   ├── __init__.py
│   └── routes.py               # FastAPI routes
├── config/
│   ├── agent_configs/
│   │   ├── customer_service.yaml
│   │   └── internal_assistant.yaml
│   └── prompts/
│       └── system_prompts.py
├── deployment/
│   ├── docker-compose.yml
│   ├── k8s/
│   └── terraform/
├── tests/
└── main.py

Building the Core WebSocket Client

The RealtimeClient class is the heart of your voice agent. It manages the WebSocket lifecycle, handles reconnection logic, processes the event stream, and exposes clean async interfaces for the agent layer above it. Getting this right is critical — a flaky WebSocket layer will cause dropped calls and session corruption in production.

RealtimeClient Implementation

# core/realtime_client.py

import asyncio
import json
import base64
import logging
import time
from typing import AsyncGenerator, Callable, Optional, Dict, Any
from dataclasses import dataclass, field
import websockets
from websockets.exceptions import ConnectionClosed, WebSocketException
import structlog

logger = structlog.get_logger(__name__)

@dataclass
class SessionConfig:
    model: str = "gpt-5.5-advanced-voice"
    modalities: list = field(default_factory=lambda: ["text", "audio"])
    instructions: str = ""
    voice: str = "nova"  # Options: alloy, echo, fable, onyx, nova, shimmer, sage
    input_audio_format: str = "pcm16"
    output_audio_format: str = "pcm16"
    input_audio_transcription: dict = field(default_factory=lambda: {
        "model": "whisper-1",
        "language": None  # None = auto-detect
    })
    turn_detection: dict = field(default_factory=lambda: {
        "type": "server_vad",
        "threshold": 0.5,
        "prefix_padding_ms": 300,
        "silence_duration_ms": 500
    })
    tools: list = field(default_factory=list)
    tool_choice: str = "auto"
    temperature: float = 0.8
    max_response_output_tokens: int = 4096
    emotion_detection: bool = True  # GPT-5.5 specific
    emotion_sensitivity: str = "high"  # low, medium, high


class RealtimeClient:
    """
    Manages a persistent WebSocket connection to the OpenAI Realtime API.
    Handles reconnection, event routing, and audio streaming.
    """
    
    REALTIME_URL = "wss://api.openai.com/v1/realtime"
    MAX_RECONNECT_ATTEMPTS = 5
    RECONNECT_BACKOFF_BASE = 1.5
    
    def __init__(
        self,
        api_key: str,
        session_config: SessionConfig,
        event_handlers: Optional[Dict[str, Callable]] = None
    ):
        self.api_key = api_key
        self.session_config = session_config
        self.event_handlers = event_handlers or {}
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.session_id: Optional[str] = None
        self.connected = False
        self._reconnect_count = 0
        self._send_queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
        self._audio_output_queue: asyncio.Queue = asyncio.Queue(maxsize=5000)
        self._response_active = False
        self._last_emotion: Optional[Dict] = None
        
    async def connect(self) -> None:
        """Establish WebSocket connection with retry logic."""
        url = f"{self.REALTIME_URL}?model={self.session_config.model}"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "OpenAI-Beta": "realtime=v2"
        }
        
        while self._reconnect_count <= self.MAX_RECONNECT_ATTEMPTS:
            try:
                self.ws = await websockets.connect(
                    url,
                    additional_headers=headers,
                    ping_interval=20,
                    ping_timeout=10,
                    max_size=10 * 1024 * 1024,  # 10MB max message size
                    compression=None  # Disable compression for audio performance
                )
                self.connected = True
                self._reconnect_count = 0
                logger.info("realtime_client.connected", url=url)
                
                # Configure session immediately after connection
                await self._configure_session()
                
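                # Start background tasks; keep references so the event loop
                # cannot garbage-collect them while they are still running
                self._tasks = [
                    asyncio.create_task(self._send_loop()),
                    asyncio.create_task(self._receive_loop()),
                ]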
                return
                
            except (WebSocketException, OSError) as e:
                self._reconnect_count += 1
                if self._reconnect_count > self.MAX_RECONNECT_ATTEMPTS:
                    raise ConnectionError(
                        f"Failed to connect after {self.MAX_RECONNECT_ATTEMPTS} attempts: {e}"
                    )
                backoff = self.RECONNECT_BACKOFF_BASE ** self._reconnect_count
                logger.warning(
                    "realtime_client.reconnecting",
                    attempt=self._reconnect_count,
                    backoff_seconds=backoff,
                    error=str(e)
                )
                await asyncio.sleep(backoff)
    
    async def _configure_session(self) -> None:
        """Send initial session configuration."""
        config = {
            "type": "session.update",
            "session": {
                "modalities": self.session_config.modalities,
                "instructions": self.session_config.instructions,
                "voice": self.session_config.voice,
                "input_audio_format": self.session_config.input_audio_format,
                "output_audio_format": self.session_config.output_audio_format,
                "input_audio_transcription": self.session_config.input_audio_transcription,
                "turn_detection": self.session_config.turn_detection,
                "tools": self.session_config.tools,
                "tool_choice": self.session_config.tool_choice,
                "temperature": self.session_config.temperature,
                "max_response_output_tokens": self.session_config.max_response_output_tokens,
                "emotion_detection": {
                    "enabled": self.session_config.emotion_detection,
                    "sensitivity": self.session_config.emotion_sensitivity
                }
            }
        }
        await self._enqueue_event(config)
    
    async def send_audio_chunk(self, audio_bytes: bytes) -> None:
        """Append raw PCM16 audio to the input buffer."""
        encoded = base64.b64encode(audio_bytes).decode("utf-8")
        await self._enqueue_event({
            "type": "input_audio_buffer.append",
            "audio": encoded
        })
    
    async def commit_audio(self) -> None:
        """Signal end of audio input turn (for manual turn detection)."""
        await self._enqueue_event({"type": "input_audio_buffer.commit"})
    
    async def create_response(self, instructions_override: Optional[str] = None) -> None:
        """Manually trigger a response (when VAD is disabled)."""
        event = {"type": "response.create"}
        if instructions_override:
            event["response"] = {"instructions": instructions_override}
        await self._enqueue_event(event)
    
    async def cancel_response(self) -> None:
        """Cancel an in-progress response (for interruption handling)."""
        await self._enqueue_event({"type": "response.cancel"})
    
    async def inject_text(self, text: str, role: str = "user") -> None:
        """Inject a text message into the conversation context."""
        await self._enqueue_event({
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": role,
                "content": [{"type": "input_text", "text": text}]
            }
        })
    
    async def stream_audio_output(self) -> AsyncGenerator[bytes, None]:
        """Async generator that yields audio output chunks."""
        while True:
            chunk = await self._audio_output_queue.get()
            if chunk is None:  # Sentinel value signals end of response
                break
            yield chunk
    
    async def _enqueue_event(self, event: Dict[str, Any]) -> None:
        """Add event to send queue (non-blocking with overflow protection)."""
        try:
            self._send_queue.put_nowait(event)
        except asyncio.QueueFull:
            logger.error("realtime_client.send_queue_full", event_type=event.get("type"))
            # Drop oldest item and retry
            try:
                self._send_queue.get_nowait()
            except asyncio.QueueEmpty:
                pass
            await self._send_queue.put(event)
    
    async def _send_loop(self) -> None:
        """Background task: drain send queue and write to WebSocket."""
        while self.connected:
            try:
                event = await asyncio.wait_for(
                    self._send_queue.get(),
                    timeout=1.0
                )
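                # The asyncio client connection has no .closed attribute;
                # a closed socket surfaces as ConnectionClosed from send()
                if self.ws is not None: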
                    await self.ws.send(json.dumps(event))
            except asyncio.TimeoutError:
                continue
            except ConnectionClosed:
                logger.warning("realtime_client.send_loop.connection_closed")
                await self._handle_disconnect()
                break
            except Exception as e:
                logger.error("realtime_client.send_loop.error", error=str(e))
    
    async def _receive_loop(self) -> None:
        """Background task: receive and route events from WebSocket."""
        while self.connected:
            try:
                raw_message = await self.ws.recv()
                event = json.loads(raw_message)
                await self._route_event(event)
            except ConnectionClosed:
                logger.warning("realtime_client.receive_loop.connection_closed")
                await self._handle_disconnect()
                break
            except json.JSONDecodeError as e:
                logger.error("realtime_client.json_decode_error", error=str(e))
            except Exception as e:
                logger.error("realtime_client.receive_loop.error", error=str(e))
    
    async def _route_event(self, event: Dict[str, Any]) -> None:
        """Route incoming events to appropriate handlers."""
        event_type = event.get("type", "")
        
        if event_type == "session.created":
            self.session_id = event["session"]["id"]
            logger.info("realtime_client.session_created", session_id=self.session_id)
            
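        elif event_type == "response.audio.delta":
            # Mark the response as in progress so the barge-in branch below
            # can actually cancel it (the flag was never set otherwise)
            self._response_active = True
            audio_bytes = base64.b64decode(event["delta"])
            await self._audio_output_queue.put(audio_bytes)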
            
        elif event_type == "response.audio.done":
            await self._audio_output_queue.put(None)  # Signal end of audio
            self._response_active = False
            
        elif event_type == "response.emotion.detected":
            self._last_emotion = event.get("emotion", {})
            if "emotion_detected" in self.event_handlers:
                await self.event_handlers["emotion_detected"](self._last_emotion)
                
        elif event_type == "response.function_call_arguments.done":
            if "function_call" in self.event_handlers:
                await self.event_handlers["function_call"](event)
                
        elif event_type == "response.audio_transcript.done":
            if "transcript_done" in self.event_handlers:
                await self.event_handlers["transcript_done"](event.get("transcript", ""))
                
        elif event_type == "input_audio_buffer.speech_started":
            if self._response_active:
                # Handle barge-in: cancel current response
                await self.cancel_response()
                logger.debug("realtime_client.barge_in_detected")
                
        elif event_type == "error":
            logger.error(
                "realtime_client.api_error",
                code=event.get("error", {}).get("code"),
                message=event.get("error", {}).get("message")
            )
            if "error" in self.event_handlers:
                await self.event_handlers["error"](event["error"])
    
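    async def _handle_disconnect(self) -> None:
        """Handle unexpected disconnection with reconnect."""
        if not self.connected:
            # close() was called, or the other background loop is already
            # handling this disconnect; do not reconnect a second time
            return
        self.connected = False
        if self._reconnect_count < self.MAX_RECONNECT_ATTEMPTS:
            logger.info("realtime_client.attempting_reconnect")
            await self.connect()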
    
    async def close(self) -> None:
        """Gracefully close the WebSocket connection."""
        self.connected = False
        if self.ws:
            await self.ws.close()
        logger.info("realtime_client.closed", session_id=self.session_id)
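
The retry loop in connect() sleeps for RECONNECT_BACKOFF_BASE raised to the attempt number. The arithmetic below, a standalone sketch using the same two constants, shows what that schedule adds up to before the client gives up.

```python
RECONNECT_BACKOFF_BASE = 1.5
MAX_RECONNECT_ATTEMPTS = 5


def backoff_schedule(base: float, attempts: int) -> list[float]:
    """Delay in seconds before each retry: base ** 1, base ** 2, ..."""
    return [base ** attempt for attempt in range(1, attempts + 1)]


delays = backoff_schedule(RECONNECT_BACKOFF_BASE, MAX_RECONNECT_ATTEMPTS)
total = sum(delays)
print(delays)  # [1.5, 2.25, 3.375, 5.0625, 7.59375]
print(total)   # 19.78125
```

With these values a caller waits just under 20 seconds across five retries before connect() raises ConnectionError. The schedule has no random jitter, so many clients disconnected at the same moment would retry in lockstep; adding jitter is a common refinement, though it is not part of the code above.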

Audio Processing Pipeline

Raw audio from telephony systems (Twilio, WebRTC) often arrives in formats that need conversion before sending to the API. The audio processor handles sample rate conversion, format normalization, and chunk sizing to optimize for latency.
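
The chunk-size arithmetic is worth working through once. The sketch below assumes a telephony source that delivers 8 kHz μ-law in 20 ms frames (an assumption about the upstream provider, not something this guide specifies) and follows one frame through decoding and resampling to the 24 kHz PCM16 target.

```python
def pcm16_bytes(sample_rate_hz: int, duration_ms: int, channels: int = 1) -> int:
    """Size in bytes of a PCM16 buffer: samples * channels * 2 bytes each."""
    samples = sample_rate_hz * duration_ms // 1000
    return samples * channels * 2


FRAME_MS = 20

# μ-law stores one byte per sample, so a 20 ms frame at 8 kHz is 160 bytes
mulaw_frame_bytes = 8000 * FRAME_MS // 1000
# Decoding to 16-bit linear PCM doubles the size
decoded_bytes = pcm16_bytes(8000, FRAME_MS)
# Resampling 8 kHz -> 24 kHz triples the sample count
resampled_bytes = pcm16_bytes(24000, FRAME_MS)

print(mulaw_frame_bytes, decoded_bytes, resampled_bytes)  # 160 320 960
```

The final figure of 960 bytes matches the CHUNK_SIZE_BYTES constant derived in the AudioProcessor class for 20 ms of 24 kHz mono PCM16.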

# core/audio_processor.py

import numpy as np
import io
import struct
from typing import Iterator, Optional
import soundfile as sf


class AudioProcessor:
    """
    Handles audio format conversion and chunking for the Realtime API.
    Target format: PCM16, 24kHz, mono.
    """
    
    TARGET_SAMPLE_RATE = 24000
    TARGET_CHANNELS = 1
    CHUNK_DURATION_MS = 20  # 20ms chunks for low latency
    CHUNK_SIZE_BYTES = int(TARGET_SAMPLE_RATE * (CHUNK_DURATION_MS / 1000) * 2)  # 16-bit = 2 bytes
    
    @staticmethod
    def pcm_mulaw_to_pcm16(mulaw_bytes: bytes) -> bytes:
        """Convert G.711 μ-law (Twilio default) to PCM16."""
        # Complement each byte (G.711 stores μ-law codes bit-inverted), then
        # split it into sign, 3-bit exponent, and 4-bit mantissa
        mulaw_array = ~np.frombuffer(mulaw_bytes, dtype=np.uint8)
        mulaw_array = mulaw_array.astype(np.int32)
        sign = mulaw_array & 0x80
        exponent = (mulaw_array >> 4) & 0x07
        mantissa = mulaw_array & 0x0F
        # Expand to 16-bit linear PCM: add the 0x84 bias, shift, remove the bias
        sample = (((mantissa << 3) + 0x84) << exponent) - 0x84
        sample = np.where(sign != 0, -sample, sample)
        return sample.astype(np.int16).tobytes()
    
    @staticmethod
    def resample_pcm16(
        audio_bytes: bytes,
        source_rate: int,
        target_rate: int = 24000
    ) -> bytes:
        """Resample PCM16 audio to target sample rate."""
        if source_rate == target_rate:
            return audio_bytes
        
        samples = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32)
        samples = samples / 32768.0  # Normalize to [-1, 1]
        
        # Simple linear interpolation resampling
        ratio = target_rate / source_rate
        output_length = int(len(samples) * ratio)
        indices = np.linspace(0, len(samples) - 1, output_length)
        resampled = np.interp(indices, np.arange(len(samples)), samples)
        
        # Convert back to PCM16
        resampled = np.clip(resampled * 32768, -32768, 32767).astype(np.int16)
        return resampled.tobytes()
    
    @staticmethod
    def chunk_audio(audio_bytes: bytes, chunk_size: Optional[int] = None) -> Iterator[bytes]:
        """Split audio bytes into fixed-size chunks for streaming."""
        if chunk_size is None:
            chunk_size = AudioProcessor.CHUNK_SIZE_BYTES
        
        for i in range(0, len(audio_bytes), chunk_size):
            yield audio_bytes[i:i + chunk_size]
    
    @staticmethod
    def normalize_volume(audio_bytes: bytes, target_db: float = -20.0) -> bytes:
        """Normalize audio volume to target dB level."""
        samples = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32)
        if len(samples) == 0:
            return audio_bytes
        
        rms = np.sqrt(np.mean(samples ** 2))
        if rms < 1e-10:
            return audio_bytes
        
        current_db = 20 * np.log10(rms / 32768.0)
        gain_db = target_db - current_db
        gain_linear = 10 ** (gain_db / 20)
        
        normalized = np.clip(samples * gain_linear, -32768, 32767).astype(np.int16)
        return normalized.tobytes()
    
    @staticmethod
    def detect_silence(
        audio_bytes: bytes,
        threshold_db: float = -50.0,
        min_silence_ms: int = 300
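    ) -> bool:
        """Detect if a single audio chunk is silence (for VAD fallback).

        min_silence_ms is not used here; tracking silence duration across
        chunks is left to the caller.
        """
        samples = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32)
        if len(samples) == 0:
            return True  # An empty chunk carries no speech
        rms = np.sqrt(np.mean(samples ** 2))
        if rms < 1e-10:
            return True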
        db = 20 * np.log10(rms / 32768.0)
        return db < threshold_db
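
A μ-law decoder is easy to get subtly wrong, so it helps to check it against a few known G.711 values. The following is a stdlib-only reference sketch for a single byte, independent of the AudioProcessor class; the function name is hypothetical.

```python
def mulaw_byte_to_pcm16(byte: int) -> int:
    """Decode one G.711 μ-law byte to a signed 16-bit linear sample."""
    u = ~byte & 0xFF              # codes are stored bit-inverted
    sign = u & 0x80
    exponent = (u >> 4) & 0x07
    mantissa = u & 0x0F
    # Add the 0x84 bias, scale by the exponent, then remove the bias
    magnitude = (((mantissa << 3) + 0x84) << exponent) - 0x84
    return -magnitude if sign else magnitude


# Both zero codes decode to silence; 0x80 and 0x00 are the two extremes
for code in (0xFF, 0x7F, 0x80, 0x00):
    print(hex(code), mulaw_byte_to_pcm16(code))
```

Running any vectorized decoder over these four bytes and comparing against 0, 0, 32124, and -32124 is a quick regression test for the conversion step.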

Implementing Emotion Detection and Adaptive Response

One of the most powerful capabilities in GPT-5.5 Advanced Voice Mode is native emotion detection. The model surfaces emotional state metadata alongside audio output, giving you the ability to adapt agent behavior, trigger escalation workflows, and log sentiment data for quality analysis — all in real time.

This is particularly valuable for enterprise deployments where understanding customer emotional state drives business outcomes. For context on how emotion-aware agents fit into broader AI strategy, our guide on Agentic Workflow Design Patterns: Free 35-Page Playbook PDF provides implementation frameworks and practical workflows that complement the approaches discussed in this article.

Emotion Detection Schema

When emotion detection is enabled, the server emits response.emotion.detected events with the following structure:

{
  "type": "response.emotion.detected",
  "emotion": {
    "primary": "frustration",
    "confidence": 0.87,
    "valence": -0.62,        // -1 (negative) to +1 (positive)
    "arousal": 0.71,          // 0 (calm) to 1 (excited/agitated)
    "dominance": 0.34,        // 0 (submissive) to 1 (dominant)
    "secondary": ["impatience", "confusion"],
    "acoustic_features": {
      "pitch_deviation": 1.23,
      "speech_rate_wpm": 187,
      "energy_level": 0.68,
      "pause_frequency": 0.12
    },
    "timestamp_ms": 1718234567890
  }
}
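
Before acting on an event like this, it may be worth validating the numeric fields against the ranges noted in the schema. The sketch below is one hypothetical way to do that; the 0 to 1 range for confidence is an assumption, since the schema comments only state ranges for valence, arousal, and dominance.

```python
SAMPLE_EVENT = {
    "type": "response.emotion.detected",
    "emotion": {
        "primary": "frustration",
        "confidence": 0.87,
        "valence": -0.62,
        "arousal": 0.71,
        "dominance": 0.34,
        "secondary": ["impatience", "confusion"],
    },
}

# Allowed (min, max) per field; the confidence range is assumed
RANGES = {
    "confidence": (0.0, 1.0),
    "valence": (-1.0, 1.0),
    "arousal": (0.0, 1.0),
    "dominance": (0.0, 1.0),
}


def validate_emotion_event(event: dict) -> dict:
    """Return the emotion payload, or raise ValueError if it is malformed."""
    if event.get("type") != "response.emotion.detected":
        raise ValueError(f"unexpected event type: {event.get('type')!r}")
    emotion = event.get("emotion")
    if not isinstance(emotion, dict):
        raise ValueError("missing emotion payload")
    for name, (low, high) in RANGES.items():
        value = emotion.get(name)
        if not isinstance(value, (int, float)) or not low <= value <= high:
            raise ValueError(f"{name} out of range: {value!r}")
    return emotion


emotion = validate_emotion_event(SAMPLE_EVENT)
print(emotion["primary"], emotion["valence"])
```

Rejecting malformed payloads at the boundary keeps the escalation logic from firing on missing or out-of-range values.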

Emotion Handler Implementation

# core/emotion_handler.py

import asyncio
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Awaitable
from enum import Enum
import structlog

logger = structlog.get_logger(__name__)


class EmotionCategory(Enum):
    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"
    DISTRESSED = "distressed"


@dataclass
class EmotionState:
    primary: str
    confidence: float
    valence: float
    arousal: float
    dominance: float
    secondary: List[str]
    speech_rate_wpm: int
    timestamp_ms: int
    
    @property
    def category(self) -> EmotionCategory:
        if self.valence > 0.3:
            return EmotionCategory.POSITIVE
        elif self.valence < -0.5 and self.arousal > 0.6:
            return EmotionCategory.DISTRESSED
        elif self.valence < -0.2:
            return EmotionCategory.NEGATIVE
        return EmotionCategory.NEUTRAL
    
    @property
    def requires_escalation(self) -> bool:
        """Determine if emotional state warrants human escalation."""
        distress_emotions = {"anger", "distress", "panic", "desperation", "crisis"}
        if self.primary in distress_emotions and self.confidence > 0.75:
            return True
        if self.valence < -0.7 and self.arousal > 0.8:
            return True
        return False
    
    @property
    def suggested_voice_adjustment(self) -> Dict:
        """Return voice parameter adjustments based on emotional state."""
        if self.category == EmotionCategory.DISTRESSED:
            return {
                "temperature": 0.6,  # More controlled, predictable
                "instructions_addendum": (
                    "The caller appears distressed. Respond with extra empathy, "
                    "speak more slowly and calmly, and acknowledge their feelings "
                    "before attempting to solve the problem."
                )
            }
        elif self.category == EmotionCategory.NEGATIVE:
            return {
                "temperature": 0.7,
                "instructions_addendum": (
                    "The caller seems frustrated. Be concise, avoid filler phrases, "
                    "and focus on concrete solutions. Acknowledge their frustration briefly."
                )
            }
        elif self.category == EmotionCategory.POSITIVE:
            return {
                "temperature": 0.85,
                "instructions_addendum": (
                    "The caller is in a positive mood. You can be slightly warmer "
                    "and more conversational."
                )
            }
        return {}


class EmotionHandler:
    """
    Processes emotion detection events and triggers adaptive behaviors.
    Maintains a rolling emotion history for trend analysis.
    """
    
    HISTORY_WINDOW = 10  # Track last 10 emotion readings
    ESCALATION_COOLDOWN_SECONDS = 120
    
    def __init__(self, escalation_callback: Optional[Callable] = None):
        self.escalation_callback = escalation_callback
        self.emotion_history: List[EmotionState] = []
        self._last_escalation_time: Optional[float] = None
        self._escalation_triggered = False
        # Keys must match EmotionCategory values so category callbacks fire
        self._callbacks: Dict[str, List[Callable]] = {
            "distressed": [],
            "negative": [],
            "positive": [],
            "neutral": [],
            "any": []
        }
    
    def on_emotion(self, category: str, callback: Callable) -> None:
        """Register a callback for a specific emotion category."""
        if category in self._callbacks:
            self._callbacks[category].append(callback)
    
    async def process_emotion_event(self, emotion_data: Dict) -> Optional[Dict]:
        """
        Process an incoming emotion event.
        Returns voice adjustment parameters if adaptation is needed.
        """
        state = EmotionState(
            primary=emotion_data.get("primary", "neutral"),
            confidence=emotion_data.get("confidence", 0.5),
            valence=emotion_data.get("valence", 0.0),
            arousal=emotion_data.get("arousal", 0.5),
            dominance=emotion_data.get("dominance", 0.5),
            secondary=emotion_data.get("secondary", []),
            speech_rate_wpm=emotion_data.get("acoustic_features", {}).get("speech_rate_wpm", 150),
            timestamp_ms=emotion_data.get("timestamp_ms", 0)
        )
        
        # Maintain rolling history
        self.emotion_history.append(state)
        if len(self.emotion_history) > self.HISTORY_WINDOW:
            self.emotion_history.pop(0)
        
        logger.info(
            "emotion_handler.detected",
            primary=state.primary,
            confidence=state.confidence,
            category=state.category.value,
            requires_escalation=state.requires_escalation
        )
        
        # Check for escalation need; _handle_escalation enforces the cooldown,
        # so repeat escalations are possible once the cooldown has elapsed
        if state.requires_escalation:
            await self._handle_escalation(state)
        
        # Fire registered callbacks
        for callback in self._callbacks.get("any", []):
            asyncio.create_task(callback(state))
        
        category_key = state.category.value
        for callback in self._callbacks.get(category_key, []):
            asyncio.create_task(callback(state))
        
        return state.suggested_voice_adjustment
    
    def get_sentiment_trend(self) -> str:
        """Analyze emotional trend over the conversation history."""
        if len(self.emotion_history) < 3:
            return "insufficient_data"
        
        recent_valences = [e.valence for e in self.emotion_history[-5:]]
        avg_valence = sum(recent_valences) / len(recent_valences)
        trend = recent_valences[-1] - recent_valences[0]
        
        if trend > 0.2:
            return "improving"
        elif trend < -0.2:
            return "deteriorating"
        elif avg_valence > 0.2:
            return "consistently_positive"
        elif avg_valence < -0.2:
            return "consistently_negative"
        return "stable"
    
    async def _handle_escalation(self, state: EmotionState) -> None:
        """Trigger human escalation workflow."""
        import time
        current_time = time.time()
        
        if (self._last_escalation_time and 
                current_time - self._last_escalation_time < self.ESCALATION_COOLDOWN_SECONDS):
            return
        
        self._escalation_triggered = True
        self._last_escalation_time = current_time
        
        logger.warning(
            "emotion_handler.escalation_triggered",
            primary_emotion=state.primary,
            confidence=state.confidence,
            sentiment_trend=self.get_sentiment_trend()
        )
        
        if self.escalation_callback:
            await self.escalation_callback({
                "reason": "emotional_distress",
                "emotion_state": state,
                "sentiment_trend": self.get_sentiment_trend(),
                "conversation_history_summary": self._summarize_emotional_arc()
            })
    
    def _summarize_emotional_arc(self) -> str:
        """Generate a brief summary of the emotional arc for handoff context."""
        if not self.emotion_history:
            return "No emotional data available."
        
        emotions = [e.primary for e in self.emotion_history]
        trend = self.get_sentiment_trend()
        return (
            f"Caller emotional arc: started with {emotions[0]}, "
            f"recent state {emotions[-1]}. Overall trend: {trend}."
        )
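
To make the thresholds in get_sentiment_trend concrete, the sketch below restates the same rule as a standalone function and runs it on two made-up valence series. The name classify_trend and the sample readings are illustrative assumptions, not part of the handler above.

```python
from typing import List


def classify_trend(valences: List[float], window: int = 5) -> str:
    """Standalone copy of the trend rule: compare the ends of the recent window."""
    if len(valences) < 3:
        return "insufficient_data"

    recent = valences[-window:]
    avg = sum(recent) / len(recent)
    trend = recent[-1] - recent[0]  # net change across the window

    if trend > 0.2:
        return "improving"
    if trend < -0.2:
        return "deteriorating"
    if avg > 0.2:
        return "consistently_positive"
    if avg < -0.2:
        return "consistently_negative"
    return "stable"


# Hypothetical valence readings in [-1, 1], oldest first
caller_calming_down = [-0.6, -0.5, -0.3, -0.1, 0.0]
caller_flat_negative = [-0.5, -0.4, -0.5, -0.45, -0.5]

print(classify_trend(caller_calming_down))   # improving
print(classify_trend(caller_flat_negative))  # consistently_negative
```

Because the rule only compares the first and last readings in the window, a brief spike in the middle does not change the result. If that matters for your escalation logic, a slope fitted over the whole window may be a better signal.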

Multi-Language Support and Automatic Language Routing

Enterprise voice agents serving global workforces or international customer bases need robust multi-language support. GPT-5.5 Advanced Voice Mode handles 47 languages natively, but production deployments benefit from explicit language detection, routing logic, and language-specific configuration profiles.

Language Router Implementation

# core/language_router.py

from dataclasses import dataclass
from typing import Dict, Optional
import structlog

logger = structlog.get_logger(__name__)

LANGUAGE_CONFIGS = {
    "en": {
        "voice": "nova",
        "turn_detection_silence_ms": 500,
        "greeting": "Hello! How can I help you today?",
        "escalation_phrase": "Let me connect you with a specialist.",
    },
    "es": {
        "voice": "sage",
        "turn_detection_silence_ms": 600,  # Assumed starting point; tune against real call recordings
        "greeting": "¡Hola! ¿En qué puedo ayudarle hoy?",
        "escalation_phrase": "Permítame conectarle con un especialista.",
    },
    "fr": {
        "voice": "sage",
        "turn_detection_silence_ms": 550,
        "greeting": "Bonjour! Comment puis-je vous aider aujourd'hui?",
        "escalation_phrase": "Permettez-moi de vous mettre en relation avec un spécialiste.",
    },
    "de": {
        "voice": "onyx",
        "turn_detection_silence_ms": 600,
        "greeting": "Hallo! Wie kann ich Ihnen heute helfen?",
        "escalation_phrase": "Ich verbinde Sie mit einem Spezialisten.",
    },
    "ja": {
        "voice": "nova",
        "turn_detection_silence_ms": 700,
        "greeting": "こんにちは!本日はどのようなご用件でしょうか?",
        "escalation_phrase": "専門家におつなぎします。",
    },
    "zh": {
        "voice": "nova",
        "turn_detection_silence_ms": 650,
        "greeting": "您好!今天有什么可以帮助您的?",
        "escalation_phrase": "让我为您转接专业人员。",
    },
    "pt": {
        "voice": "sage",
        "turn_detection_silence_ms": 550,
        "greeting": "Olá! Como posso ajudá-lo hoje?",
        "escalation_phrase": "Deixe-me conectá-lo com um especialista.",
    },
    "ar": {
        "voice": "echo",
        "turn_detection_silence_ms": 700,
        "greeting": "مرحباً! كيف يمكنني مساعدتك اليوم؟",
        "escalation_phrase": "دعني أوصلك بأحد المتخصصين.",
    }
}


@dataclass
class LanguageDetectionResult:
    language_code: str
    confidence: float
    detected_from: str  # "transcription", "user_preference", "phone_number_prefix"


class LanguageRouter:
    """
    Routes voice sessions to appropriate language configurations.
    Uses multiple detection signals with priority ordering.
    """
    
    def __init__(self, default_language: str = "en"):
        self.default_language = default_language
        self._session_languages: Dict[str, str] = {}
    
    def get_config_for_language(self, language_code: str) -> Dict:
        """Get language-specific configuration, falling back to English."""
        lang = language_code.split("-")[0].lower()  # Handle "en-US" -> "en"
        config = LANGUAGE_CONFIGS.get(lang, LANGUAGE_CONFIGS["en"])
        return {**config, "language_code": lang}
    
    def detect_from_phone_prefix(self, phone_number: str) -> Optional[str]:
        """Infer likely language from international phone number prefix."""
        prefix_map = {
            "+1": "en", "+44": "en", "+61": "en", "+64": "en",
            "+34": "es", "+52": "es", "+54": "es", "+57": "es",
            "+33": "fr", "+32": "fr",
            # +41 (Switzerland) is multilingual; it can appear only once as a dict key
            "+49": "de", "+43": "de", "+41": "de",
            "+81": "ja",
            "+86": "zh", "+886": "zh", "+852": "zh",
            "+55": "pt", "+351": "pt",
            "+966": "ar", "+971": "ar", "+20": "ar",
        }
        for prefix, lang in sorted(prefix_map.items(), key=lambda x: -len(x[0])):
            if phone_number.startswith(prefix):
                return lang
        return None
    
    def build_multilingual_instructions(
        self,
        base_instructions: str,
        detected_language: str
    ) -> str:
        """Augment system instructions with language-specific guidance."""
        lang_config = self.get_config_for_language(detected_language)
        return f"""{base_instructions}

LANGUAGE CONFIGURATION:
- Primary language: {detected_language}
- Respond exclusively in {detected_language} unless the caller explicitly switches languages
- Use natural, conversational {detected_language} — avoid overly formal or translated-sounding speech
- If you detect the caller has switched languages, seamlessly switch to match them
- Cultural context: adapt formality level and communication style appropriately for {detected_language} speakers"""
    
    async def update_session_language(
        self,
        session_id: str,
        language_code: str,
        realtime_client
    ) -> None:
        """Dynamically update session language configuration mid-call."""
        if self._session_languages.get(session_id) == language_code:
            return  # No change needed
        
        self._session_languages[session_id] = language_code
        lang_config = self.get_config_for_language(language_code)
        
        # Update the live session
        await realtime_client._enqueue_event({
            "type": "session.update",
            "session": {
                "voice": lang_config["voice"],
                "input_audio_transcription": {
                    "model": "whisper-1",
                    # Pass the bare ISO 639-1 code ("en"), not a locale tag ("en-US")
                    "language": lang_config["language_code"]
                },
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.5,
                    "prefix_padding_ms": 300,
                    "silence_duration_ms": lang_config["turn_detection_silence_ms"]
                }
            }
        })
        
        logger.info(
            "language_router.session_updated",
            session_id=session_id,
            new_language=language_code
        )
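
The prefix lookup in detect_from_phone_prefix sorts by length so that the most specific prefix wins. The sketch below isolates that idea with a small hypothetical map in which "+1787" (a Puerto Rico area code under the shared "+1" country code) overlaps with the broader "+1" entry. The map contents and the function name detect_language are assumptions for illustration only.

```python
from typing import Dict, Optional

# Hypothetical map: "+1787" overlaps with the broader "+1" entry
PREFIX_MAP: Dict[str, str] = {
    "+1": "en",
    "+1787": "es",
    "+34": "es",
    "+49": "de",
}


def detect_language(phone_number: str, prefix_map: Dict[str, str]) -> Optional[str]:
    """Return the language for the longest matching prefix, or None."""
    for prefix in sorted(prefix_map, key=len, reverse=True):
        if phone_number.startswith(prefix):
            return prefix_map[prefix]
    return None


print(detect_language("+17875550123", PREFIX_MAP))   # es
print(detect_language("+12125550123", PREFIX_MAP))   # en
print(detect_language("+819012345678", PREFIX_MAP))  # None
```

A phone prefix is only a weak prior. It is reasonable to use it for the greeting and then let a higher-confidence signal, such as the detected language of the first caller utterance, override it through update_session_language.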

Building the Complete Agent Layer

With the core infrastructure in place, we can build the actual agent logic. The base agent class wires together the RealtimeClient, EmotionHandler, LanguageRouter, and tool integrations into a cohesive conversational system.

Base Agent Class

# agents/base_agent.py

import asyncio
import time
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any
import structlog

from core.realtime_client import RealtimeClient, SessionConfig
from core.audio_processor import AudioProcessor
from core.emotion_handler import EmotionHandler
from core.language_router import LanguageRouter
from core.session_manager import SessionManager

logger = structlog.get_logger(__name__)


class BaseVoiceAgent(ABC):
    """
    Abstract base class for voice agents.
    Subclass this to create domain-specific agents.
    """
    
    def __init__(
        self,
        api_key: str,
        session_id: str,
        session_manager: SessionManager,
        config: Dict[str, Any]
    ):
        self.api_key = api_key
        self.session_id = session_id
        self.session_manager = session_manager
        self.config = config
        
        self.audio_processor = AudioProcessor()
        self.emotion_handler = EmotionHandler(
            escalation_callback=self._handle_escalation_request
        )
        self.language_router = LanguageRouter(
            default_language=config.get("default_language", "en")
        )
        
        self.realtime_client: Optional[RealtimeClient] = None
        self._call_start_time = time.time()
        self._tools = self._build_tools()
        
    @abstractmethod
    def _get_system_instructions(self) -> str:
        """Return the system prompt for this agent type."""
        pass
    
    @abstractmethod
    def _build_tools(self) -> List[Dict]:
        """Return the tool definitions for this agent."""
        pass
    
    @abstractmethod
    async def _execute_tool(self, tool_name: str, arguments: Dict) -> str:
        """Execute a tool call and return the result as a string."""
        pass
    
    async def initialize(self, caller_phone: Optional[str] = None) -> None:
        """Initialize the agent, detect language, and connect to Realtime API."""
        
        # Detect initial language from phone number if available
        initial_language = self.config.get("default_language", "en")
        if caller_phone:
            detected = self.language_router.detect_from_phone_prefix(caller_phone)
            if detected:
                initial_language = detected
        
        instructions = self.language_router.build_multilingual_instructions(
            self._get_system_instructions(),
            initial_language
        )
        
        session_config = SessionConfig(
            instructions=instructions,
            voice=self.language_router.get_config_for_language(initial_language)["voice"],
            tools=self._tools,
            emotion_detection=True,
            emotion_sensitivity="high",
            turn_detection={
                "type": "server_vad",
                "threshold": 0.5,
                "prefix_padding_ms": 300,
                "silence_duration_ms": self.language_router.get_config_for_language(
                    initial_language
                )["turn_detection_silence_ms"]
            }
        )
        
        self.realtime_client = RealtimeClient(
            api_key=self.api_key,
            session_config=session_config,
            event_handlers={
                "emotion_detected": self._on_emotion_detected,
                "function_call": self._on_function_call,
                "transcript_done": self._on_transcript_done,
                "error": self._on_error
            }
        )
        
        await self.realtime_client.connect()
        
        # Store session metadata
        await self.session_manager.create_session(self.session_id, {
            "agent_type": self.__class__.__name__,
            "language": initial_language,
            "caller_phone": caller_phone,
            "start_time": self._call_start_time
        })
        
        logger.info(
            "voice_agent.initialized",
            session_id=self.session_id,
            language=initial_language,
            agent_type=self.__class__.__name__
        )
    
    async def process_audio_input(self, audio_bytes: bytes, source_format: str = "pcm16") -> None:
        """Process incoming audio from telephony/WebRTC."""
        
        # Convert format if necessary
        if source_format == "mulaw":
            audio_bytes = AudioProcessor.pcm_mulaw_to_pcm16(audio_bytes)
        
        # Resample if needed (Twilio sends 8kHz, we need 24kHz)
        source_rate = 8000 if source_format == "mulaw" else 24000
        if source_rate != 24000:
            audio_bytes = AudioProcessor.resample_pcm16(audio_bytes, source_rate)
        
        # Stream in chunks for optimal latency
        for chunk in AudioProcessor.chunk_audio(audio_bytes):
            await self.realtime_client.send_audio_chunk(chunk)
    
    async def get_audio_output(self):
        """Async generator yielding audio output chunks."""
        async for chunk in self.realtime_client.stream_audio_output():
            yield chunk
    
    async def _on_emotion_detected(self, emotion_data: Dict) -> None:
        """Handle emotion detection events."""
        adjustments = await self.emotion_handler.process_emotion_event(emotion_data)
        
        if adjustments and adjustments.get("instructions_addendum"):
            # Inject adaptive instructions into the conversation
            await self.realtime_client.inject_text(
                f"[SYSTEM CONTEXT UPDATE: {adjustments['instructions_addendum']}]",
                role="system"
            )
    
    async def _on_function_call(self, event: Dict) -> None:
        """Handle tool/function call requests from the model."""
        tool_name = event.get("name", "")
        call_id = event.get("call_id", "")
        
        try:
            arguments = event.get("arguments", {})
            if isinstance(arguments, str):
                import json
                arguments = json.loads(arguments)
            
            logger.info(
                "voice_agent.tool_call",
                tool=tool_name,
                session_id=self.session_id
            )
            
            result = await self._execute_tool(tool_name, arguments)
            
            # Return tool result to the model
            await self.realtime_client._enqueue_event({
                "type": "conversation.item.create",
                "item": {
                    "type": "function_call_output",
                    "call_id": call_id,
                    "output": result
                }
            })
            await self.realtime_client.create_response()
            
        except Exception as e:
            logger.error(
                "voice_agent.tool_call_error",
                tool=tool_name,
                error=str(e)
            )
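            await self.realtime_client._enqueue_event({
                "type": "conversation.item.create",
                "item": {
                    "type": "function_call_output",
                    "call_id": call_id,
                    "output": f"Error executing {tool_name}: {str(e)}"
                }
            })
            # Request a response here too, so the model can tell the caller
            # that the tool failed instead of going silent.
            await self.realtime_client.create_response()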
    
    async def _on_transcript_done(self, transcript: str) -> None:
        """Store completed transcript segment."""
        await self.session_manager.append_transcript(
            self.session_id,
            {"role": "assistant", "content": transcript, "timestamp": time.time()}
        )
    
    async def _on_error(self, error: Dict) -> None:
        """Handle API errors."""
        logger.error(
            "voice_agent.api_error",
            code=error.get("code"),
            message=error.get("message"),
            session_id=self.session_id
        )
    
    async def _handle_escalation_request(self, escalation_data: Dict) -> None:
        """Handle requests for human escalation."""
        logger.warning(
            "voice_agent.escalation_requested",
            reason=escalation_data.get("reason"),
            session_id=self.session_id
        )
        # Subclasses implement domain-specific escalation routing
        await self._escalate_to_human(escalation_data)
    
    @abstractmethod
    async def _escalate_to_human(self, escalation_data: Dict) -> None:
        """Implement human escalation for this agent type."""
        pass
    
    async def end_session(self) -> Dict:
        """Gracefully end the session and return summary metrics."""
        duration = time.time() - self._call_start_time
        sentiment_trend = self.emotion_handler.get_sentiment_trend()
        
        summary = {
            "session_id": self.session_id,
            "duration_seconds": duration,
            "sentiment_trend": sentiment_trend,
            "escalated": self.emotion_handler._escalation_triggered,
            "emotion_history_count": len(self.emotion_handler.emotion_history)
        }
        
        await self.session_manager.close_session(self.session_id, summary)
        await self.realtime_client.close()
        
        logger.info("voice_agent.session_ended", **summary)
        return summary
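
The base class leaves _execute_tool abstract. One way a subclass might implement it is a small registry that maps tool names to async handlers and serializes the result to a JSON string, since the function_call_output item carries a string. The sketch below shows that pattern in isolation; lookup_order, TOOL_HANDLERS and the returned fields are hypothetical and stand in for whatever your domain requires.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict

ToolHandler = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]


async def lookup_order(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical tool: a real agent would query an order system here."""
    return {"order_id": arguments["order_id"], "status": "shipped"}


# Registry mapping tool names (as declared in _build_tools) to handlers
TOOL_HANDLERS: Dict[str, ToolHandler] = {"lookup_order": lookup_order}


async def execute_tool(tool_name: str, arguments: Dict[str, Any]) -> str:
    """Dispatch a tool call and return a JSON string for function_call_output."""
    handler = TOOL_HANDLERS.get(tool_name)
    if handler is None:
        return json.dumps({"error": f"unknown tool: {tool_name}"})
    result = await handler(arguments)
    return json.dumps(result)


print(asyncio.run(execute_tool("lookup_order", {"order_id": "A-1001"})))
# {"order_id": "A-1001", "status": "shipped"}
print(asyncio.run(execute_tool("cancel_flight", {})))
# {"error": "unknown tool: cancel_flight"}
```

Returning a structured error for an unknown tool, rather than raising, gives the model something it can explain to the caller.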

Telephony Integration with Twilio Media Streams

For enterprise deployments handling inbound calls, Twilio Media Streams is the most common integration path. It provides a WebSocket bridge between the PSTN (public telephone network) and your application, streaming bidirectional μ-law audio at 8kHz. Your voice agent acts as the WebSocket server that Twilio connects to.

Understanding the nuances of telephony integration is critical for production deployments.
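
One such nuance is the audio format itself. Each inbound media message carries base64-encoded μ-law bytes at 8 kHz, while the agent streams 16-bit PCM at 24 kHz. The sketch below shows what that conversion involves using only the standard library: a G.711 μ-law decoder plus a naive 3x linear-interpolation upsampler. It is not the guide's AudioProcessor, the function names are hypothetical, and linear interpolation is a simplification; a production resampler would apply proper low-pass filtering.

```python
import base64
import struct
from typing import List


def ulaw_byte_to_pcm16(u: int) -> int:
    """Decode one G.711 mu-law byte to a signed 16-bit sample."""
    u = ~u & 0xFF                      # mu-law bytes are stored bit-inverted
    sign = u & 0x80
    exponent = (u >> 4) & 0x07
    mantissa = u & 0x0F
    magnitude = (((mantissa << 3) + 0x84) << exponent) - 0x84
    return -magnitude if sign else magnitude


def upsample_linear(samples: List[int], factor: int = 3) -> List[int]:
    """Upsample by an integer factor with linear interpolation (8 kHz -> 24 kHz)."""
    if not samples:
        return []
    out: List[int] = []
    # Pair each sample with its successor; the last sample is paired with itself
    for current, nxt in zip(samples, samples[1:] + samples[-1:]):
        for step in range(factor):
            out.append(current + (nxt - current) * step // factor)
    return out


def twilio_payload_to_pcm16_24k(payload_b64: str) -> bytes:
    """Base64 mu-law at 8 kHz in, little-endian PCM16 at 24 kHz out."""
    ulaw = base64.b64decode(payload_b64)
    pcm_8k = [ulaw_byte_to_pcm16(b) for b in ulaw]
    pcm_24k = upsample_linear(pcm_8k)
    return struct.pack(f"<{len(pcm_24k)}h", *pcm_24k)


# 160 bytes of 0xFF is 20 ms of mu-law silence at 8 kHz
silence = base64.b64encode(b"\xff" * 160).decode("ascii")
pcm = twilio_payload_to_pcm16_24k(silence)
print(len(pcm))  # 960 bytes = 480 samples = 20 ms at 24 kHz
```

The return path needs the inverse: downsample the model's 24 kHz output to 8 kHz and μ-law encode it before sending it back over the stream.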

Twilio Media Streams Bridge (Node.js)

// telephony/twilio_bridge.js

const WebSocket = require('ws');
const express = require('express');
const { createClient } = require('redis');
// Requires node-fetch v2 (v3 is ESM-only); on Node 18+ the built-in global fetch also works
const fetch = require('node-fetch');

const app = express();
app.use(express.urlencoded({ extended: false }));

const PORT = process.env.PORT || 3000;
const AGENT_SERVICE_URL = process.env.AGENT_SERVICE_URL || 'http://localhost:8000';

// Redis for session coordination
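const redis = createClient({ url: process.env.REDIS_URL });
redis.on('error', (err) => console.error('Redis client error:', err));
redis.connect().catch((err) => console.error('Redis connect failed:', err));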

// TwiML endpoint - Twilio calls this when a call arrives
app.post('/incoming-call', (req, res) => {
  const callSid = req.body.CallSid;
  const callerPhone = req.body.From;
  
  console.log(`Incoming call: ${callSid} from ${callerPhone}`);
  
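  // Respond with TwiML to initiate Media Streams.
  // <Connect><Stream> opens a bidirectional stream to the /media-stream
  // endpoint handled below. callerPhone is passed as a custom parameter and
  // read back in the 'start' handler. Assumes the bridge is reachable over
  // wss at the Host header of this request.
  const twiml = `<?xml version="1.0" encoding="UTF-8"?>
<Response>
  <Connect>
    <Stream url="wss://${req.headers.host}/media-stream">
      <Parameter name="callerPhone" value="${callerPhone}" />
    </Stream>
  </Connect>
</Response>`;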
  
  res.type('text/xml');
  res.send(twiml);
});

// WebSocket server for Twilio Media Streams
const wss = new WebSocket.Server({ noServer: true });

wss.on('connection', async (twilioWs, req) => {
  let callSid = null;
  let callerPhone = null;
  let agentWs = null;
  let streamSid = null;
  
  console.log('Twilio Media Stream connected');
  
  twilioWs.on('message', async (data) => {
    const message = JSON.parse(data);
    
    switch (message.event) {
      case 'start':
        // Extract call metadata
        callSid = message.start.callSid;
        streamSid = message.start.streamSid;
        callerPhone = message.start.customParameters?.callerPhone;
        
        console.log(`Stream started: ${streamSid} for call ${callSid}`);
        
        // Initialize agent session via HTTP
        try {
          const response = await fetch(`${AGENT_SERVICE_URL}/sessions/create`, {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({
              session_id: callSid,
              caller_phone: callerPhone,
              agent_type: 'customer_service'
            })
          });
          
          if (!response.ok) {
            throw new Error(`Agent init failed: ${response.status}`);
          }
          
          // Store session mapping
          await redis.set(`stream:${streamSid}`, callSid, { EX: 28800 });
          
        } catch (err) {
          console.error('Failed to initialize agent session:', err);
          twilioWs.close();
          return;
        }
        break;
        
      case 'media':
        // Forward audio to agent service
        if (callSid) {
          try {
            await fetch(`${AGENT_SERVICE_URL}/sessions/${callSid}/audio`, {
              method: 'POST',
              headers: { 'Content-Type': 'application/json' },
              body: JSON.stringify({
                audio_payload: message.media.payload,  // Base64 μ-law
                timestamp: message.media.timestamp,
                track: message.media.track  // 'inbound' or 'outbound'
              })
            });
          } catch (err) {
            // Non-fatal: log and continue
            console.warn('Audio forward error:', err.message);
          }
        }
        break;
        
      case 'stop':
        console.log(`Stream stopped: ${streamSid}`);
        if (callSid) {
          await fetch(`${AGENT_SERVICE_URL}/sessions/${callSid}/end`, {
            method: 'POST'
          }).catch(console.error);
          await redis.del(`stream:${streamSid}`);
        }
        break;
    }
  });
  
  // Send audio back to Twilio
  async function sendAudioToTwilio(audioBase64) {
    if (twilioWs.readyState === WebSocket.OPEN && streamSid) {
      twilioWs.send(JSON.stringify({
        event: 'media',
        streamSid: streamSid,
        media: {
          payload: audioBase64  // Base64 μ-law audio
        }
      }));
    }
  }
  
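  // Poll for agent audio output. callSid is still null when the socket
  // connects (it is set by the 'start' event), so the check lives inside
  // the timer callback rather than around it.
  let pollInFlight = false;
  const pollInterval = setInterval(async () => {
    // Skip this tick if the call has not started or a poll is still running
    if (!callSid || pollInFlight) return;
    pollInFlight = true;
    try {
      const response = await fetch(
        `${AGENT_SERVICE_URL}/sessions/${callSid}/audio-output`
      );
      if (response.ok) {
        const data = await response.json();
        if (data.audio_chunks && data.audio_chunks.length > 0) {
          for (const chunk of data.audio_chunks) {
            await sendAudioToTwilio(chunk);
          }
        }
      }
    } catch (err) {
      // Polling errors are non-fatal
    } finally {
      pollInFlight = false;
    }
  }, 20);  // Poll every 20ms

  twilioWs.on('close', () => clearInterval(pollInterval));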
  
  twilioWs.on('error', (err) => {
    console.error('Twilio WebSocket error:', err);
  });
  
  twilioWs.on('close', () => {
    console.log(`Twilio WebSocket closed for call ${callSid}`);
  });
});

// HTTP server with WebSocket upgrade
const server = app.listen(PORT, () => {
  console.log(`Twilio bridge listening on port ${PORT}`);
});

server.on('upgrade', (request, socket, head) => {
  if (request.url === '/media-stream') {
    wss.handleUpgrade(request, socket, head, (ws) => {
      wss.emit('connection', ws, request);
    });
  } else {
    socket.destroy();
  }
});
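
On the agent-service side, the bridge's poll of /sessions/{id}/audio-output implies a per-session buffer that hands over everything queued since the last poll in an audio_chunks list. The sketch below is one minimal way to model that buffer. The class name AudioOutputBuffer, the max_chunks cap and the drop-oldest policy are assumptions, and the HTTP layer is left out.

```python
import base64
from collections import deque
from typing import Deque, Dict, List


class AudioOutputBuffer:
    """Per-session FIFO of outbound audio, drained by the bridge's poll request."""

    def __init__(self, max_chunks: int = 500) -> None:
        # Bounded so a stalled bridge cannot grow memory without limit;
        # once the cap is reached the oldest audio is dropped first.
        self._chunks: Deque[bytes] = deque(maxlen=max_chunks)

    def push(self, chunk: bytes) -> None:
        """Queue one chunk of already-encoded outbound audio."""
        self._chunks.append(chunk)

    def drain(self) -> Dict[str, List[str]]:
        """Return and clear everything queued, shaped like the poll response."""
        drained: List[str] = []
        while self._chunks:
            raw = self._chunks.popleft()
            drained.append(base64.b64encode(raw).decode("ascii"))
        return {"audio_chunks": drained}


buffer = AudioOutputBuffer()
buffer.push(b"\xff\xff")
buffer.push(b"\x7f\x7f")

first = buffer.drain()
second = buffer.drain()
print(len(first["audio_chunks"]))   # 2
print(len(second["audio_chunks"]))  # 0
```

Polling keeps the bridge simple, but each poll is an HTTP round trip. If latency becomes a problem, a persistent WebSocket between the bridge and the agent service avoids that overhead.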

Production Deployment Architecture

Deploying voice agents at enterprise scale requires careful attention to connection management, fault tolerance, observability, and cost optimization. A single GPT-5.5 Advanced Voice session consumes approximately 0.06 audio tokens per millisecond of audio (input + output combined), which works out to roughly 60 tokens per second or 3,600 tokens per minute of conversation and adds up to meaningful cost at high call volumes.
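
The token rate above turns into a capacity estimate with simple arithmetic. The sketch below assumes only the 0.06 tokens/ms figure quoted here. The call volume and call length are made-up inputs, and the price is deliberately a placeholder of 1.0 per million tokens, so the result reads as "millions of tokens per month" until you substitute your actual rate.

```python
TOKENS_PER_MS = 0.06  # figure quoted above (input + output combined)


def audio_tokens(call_seconds: float, tokens_per_ms: float = TOKENS_PER_MS) -> float:
    """Audio tokens consumed by one call of the given length."""
    return call_seconds * 1000 * tokens_per_ms


def monthly_cost(
    calls_per_day: int,
    avg_call_seconds: float,
    price_per_million_tokens: float,
    days: int = 30,
) -> float:
    """Estimated monthly spend; the price is a placeholder you must supply."""
    total_tokens = audio_tokens(avg_call_seconds) * calls_per_day * days
    return total_tokens / 1_000_000 * price_per_million_tokens


print(round(audio_tokens(60)))      # 3600 tokens for a one-minute call
print(round(audio_tokens(5 * 60)))  # 18000 tokens for a five-minute call

# Hypothetical load: 10,000 five-minute calls per day, placeholder price of 1.0
print(round(monthly_cost(10_000, 5 * 60, price_per_million_tokens=1.0)))  # 5400
```

At that hypothetical load the service moves about 5.4 billion audio tokens a month, so small changes in average call length or silence handling have a visible effect on the bill.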

Kubernetes Deployment Configuration

# deployment/k8s/voice-agent-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: voice-agent-service
  namespace: ai-services
  labels:
    app: voice-agent
    version: "2.0"
spec:
  replicas: 10
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 2
      maxUnavailable: 0  # Zero-downtime: critical for active calls
  selector:
    matchLabels:
      app: voice-agent
  template:
    metadata:
      labels:
        app: voice-agent
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9090"
    spec:
      # Ensure pods don't share nodes to avoid noisy neighbor issues
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchExpressions:
                    - key: app
                      operator: In
                      values:
                        - voice-agent
                topologyKey: kubernetes.io/hostname

      containers:
        - name: voice-agent
          image: your-registry/voice-agent:2.0.0
          ports:
            - containerPort: 8000
              name: http
            - container
