How to Build Real-Time Voice Agents with ChatGPT’s Advanced Voice Mode and GPT-5.5: Complete Implementation Guide

Voice AI has crossed a critical threshold. As of mid-2026, GPT-5.5's Advanced Voice Mode delivers a median end-to-end latency of about 180ms (340ms at p99), native emotion detection across 47 languages, and WebSocket-based streaming that makes production-grade voice agents genuinely viable for enterprise deployments. This isn't the clunky speech-to-text, LLM, text-to-speech pipeline of 2023. It's a unified, multimodal audio model that understands prosody, handles interruptions gracefully, and maintains conversational context across multi-hour sessions.
This guide walks you through the complete implementation: from WebSocket connection setup and streaming audio architecture to emotion detection hooks, multi-language routing, and hardened production deployment patterns. Whether you're building a customer service agent handling 10,000 concurrent calls or an internal enterprise assistant for a distributed workforce, the patterns here address the problems that surface in production: reconnection, send-queue backpressure, barge-in, and human escalation.
By the end, you’ll have a working voice agent scaffold that you can extend for your specific use case, with full code samples in Python and JavaScript, architectural diagrams, and deployment configurations for AWS and GCP.
Understanding the GPT-5.5 Voice Architecture
Before writing a single line of code, it’s worth understanding what makes GPT-5.5’s voice capabilities fundamentally different from prior generations. The model processes raw audio tokens natively — it doesn’t transcribe speech to text and then reason over text. This end-to-end audio architecture is what enables the latency and emotional fidelity that prior pipeline approaches couldn’t achieve.
The Audio Token Model
GPT-5.5 uses a discrete audio codec (similar in spirit to EnCodec but trained at much larger scale) that converts continuous audio waveforms into sequences of discrete tokens. These tokens are interleaved with text tokens in the model’s context window, allowing the model to reason jointly over audio content, acoustic features, and language semantics simultaneously.
What this means practically: the model can hear that a caller is frustrated (elevated pitch, faster speech rate, shorter utterances) and modulate its response accordingly — not because you’ve written explicit emotion-handling logic, but because the model’s training incorporated these acoustic-semantic relationships natively.
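To make "discrete audio tokens" concrete, the toy sketch below shows the general technique of vector quantization. It cuts a waveform into fixed-length frames and replaces each frame with the index of its nearest codebook vector. This is not GPT-5.5's codec, whose internals are not described here. The frame size, codebook size, random codebook, and function names are all illustrative assumptions. Neural codecs such as EnCodec instead learn an encoder and quantize its latent vectors with several residual codebooks.

```python
import numpy as np

FRAME_SIZE = 240      # hypothetical: 10 ms frames at 24 kHz
CODEBOOK_SIZE = 1024  # hypothetical size of the audio-token vocabulary


def make_codebook(seed: int = 0) -> np.ndarray:
    """Random codebook standing in for a learned one (illustration only)."""
    rng = np.random.default_rng(seed)
    return (0.1 * rng.standard_normal((CODEBOOK_SIZE, FRAME_SIZE))).astype(np.float32)


def waveform_to_tokens(waveform: np.ndarray, codebook: np.ndarray) -> np.ndarray:
    """Quantize each complete frame to the index of its nearest codebook vector."""
    n_frames = len(waveform) // FRAME_SIZE
    frames = waveform[: n_frames * FRAME_SIZE].reshape(n_frames, FRAME_SIZE)
    # ||f - c||^2 = ||f||^2 - 2 f.c + ||c||^2, computed for all frame/codebook pairs at once
    dists = (
        (frames ** 2).sum(axis=1, keepdims=True)
        - 2.0 * frames @ codebook.T
        + (codebook ** 2).sum(axis=1)
    )
    return dists.argmin(axis=1)


def tokens_to_waveform(tokens: np.ndarray, codebook: np.ndarray) -> np.ndarray:
    """Decode by concatenating codebook vectors; the reconstruction is lossy."""
    return codebook[tokens].reshape(-1)


t = np.arange(24000) / 24000.0
tone = (0.3 * np.sin(2 * np.pi * 220 * t)).astype(np.float32)  # one second, 220 Hz
codebook = make_codebook()
tokens = waveform_to_tokens(tone, codebook)
print(tokens.shape)  # (100,): one token per 10 ms frame
```

At 10 ms per frame, one second of audio becomes 100 token IDs. That is the kind of sequence a model can interleave with text tokens in its context window.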
Key Technical Specifications
| Capability | GPT-4o Voice (2024) | GPT-5.5 Advanced Voice (2026) |
|---|---|---|
| End-to-end latency (p50) | 320ms | 180ms |
| End-to-end latency (p99) | 850ms | 340ms |
| Supported languages | 29 | 47 |
| Emotion detection categories | 5 (basic) | 28 (nuanced) |
| Concurrent audio streams per connection | 1 | 4 |
| Max session duration | 30 minutes | 8 hours |
| Interruption handling | Manual (client-side) | Native (model-aware) |
| Audio input format | PCM16, MP3 | PCM16, PCM24, Opus, AAC, MP3 |
| Context window (audio tokens) | 128K | 512K |
The Realtime API Endpoint Structure
The GPT-5.5 Realtime API is accessed via a persistent WebSocket connection. Unlike the standard completions API, there's no request-response cycle. You maintain an open bidirectional channel and exchange structured JSON events, with audio carried as base64-encoded payloads inside those events (for example input_audio_buffer.append and response.audio.delta). This architecture is what enables streaming responses and true conversational turn-taking.
The WebSocket endpoint for GPT-5.5 Advanced Voice is:
wss://api.openai.com/v1/realtime?model=gpt-5.5-advanced-voice
Authentication is handled via the Authorization header on the initial WebSocket upgrade request, or via an ephemeral token for browser-based clients (more on this in the security section).
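A minimal connection check is sketched below. It assumes the websockets package, version 14 or later, whose asyncio client takes additional_headers. It reuses the endpoint and the OpenAI-Beta: realtime=v2 header from the RealtimeClient later in this guide. The helper names are hypothetical, and ephemeral tokens for browsers are not covered here.

```python
import json
import os

REALTIME_URL = "wss://api.openai.com/v1/realtime"


def build_realtime_url(model: str) -> str:
    """Endpoint URL with the model passed as a query parameter."""
    return f"{REALTIME_URL}?model={model}"


def build_auth_headers(api_key: str) -> dict:
    """Headers for the WebSocket upgrade request. Server-side only: never ship the key to a browser."""
    return {"Authorization": f"Bearer {api_key}", "OpenAI-Beta": "realtime=v2"}


async def peek_session(model: str = "gpt-5.5-advanced-voice", max_events: int = 3) -> None:
    """Open a connection, send one session.update, and print the first few server event types."""
    # Imported here so the helpers above work even without the websockets package installed
    from websockets.asyncio.client import connect  # websockets >= 14

    headers = build_auth_headers(os.environ["OPENAI_API_KEY"])
    async with connect(build_realtime_url(model), additional_headers=headers) as ws:
        await ws.send(json.dumps({"type": "session.update", "session": {"modalities": ["text", "audio"]}}))
        for _ in range(max_events):
            event = json.loads(await ws.recv())
            print(event.get("type"))
```

To run it, export OPENAI_API_KEY and call asyncio.run(peek_session()).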
Event Types Overview
The Realtime API uses a structured event system. Client-to-server events include audio input chunks, session configuration updates, and response control signals. Server-to-client events include audio output chunks, transcription results, emotion metadata, function call requests, and session lifecycle events.
- Client → Server: session.update, input_audio_buffer.append, input_audio_buffer.commit, response.create, response.cancel, conversation.item.create
- Server → Client: session.created, session.updated, conversation.item.created, response.audio.delta, response.audio.done, response.audio_transcript.delta, response.emotion.detected, response.function_call_arguments.delta, error
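Every message is a JSON object with a type field. A small catalog of the names above therefore lets you reject typos locally instead of waiting for a server error event. The sketch below is not an official SDK; the helper names and the strict validation policy are assumptions.

```python
import base64
import json
from typing import Any

# Event names from the overview above. This is not exhaustive: the client later in this
# guide also handles events such as input_audio_buffer.speech_started.
CLIENT_EVENTS = frozenset({
    "session.update", "input_audio_buffer.append", "input_audio_buffer.commit",
    "response.create", "response.cancel", "conversation.item.create",
})
SERVER_EVENTS = frozenset({
    "session.created", "session.updated", "conversation.item.created",
    "response.audio.delta", "response.audio.done", "response.audio_transcript.delta",
    "response.emotion.detected", "response.function_call_arguments.delta", "error",
})


def make_client_event(event_type: str, **payload: Any) -> str:
    """Serialize a client event, rejecting names that are not client-to-server events."""
    if event_type not in CLIENT_EVENTS:
        raise ValueError(f"not a client event: {event_type!r}")
    return json.dumps({"type": event_type, **payload})


def audio_append_event(pcm16: bytes) -> str:
    """Audio travels as base64 text inside the JSON event, not as a binary frame."""
    encoded = base64.b64encode(pcm16).decode("ascii")
    return make_client_event("input_audio_buffer.append", audio=encoded)


def direction(raw_message: str) -> str:
    """Classify an incoming JSON message as 'client', 'server', or 'unknown'."""
    event_type = json.loads(raw_message).get("type", "")
    if event_type in CLIENT_EVENTS:
        return "client"
    if event_type in SERVER_EVENTS:
        return "server"
    return "unknown"
```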
Setting Up Your Development Environment
Let’s get the foundational scaffolding in place before building out the full agent. We’ll use Python for the backend WebSocket handler and a lightweight Node.js/TypeScript layer for the telephony integration. This split architecture is common in production deployments because it lets you use Python’s rich ML ecosystem for agent logic while leveraging Node’s event loop efficiency for high-concurrency WebSocket handling.
Prerequisites
- Python 3.11+ with asyncio support
- Node.js 20+ (for the telephony bridge, optional)
- OpenAI API key with Realtime API access enabled
- A Twilio account (for telephony integration) or direct WebRTC setup
- Redis 7+ (for session state management)
- FFmpeg (for audio format conversion)
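A short preflight script can confirm the local pieces before you start. This sketch checks three things: the Python version, whether ffmpeg is on PATH, and whether anything is listening on the default Redis port. The function names are hypothetical, and API access and Twilio credentials need their own checks.

```python
import shutil
import socket
import sys
from typing import Dict


def check_tcp(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if something accepts a TCP connection on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def preflight(redis_host: str = "localhost", redis_port: int = 6379) -> Dict[str, bool]:
    """Check local prerequisites; each value is True when the requirement is met."""
    return {
        "python>=3.11": sys.version_info >= (3, 11),
        "ffmpeg on PATH": shutil.which("ffmpeg") is not None,
        "redis port open": check_tcp(redis_host, redis_port),
    }


for name, ok in preflight().items():
    print(f"{'OK' if ok else 'MISSING':8} {name}")
```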
Python Dependencies
# requirements.txt
openai>=1.35.0
websockets>=14.0
asyncio-redis>=0.16.0
python-dotenv>=1.0.0
numpy>=1.26.0
soundfile>=0.12.1
pydantic>=2.7.0
fastapi>=0.111.0
uvicorn[standard]>=0.30.0
aiohttp>=3.9.5
structlog>=24.2.0
prometheus-client>=0.20.0
# Install
pip install -r requirements.txt
Environment Configuration
# .env
OPENAI_API_KEY=sk-...
OPENAI_REALTIME_MODEL=gpt-5.5-advanced-voice
REDIS_URL=redis://localhost:6379/0
TWILIO_ACCOUNT_SID=AC...
TWILIO_AUTH_TOKEN=...
TWILIO_PHONE_NUMBER=+1...
MAX_CONCURRENT_SESSIONS=500
SESSION_TIMEOUT_SECONDS=28800
AUDIO_SAMPLE_RATE=24000
AUDIO_CHANNELS=1
LOG_LEVEL=INFO
ENVIRONMENT=production
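The sketch below loads these variables into typed settings using only the standard library. The Settings class and load_settings helper are hypothetical. The defaults mirror the .env values above, and the Twilio variables are left out for brevity.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    openai_api_key: str
    realtime_model: str = "gpt-5.5-advanced-voice"
    redis_url: str = "redis://localhost:6379/0"
    max_concurrent_sessions: int = 500
    session_timeout_seconds: int = 28800  # 8 hours, the max session duration in the spec table
    audio_sample_rate: int = 24000
    audio_channels: int = 1
    log_level: str = "INFO"
    environment: str = "production"


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Read settings from env (os.environ by default), failing fast on a missing API key."""
    env = os.environ if env is None else env
    api_key = env.get("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY is not set")
    return Settings(
        openai_api_key=api_key,
        realtime_model=env.get("OPENAI_REALTIME_MODEL", Settings.realtime_model),
        redis_url=env.get("REDIS_URL", Settings.redis_url),
        max_concurrent_sessions=int(env.get("MAX_CONCURRENT_SESSIONS", Settings.max_concurrent_sessions)),
        session_timeout_seconds=int(env.get("SESSION_TIMEOUT_SECONDS", Settings.session_timeout_seconds)),
        audio_sample_rate=int(env.get("AUDIO_SAMPLE_RATE", Settings.audio_sample_rate)),
        audio_channels=int(env.get("AUDIO_CHANNELS", Settings.audio_channels)),
        log_level=env.get("LOG_LEVEL", Settings.log_level),
        environment=env.get("ENVIRONMENT", Settings.environment),
    )
```

In the application you would typically call load_dotenv() from python-dotenv first, so the .env file populates os.environ, and then call load_settings().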
Project Structure
voice-agent/
├── core/
│ ├── __init__.py
│ ├── realtime_client.py # WebSocket connection management
│ ├── audio_processor.py # Audio encoding/decoding
│ ├── session_manager.py # Session state with Redis
│ ├── emotion_handler.py # Emotion detection processing
│ └── language_router.py # Multi-language routing
├── agents/
│ ├── __init__.py
│ ├── base_agent.py # Abstract agent class
│ ├── customer_service.py # Customer service agent impl
│ └── internal_assistant.py # Internal enterprise assistant
├── telephony/
│ ├── twilio_bridge.js # Twilio Media Streams bridge
│ └── webrtc_handler.js # Direct WebRTC handling
├── tools/
│ ├── __init__.py
│ ├── crm_integration.py # CRM tool definitions
│ ├── knowledge_base.py # KB lookup tools
│ └── escalation.py # Human escalation tools
├── api/
│ ├── __init__.py
│ └── routes.py # FastAPI routes
├── config/
│ ├── agent_configs/
│ │ ├── customer_service.yaml
│ │ └── internal_assistant.yaml
│ └── prompts/
│ └── system_prompts.py
├── deployment/
│ ├── docker-compose.yml
│ ├── k8s/
│ └── terraform/
├── tests/
└── main.py
Building the Core WebSocket Client
The RealtimeClient class is the heart of your voice agent. It manages the WebSocket lifecycle, handles reconnection logic, processes the event stream, and exposes clean async interfaces for the agent layer above it. Getting this right is critical — a flaky WebSocket layer will cause dropped calls and session corruption in production.
RealtimeClient Implementation
# core/realtime_client.py
import asyncio
import base64
import json
from dataclasses import dataclass, field
from typing import Any, AsyncGenerator, Callable, Dict, Optional, Set
import structlog
from websockets.asyncio.client import ClientConnection, connect  # websockets >= 14
from websockets.exceptions import ConnectionClosed, WebSocketException
logger = structlog.get_logger(__name__)
@dataclass
class SessionConfig:
    model: str = "gpt-5.5-advanced-voice"
    modalities: list = field(default_factory=lambda: ["text", "audio"])
    instructions: str = ""
    voice: str = "nova"  # Options: alloy, echo, fable, onyx, nova, shimmer, sage
    input_audio_format: str = "pcm16"
    output_audio_format: str = "pcm16"
    input_audio_transcription: dict = field(default_factory=lambda: {
        "model": "whisper-1",
        "language": None  # None = auto-detect
    })
    turn_detection: dict = field(default_factory=lambda: {
        "type": "server_vad",
        "threshold": 0.5,
        "prefix_padding_ms": 300,
        "silence_duration_ms": 500
    })
    tools: list = field(default_factory=list)
    tool_choice: str = "auto"
    temperature: float = 0.8
    max_response_output_tokens: int = 4096
    emotion_detection: bool = True  # GPT-5.5 specific
    emotion_sensitivity: str = "high"  # low, medium, high
class RealtimeClient:
    """
    Manages a persistent WebSocket connection to the OpenAI Realtime API.
    Handles reconnection, event routing, and audio streaming.
    """
    REALTIME_URL = "wss://api.openai.com/v1/realtime"
    MAX_RECONNECT_ATTEMPTS = 5
    RECONNECT_BACKOFF_BASE = 1.5
    def __init__(
        self,
        api_key: str,
        session_config: SessionConfig,
        event_handlers: Optional[Dict[str, Callable]] = None
    ):
        self.api_key = api_key
        self.session_config = session_config
        self.event_handlers = event_handlers or {}
        self.ws: Optional[ClientConnection] = None
        self.session_id: Optional[str] = None
        self.connected = False
        self._closing = False  # Set by close() so a deliberate shutdown never triggers a reconnect
        self._reconnecting = False  # Stops the send and receive loops from both reconnecting
        self._reconnect_count = 0
        self._send_queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
        self._audio_output_queue: asyncio.Queue = asyncio.Queue(maxsize=5000)
        self._background_tasks: Set[asyncio.Task] = set()  # Strong references keep tasks alive
        self._response_active = False
        self._last_emotion: Optional[Dict] = None
    async def connect(self) -> None:
        """Establish WebSocket connection with retry logic."""
        url = f"{self.REALTIME_URL}?model={self.session_config.model}"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "OpenAI-Beta": "realtime=v2"
        }
        while self._reconnect_count <= self.MAX_RECONNECT_ATTEMPTS:
            try:
                self.ws = await connect(
                    url,
                    additional_headers=headers,
                    ping_interval=20,
                    ping_timeout=10,
                    max_size=10 * 1024 * 1024,  # 10MB max message size
                    compression=None  # Disable compression for audio performance
                )
                self.connected = True
                self._reconnect_count = 0
                logger.info("realtime_client.connected", url=url)
                # Configure session immediately after connection
                await self._configure_session()
                # Start background tasks, keeping references so they are not garbage-collected
                for coro in (self._send_loop(), self._receive_loop()):
                    task = asyncio.create_task(coro)
                    self._background_tasks.add(task)
                    task.add_done_callback(self._background_tasks.discard)
                return
            except (WebSocketException, OSError) as e:
                self._reconnect_count += 1
                if self._reconnect_count > self.MAX_RECONNECT_ATTEMPTS:
                    raise ConnectionError(
                        f"Failed to connect after {self.MAX_RECONNECT_ATTEMPTS} retries: {e}"
                    ) from e
                backoff = self.RECONNECT_BACKOFF_BASE ** self._reconnect_count
                logger.warning(
                    "realtime_client.reconnecting",
                    attempt=self._reconnect_count,
                    backoff_seconds=backoff,
                    error=str(e)
                )
                await asyncio.sleep(backoff)
    async def _configure_session(self) -> None:
        """Send initial session configuration."""
        config = {
            "type": "session.update",
            "session": {
                "modalities": self.session_config.modalities,
                "instructions": self.session_config.instructions,
                "voice": self.session_config.voice,
                "input_audio_format": self.session_config.input_audio_format,
                "output_audio_format": self.session_config.output_audio_format,
                "input_audio_transcription": self.session_config.input_audio_transcription,
                "turn_detection": self.session_config.turn_detection,
                "tools": self.session_config.tools,
                "tool_choice": self.session_config.tool_choice,
                "temperature": self.session_config.temperature,
                "max_response_output_tokens": self.session_config.max_response_output_tokens,
                "emotion_detection": {
                    "enabled": self.session_config.emotion_detection,
                    "sensitivity": self.session_config.emotion_sensitivity
                }
            }
        }
        await self._enqueue_event(config)
    async def send_audio_chunk(self, audio_bytes: bytes) -> None:
        """Append raw PCM16 audio to the input buffer."""
        encoded = base64.b64encode(audio_bytes).decode("utf-8")
        await self._enqueue_event({
            "type": "input_audio_buffer.append",
            "audio": encoded
        })
    async def commit_audio(self) -> None:
        """Signal end of audio input turn (for manual turn detection)."""
        await self._enqueue_event({"type": "input_audio_buffer.commit"})
    async def create_response(self, instructions_override: Optional[str] = None) -> None:
        """Manually trigger a response (when VAD is disabled)."""
        event = {"type": "response.create"}
        if instructions_override:
            event["response"] = {"instructions": instructions_override}
        await self._enqueue_event(event)
    async def cancel_response(self) -> None:
        """Cancel an in-progress response (for interruption handling)."""
        await self._enqueue_event({"type": "response.cancel"})
    async def inject_text(self, text: str, role: str = "user") -> None:
        """Inject a text message into the conversation context."""
        await self._enqueue_event({
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": role,
                "content": [{"type": "input_text", "text": text}]
            }
        })
    async def stream_audio_output(self) -> AsyncGenerator[bytes, None]:
        """Async generator that yields audio output chunks."""
        while True:
            chunk = await self._audio_output_queue.get()
            if chunk is None:  # Sentinel value signals end of response
                break
            yield chunk
    async def _enqueue_event(self, event: Dict[str, Any]) -> None:
        """Add event to send queue (non-blocking with overflow protection)."""
        try:
            self._send_queue.put_nowait(event)
        except asyncio.QueueFull:
            logger.error("realtime_client.send_queue_full", event_type=event.get("type"))
            # Drop oldest item and retry
            try:
                self._send_queue.get_nowait()
            except asyncio.QueueEmpty:
                pass
            await self._send_queue.put(event)
    async def _send_loop(self) -> None:
        """Background task: drain send queue and write to WebSocket."""
        ws = self.ws  # Bound to this connection; the loop exits once a reconnect replaces it
        while self.connected and self.ws is ws:
            try:
                event = await asyncio.wait_for(
                    self._send_queue.get(),
                    timeout=1.0
                )
                await ws.send(json.dumps(event))  # Raises ConnectionClosed if the socket is gone
            except asyncio.TimeoutError:
                continue
            except ConnectionClosed:
                logger.warning("realtime_client.send_loop.connection_closed")
                await self._handle_disconnect(ws)
                break
            except Exception as e:
                logger.error("realtime_client.send_loop.error", error=str(e))
    async def _receive_loop(self) -> None:
        """Background task: receive and route events from WebSocket."""
        ws = self.ws
        while self.connected and self.ws is ws:
            try:
                raw_message = await ws.recv()
                event = json.loads(raw_message)
                await self._route_event(event)
            except ConnectionClosed:
                logger.warning("realtime_client.receive_loop.connection_closed")
                await self._handle_disconnect(ws)
                break
            except json.JSONDecodeError as e:
                logger.error("realtime_client.json_decode_error", error=str(e))
            except Exception as e:
                logger.error("realtime_client.receive_loop.error", error=str(e))
    async def _route_event(self, event: Dict[str, Any]) -> None:
        """Route incoming events to appropriate handlers."""
        event_type = event.get("type", "")
        if event_type == "session.created":
            self.session_id = event["session"]["id"]
            logger.info("realtime_client.session_created", session_id=self.session_id)
        elif event_type == "response.audio.delta":
            self._response_active = True  # Audio is streaming, so a barge-in should cancel it
            audio_bytes = base64.b64decode(event["delta"])
            await self._audio_output_queue.put(audio_bytes)
        elif event_type == "response.audio.done":
            await self._audio_output_queue.put(None)  # Signal end of audio
            self._response_active = False
        elif event_type == "response.emotion.detected":
            self._last_emotion = event.get("emotion", {})
            if "emotion_detected" in self.event_handlers:
                await self.event_handlers["emotion_detected"](self._last_emotion)
        elif event_type == "response.function_call_arguments.done":
            if "function_call" in self.event_handlers:
                await self.event_handlers["function_call"](event)
        elif event_type == "response.audio_transcript.done":
            if "transcript_done" in self.event_handlers:
                await self.event_handlers["transcript_done"](event.get("transcript", ""))
        elif event_type == "input_audio_buffer.speech_started":
            if self._response_active:
                # Handle barge-in: cancel the current response and drop audio not yet played
                await self.cancel_response()
                self._drain_audio_output()
                self._response_active = False
                logger.debug("realtime_client.barge_in_detected")
        elif event_type == "error":
            error = event.get("error", {})
            logger.error(
                "realtime_client.api_error",
                code=error.get("code"),
                message=error.get("message")
            )
            if "error" in self.event_handlers:
                await self.event_handlers["error"](error)
    def _drain_audio_output(self) -> None:
        """Discard queued output audio that has not been consumed yet."""
        while not self._audio_output_queue.empty():
            self._audio_output_queue.get_nowait()
    async def _handle_disconnect(self, ws: Optional[ClientConnection]) -> None:
        """Handle unexpected disconnection with reconnect."""
        # Ignore deliberate shutdowns, a second report of the same drop, and stale connections
        if self._closing or self._reconnecting or ws is not self.ws:
            return
        self.connected = False
        self._reconnecting = True
        try:
            logger.info("realtime_client.attempting_reconnect")
            # Conversation items from the dropped connection are not replayed here
            await self.connect()
        except ConnectionError as e:
            logger.error("realtime_client.reconnect_failed", error=str(e))
        finally:
            self._reconnecting = False
    async def close(self) -> None:
        """Gracefully close the WebSocket connection."""
        self._closing = True
        self.connected = False
        if self.ws:
            await self.ws.close()
        logger.info("realtime_client.closed", session_id=self.session_id)
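The retry loop in connect() sleeps RECONNECT_BACKOFF_BASE ** attempt seconds after each failure. With the defaults (base 1.5, MAX_RECONNECT_ATTEMPTS = 5), it makes six connection attempts and waits 1.5, 2.25, 3.375, 5.0625 and 7.59375 seconds between them. That is about 19.8 seconds in total before it raises ConnectionError.

The sketch below reproduces that schedule and adds an optional capped "full jitter" variant. Full jitter is a common way to keep many clients from reconnecting in lockstep after a shared outage. The cap and jitter are assumptions, not part of the client above.

```python
import random
from typing import List, Optional


def backoff_schedule(max_attempts: int = 5, base: float = 1.5) -> List[float]:
    """Sleep durations of the exponential retry loop: base ** attempt for attempt 1..max."""
    return [base ** attempt for attempt in range(1, max_attempts + 1)]


def jittered_backoff(attempt: int, base: float = 1.5, cap: float = 30.0,
                     rng: Optional[random.Random] = None) -> float:
    """Full-jitter variant: a uniform random delay between 0 and the capped exponential value."""
    rng = rng or random.Random()
    return rng.uniform(0.0, min(cap, base ** attempt))


schedule = backoff_schedule()
print(schedule)       # [1.5, 2.25, 3.375, 5.0625, 7.59375]
print(sum(schedule))  # 19.78125
```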
Audio Processing Pipeline
Raw audio from telephony systems (Twilio, WebRTC) often arrives in formats that need conversion before sending to the API. The audio processor handles sample rate conversion, format normalization, and chunk sizing to optimize for latency.
# core/audio_processor.py
from typing import Iterator, Optional
import numpy as np
class AudioProcessor:
    """
    Handles audio format conversion and chunking for the Realtime API.
    Target format: PCM16, 24kHz, mono.
    """
    TARGET_SAMPLE_RATE = 24000
    TARGET_CHANNELS = 1
    CHUNK_DURATION_MS = 20  # 20ms chunks for low latency
    CHUNK_SIZE_BYTES = int(TARGET_SAMPLE_RATE * (CHUNK_DURATION_MS / 1000) * 2)  # 16-bit = 2 bytes, so 960
    @staticmethod
    def pcm_mulaw_to_pcm16(mulaw_bytes: bytes) -> bytes:
        """Convert G.711 μ-law (Twilio default, 8kHz) to PCM16 at the same sample rate."""
        # Invert the bits while still uint8, then widen so the shifts below cannot overflow
        ulaw = (~np.frombuffer(mulaw_bytes, dtype=np.uint8)).astype(np.int32)
        sign = ulaw & 0x80
        exponent = (ulaw >> 4) & 0x07
        mantissa = ulaw & 0x0F
        # ITU-T G.711 expansion: add the 0x84 bias, shift by the segment, then remove the bias
        magnitude = (((mantissa << 3) + 0x84) << exponent) - 0x84
        sample = np.where(sign != 0, -magnitude, magnitude)
        return sample.astype(np.int16).tobytes()
    @staticmethod
    def resample_pcm16(
        audio_bytes: bytes,
        source_rate: int,
        target_rate: int = 24000
    ) -> bytes:
        """Resample PCM16 audio to target sample rate."""
        if source_rate == target_rate or not audio_bytes:
            return audio_bytes
        samples = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32)
        samples = samples / 32768.0  # Normalize to [-1, 1)
        # Simple linear interpolation: adequate for upsampling telephony audio (8kHz to 24kHz).
        # Downsampling would need a low-pass filter first to avoid aliasing.
        ratio = target_rate / source_rate
        output_length = int(len(samples) * ratio)
        indices = np.linspace(0, len(samples) - 1, output_length)
        resampled = np.interp(indices, np.arange(len(samples)), samples)
        # Convert back to PCM16
        resampled = np.clip(resampled * 32768, -32768, 32767).astype(np.int16)
        return resampled.tobytes()
    @staticmethod
    def chunk_audio(audio_bytes: bytes, chunk_size: Optional[int] = None) -> Iterator[bytes]:
        """Split audio bytes into fixed-size chunks for streaming."""
        if chunk_size is None:
            chunk_size = AudioProcessor.CHUNK_SIZE_BYTES
        for i in range(0, len(audio_bytes), chunk_size):
            yield audio_bytes[i:i + chunk_size]
    @staticmethod
    def normalize_volume(audio_bytes: bytes, target_db: float = -20.0) -> bytes:
        """Normalize audio volume to target dB level."""
        samples = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32)
        if len(samples) == 0:
            return audio_bytes
        rms = np.sqrt(np.mean(samples ** 2))
        if rms < 1e-10:
            return audio_bytes
        current_db = 20 * np.log10(rms / 32768.0)
        gain_db = target_db - current_db
        gain_linear = 10 ** (gain_db / 20)
        normalized = np.clip(samples * gain_linear, -32768, 32767).astype(np.int16)
        return normalized.tobytes()
    @staticmethod
    def detect_silence(
        audio_bytes: bytes,
        threshold_db: float = -50.0,
        min_silence_ms: int = 300
    ) -> bool:
        """
        Detect if audio chunk is silence (for VAD fallback).
        min_silence_ms is not applied to a single chunk: callers should count
        consecutive silent chunks until their combined duration reaches it.
        """
        samples = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32)
        if len(samples) == 0:
            return True
        rms = np.sqrt(np.mean(samples ** 2))
        if rms < 1e-10:
            return True
        db = 20 * np.log10(rms / 32768.0)
        return db < threshold_db
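Putting the Twilio path together: Twilio Media Streams carries 8 kHz μ-law audio, so assuming 20 ms frames, each frame is 160 bytes. Decoding to PCM16 doubles that to 320 bytes. Upsampling to 24 kHz then triples it to 960 bytes, which equals CHUNK_SIZE_BYTES. The standalone sketch below repeats the G.711 decode and linear-interpolation steps so it runs on its own; twilio_frame_to_api_chunk is a hypothetical helper.

```python
import numpy as np

SOURCE_RATE = 8000    # Twilio Media Streams: 8 kHz μ-law
TARGET_RATE = 24000   # Realtime API input in this guide: PCM16, 24 kHz, mono


def mulaw_to_pcm16(mulaw: bytes) -> np.ndarray:
    """ITU-T G.711 μ-law expansion to int16 samples."""
    u = (~np.frombuffer(mulaw, dtype=np.uint8)).astype(np.int32)
    magnitude = ((((u & 0x0F) << 3) + 0x84) << ((u >> 4) & 0x07)) - 0x84
    return np.where(u & 0x80, -magnitude, magnitude).astype(np.int16)


def upsample_linear(samples: np.ndarray, src: int, dst: int) -> np.ndarray:
    """Linear-interpolation resampling (fine for upsampling; no anti-aliasing filter)."""
    n_out = len(samples) * dst // src
    positions = np.linspace(0, len(samples) - 1, n_out)
    out = np.interp(positions, np.arange(len(samples)), samples.astype(np.float64))
    return np.clip(np.round(out), -32768, 32767).astype(np.int16)


def twilio_frame_to_api_chunk(mulaw_frame: bytes) -> bytes:
    """160-byte μ-law frame (20 ms at 8 kHz) to a 960-byte PCM16 chunk (20 ms at 24 kHz)."""
    pcm_8k = mulaw_to_pcm16(mulaw_frame)
    return upsample_linear(pcm_8k, SOURCE_RATE, TARGET_RATE).tobytes()
```

In G.711 μ-law, the byte 0xFF decodes to zero. A frame of 160 bytes of 0xFF therefore comes out as 960 bytes of PCM16 silence, which is a handy smoke test for the pipeline.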
Implementing Emotion Detection and Adaptive Response
One of the most powerful capabilities in GPT-5.5 Advanced Voice Mode is native emotion detection. The model surfaces emotional state metadata alongside audio output, giving you the ability to adapt agent behavior, trigger escalation workflows, and log sentiment data for quality analysis — all in real time.
This is particularly valuable for enterprise deployments where understanding customer emotional state drives business outcomes.
Emotion Detection Schema
When emotion detection is enabled, the server emits response.emotion.detected events with the following structure:
{
  "type": "response.emotion.detected",
  "emotion": {
    "primary": "frustration",
    "confidence": 0.87,
    "valence": -0.62,     // -1 (negative) to +1 (positive)
    "arousal": 0.71,      // 0 (calm) to 1 (excited/agitated)
    "dominance": 0.34,    // 0 (submissive) to 1 (dominant)
    "secondary": ["impatience", "confusion"],
    "acoustic_features": {
      "pitch_deviation": 1.23,
      "speech_rate_wpm": 187,
      "energy_level": 0.68,
      "pause_frequency": 0.12
    },
    "timestamp_ms": 1718234567890
  }
}
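You may prefer that malformed or out-of-range payloads fail loudly rather than flow into escalation logic. In that case, validate them first. This standard-library sketch uses hypothetical class names. The ranges for valence, arousal, and dominance come from the comments in the example above; the [0, 1] range for confidence is an assumption. Unknown keys raise TypeError, which you may want to relax if the schema grows.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass(frozen=True)
class AcousticFeatures:
    pitch_deviation: float = 0.0
    speech_rate_wpm: float = 0.0
    energy_level: float = 0.0
    pause_frequency: float = 0.0


@dataclass(frozen=True)
class EmotionPayload:
    primary: str
    confidence: float
    valence: float
    arousal: float
    dominance: float
    secondary: List[str] = field(default_factory=list)
    acoustic_features: AcousticFeatures = field(default_factory=AcousticFeatures)
    timestamp_ms: int = 0

    def __post_init__(self) -> None:
        # Ranges from the annotated example payload; confidence in [0, 1] is an assumption
        ranges = {
            "confidence": (0.0, 1.0),
            "valence": (-1.0, 1.0),
            "arousal": (0.0, 1.0),
            "dominance": (0.0, 1.0),
        }
        for name, (lo, hi) in ranges.items():
            value = getattr(self, name)
            if not lo <= value <= hi:
                raise ValueError(f"{name}={value} outside [{lo}, {hi}]")


def parse_emotion_event(event: Dict[str, Any]) -> EmotionPayload:
    """Validate a response.emotion.detected event and return a typed payload."""
    if event.get("type") != "response.emotion.detected":
        raise ValueError(f"unexpected event type: {event.get('type')!r}")
    data = dict(event["emotion"])  # Shallow copy so the caller's dict is not modified
    features = AcousticFeatures(**data.pop("acoustic_features", {}))
    return EmotionPayload(acoustic_features=features, **data)
```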
Emotion Handler Implementation
# core/emotion_handler.py
import asyncio
import time
from dataclasses import dataclass
from enum import Enum
from typing import Awaitable, Callable, Dict, List, Optional, Set
import structlog
logger = structlog.get_logger(__name__)
class EmotionCategory(Enum):
    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"
    DISTRESSED = "distressed"
@dataclass
class EmotionState:
    primary: str
    confidence: float
    valence: float
    arousal: float
    dominance: float
    secondary: List[str]
    speech_rate_wpm: int
    timestamp_ms: int
    @property
    def category(self) -> EmotionCategory:
        if self.valence > 0.3:
            return EmotionCategory.POSITIVE
        elif self.valence < -0.5 and self.arousal > 0.6:
            return EmotionCategory.DISTRESSED
        elif self.valence < -0.2:
            return EmotionCategory.NEGATIVE
        return EmotionCategory.NEUTRAL
    @property
    def requires_escalation(self) -> bool:
        """Determine if emotional state warrants human escalation."""
        distress_emotions = {"anger", "distress", "panic", "desperation", "crisis"}
        if self.primary in distress_emotions and self.confidence > 0.75:
            return True
        if self.valence < -0.7 and self.arousal > 0.8:
            return True
        return False
    @property
    def suggested_voice_adjustment(self) -> Dict:
        """Return response adjustments (temperature, instruction addendum) for this state."""
        if self.category == EmotionCategory.DISTRESSED:
            return {
                "temperature": 0.6,  # More controlled, predictable
                "instructions_addendum": (
                    "The caller appears distressed. Respond with extra empathy, "
                    "speak more slowly and calmly, and acknowledge their feelings "
                    "before attempting to solve the problem."
                )
            }
        elif self.category == EmotionCategory.NEGATIVE:
            return {
                "temperature": 0.7,
                "instructions_addendum": (
                    "The caller seems frustrated. Be concise, avoid filler phrases, "
                    "and focus on concrete solutions. Acknowledge their frustration briefly."
                )
            }
        elif self.category == EmotionCategory.POSITIVE:
            return {
                "temperature": 0.85,
                "instructions_addendum": (
                    "The caller is in a positive mood. You can be slightly warmer "
                    "and more conversational."
                )
            }
        return {}
# Callbacks receive the parsed EmotionState and must be coroutine functions
EmotionCallback = Callable[[EmotionState], Awaitable[None]]
class EmotionHandler:
    """
    Processes emotion detection events and triggers adaptive behaviors.
    Maintains a rolling emotion history for trend analysis.
    """
    HISTORY_WINDOW = 10  # Track last 10 emotion readings
    ESCALATION_COOLDOWN_SECONDS = 120
    CATEGORY_ALIASES = {"frustrated": "negative"}  # "frustrated" registers for NEGATIVE
    def __init__(self, escalation_callback: Optional[Callable[[Dict], Awaitable[None]]] = None):
        self.escalation_callback = escalation_callback
        self.emotion_history: List[EmotionState] = []
        self._last_escalation_time: Optional[float] = None
        self._escalation_triggered = False  # True once any escalation has fired
        # Keys match EmotionCategory values, so callbacks for every category can fire
        self._callbacks: Dict[str, List[EmotionCallback]] = {
            category.value: [] for category in EmotionCategory
        }
        self._callbacks["any"] = []
        self._pending_tasks: Set[asyncio.Task] = set()  # Strong references to running callbacks
    def on_emotion(self, category: str, callback: EmotionCallback) -> None:
        """Register an async callback for an emotion category, or "any" for every event."""
        category = self.CATEGORY_ALIASES.get(category, category)
        if category not in self._callbacks:
            raise ValueError(f"Unknown emotion category: {category!r}")
        self._callbacks[category].append(callback)
    async def process_emotion_event(self, emotion_data: Dict) -> Optional[Dict]:
        """
        Process an incoming emotion event.
        Returns voice adjustment parameters if adaptation is needed.
        """
        state = EmotionState(
            primary=emotion_data.get("primary", "neutral"),
            confidence=emotion_data.get("confidence", 0.5),
            valence=emotion_data.get("valence", 0.0),
            arousal=emotion_data.get("arousal", 0.5),
            dominance=emotion_data.get("dominance", 0.5),
            secondary=emotion_data.get("secondary", []),
            speech_rate_wpm=emotion_data.get("acoustic_features", {}).get("speech_rate_wpm", 150),
            timestamp_ms=emotion_data.get("timestamp_ms", 0)
        )
        # Maintain rolling history
        self.emotion_history.append(state)
        if len(self.emotion_history) > self.HISTORY_WINDOW:
            self.emotion_history.pop(0)
        logger.info(
            "emotion_handler.detected",
            primary=state.primary,
            confidence=state.confidence,
            category=state.category.value,
            requires_escalation=state.requires_escalation
        )
        # Check for escalation need; _handle_escalation enforces the cooldown
        if state.requires_escalation:
            await self._handle_escalation(state)
        # Fire registered callbacks in the background so slow handlers don't block event routing
        for callback in self._callbacks["any"] + self._callbacks[state.category.value]:
            task = asyncio.create_task(callback(state))
            self._pending_tasks.add(task)
            task.add_done_callback(self._pending_tasks.discard)
        return state.suggested_voice_adjustment
    def get_sentiment_trend(self) -> str:
        """Analyze emotional trend over the conversation history."""
        if len(self.emotion_history) < 3:
            return "insufficient_data"
        recent_valences = [e.valence for e in self.emotion_history[-5:]]
        avg_valence = sum(recent_valences) / len(recent_valences)
        trend = recent_valences[-1] - recent_valences[0]
        if trend > 0.2:
            return "improving"
        elif trend < -0.2:
            return "deteriorating"
        elif avg_valence > 0.2:
            return "consistently_positive"
        elif avg_valence < -0.2:
            return "consistently_negative"
        return "stable"
    async def _handle_escalation(self, state: EmotionState) -> None:
        """Trigger human escalation workflow (at most once per cooldown window)."""
        current_time = time.time()
        if (self._last_escalation_time and
                current_time - self._last_escalation_time < self.ESCALATION_COOLDOWN_SECONDS):
            return
        self._escalation_triggered = True
        self._last_escalation_time = current_time
        logger.warning(
            "emotion_handler.escalation_triggered",
            primary_emotion=state.primary,
            confidence=state.confidence,
            sentiment_trend=self.get_sentiment_trend()
        )
        if self.escalation_callback:
            await self.escalation_callback({
                "reason": "emotional_distress",
                "emotion_state": state,
                "sentiment_trend": self.get_sentiment_trend(),
                "conversation_history_summary": self._summarize_emotional_arc()
            })
    def _summarize_emotional_arc(self) -> str:
        """Generate a brief summary of the emotional arc for handoff context."""
        if not self.emotion_history:
            return "No emotional data available."
        emotions = [e.primary for e in self.emotion_history]
        trend = self.get_sentiment_trend()
        return (
            f"Caller emotional arc: started with {emotions[0]}, "
            f"recent state {emotions[-1]}. Overall trend: {trend}."
        )
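process_emotion_event() returns the adjustment dictionary but leaves applying it to the caller. One way to do that is to fold it into a session.update event shaped like the one _configure_session() sends. The addendum is appended to the base instructions each time, rather than accumulating turn after turn, and the temperature is overridden. The helper name is hypothetical, and whether the model honours a mid-session temperature change is an assumption to verify against the API.

```python
from typing import Any, Dict, Optional


def build_adaptive_session_update(
    base_instructions: str,
    adjustment: Optional[Dict[str, Any]],
    default_temperature: float = 0.8,
) -> Dict[str, Any]:
    """Turn an emotion adjustment dict into a session.update event.

    Always rebuilds from base_instructions, so a new update replaces the
    previous addendum instead of stacking on top of it.
    """
    adjustment = adjustment or {}
    addendum = adjustment.get("instructions_addendum")
    instructions = f"{base_instructions}\n\n{addendum}" if addendum else base_instructions
    return {
        "type": "session.update",
        "session": {
            "instructions": instructions,
            "temperature": adjustment.get("temperature", default_temperature),
        },
    }
```

You could enqueue the result through the client's _enqueue_event(), as LanguageRouter.update_session_language() does. Because each update rewrites the instructions, it is worth skipping the update when the emotion category has not changed since the previous reading.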
Multi-Language Support and Automatic Language Routing
Enterprise voice agents serving global workforces or international customer bases need robust multi-language support. GPT-5.5 Advanced Voice Mode handles 47 languages natively, but production deployments benefit from explicit language detection, routing logic, and language-specific configuration profiles.
Language Router Implementation
# core/language_router.py
from dataclasses import dataclass
from typing import Dict, Optional
import structlog
logger = structlog.get_logger(__name__)
LANGUAGE_CONFIGS = {
    "en": {
        "name": "English",
        "voice": "nova",
        "turn_detection_silence_ms": 500,
        "greeting": "Hello! How can I help you today?",
        "escalation_phrase": "Let me connect you with a specialist.",
    },
    "es": {
        "name": "Spanish",
        "voice": "sage",
        "turn_detection_silence_ms": 600,  # Longer silence window; tune against your own call data
        "greeting": "¡Hola! ¿En qué puedo ayudarle hoy?",
        "escalation_phrase": "Permítame conectarle con un especialista.",
    },
    "fr": {
        "name": "French",
        "voice": "sage",
        "turn_detection_silence_ms": 550,
        "greeting": "Bonjour! Comment puis-je vous aider aujourd'hui?",
        "escalation_phrase": "Permettez-moi de vous mettre en relation avec un spécialiste.",
    },
    "de": {
        "name": "German",
        "voice": "onyx",
        "turn_detection_silence_ms": 600,
        "greeting": "Hallo! Wie kann ich Ihnen heute helfen?",
        "escalation_phrase": "Ich verbinde Sie mit einem Spezialisten.",
    },
    "ja": {
        "name": "Japanese",
        "voice": "nova",
        "turn_detection_silence_ms": 700,
        "greeting": "こんにちは!本日はどのようなご用件でしょうか?",
        "escalation_phrase": "専門家におつなぎします。",
    },
    "zh": {
        "name": "Chinese",
        "voice": "nova",
        "turn_detection_silence_ms": 650,
        "greeting": "您好!今天有什么可以帮助您的?",
        "escalation_phrase": "让我为您转接专业人员。",
    },
    "pt": {
        "name": "Portuguese",
        "voice": "sage",
        "turn_detection_silence_ms": 550,
        "greeting": "Olá! Como posso ajudá-lo hoje?",
        "escalation_phrase": "Deixe-me conectá-lo com um especialista.",
    },
    "ar": {
        "name": "Arabic",
        "voice": "echo",
        "turn_detection_silence_ms": 700,
        "greeting": "مرحباً! كيف يمكنني مساعدتك اليوم؟",
        "escalation_phrase": "دعني أوصلك بأحد المتخصصين.",
    }
}
@dataclass
class LanguageDetectionResult:
    language_code: str
    confidence: float
    detected_from: str  # "transcription", "user_preference", "phone_number_prefix"
class LanguageRouter:
    """
    Routes voice sessions to appropriate language configurations.
    Uses multiple detection signals with priority ordering.
    """
    def __init__(self, default_language: str = "en"):
        self.default_language = default_language
        self._session_languages: Dict[str, str] = {}
    def get_config_for_language(self, language_code: str) -> Dict:
        """Get language-specific configuration, falling back to English."""
        lang = language_code.split("-")[0].lower()  # Handle "en-US" -> "en"
        config = LANGUAGE_CONFIGS.get(lang, LANGUAGE_CONFIGS["en"])
        return {**config, "language_code": lang}
    def detect_from_phone_prefix(self, phone_number: str) -> Optional[str]:
        """Infer likely language from international phone number prefix."""
        prefix_map = {
            "+1": "en", "+44": "en", "+61": "en", "+64": "en",
            "+34": "es", "+52": "es", "+54": "es", "+57": "es",
            "+33": "fr", "+32": "fr",
            # Switzerland (+41) is multilingual, but a dict holds one value per key,
            # so it is mapped once, to German, the most widely spoken language there
            "+49": "de", "+43": "de", "+41": "de",
            "+81": "ja",
            "+86": "zh", "+886": "zh", "+852": "zh",
            "+55": "pt", "+351": "pt",
            "+966": "ar", "+971": "ar", "+20": "ar",
        }
        # Check longer prefixes first so a more specific prefix wins if overlapping entries are added
        for prefix, lang in sorted(prefix_map.items(), key=lambda x: -len(x[0])):
            if phone_number.startswith(prefix):
                return lang
        return None
    def build_multilingual_instructions(
        self,
        base_instructions: str,
        detected_language: str
    ) -> str:
        """Augment system instructions with language-specific guidance."""
        lang = self.get_config_for_language(detected_language)["language_code"]
        # Use a readable name ("Spanish") in the prompt rather than the bare code ("es")
        language = LANGUAGE_CONFIGS[lang]["name"] if lang in LANGUAGE_CONFIGS else lang
        return f"""{base_instructions}
LANGUAGE CONFIGURATION:
- Primary language: {language}
- Respond exclusively in {language} unless the caller explicitly switches languages
- Use natural, conversational {language}, avoiding overly formal or translated-sounding speech
- If you detect the caller has switched languages, seamlessly switch to match them
- Cultural context: adapt formality level and communication style appropriately for {language} speakers"""
    async def update_session_language(
        self,
        session_id: str,
        language_code: str,
        realtime_client
    ) -> None:
        """Dynamically update session language configuration mid-call."""
        lang_config = self.get_config_for_language(language_code)
        lang = lang_config["language_code"]  # Normalized, e.g. "es-MX" -> "es"
        if self._session_languages.get(session_id) == lang:
            return  # No change needed
        self._session_languages[session_id] = lang
        # Update the live session
        await realtime_client._enqueue_event({
            "type": "session.update",
            "session": {
                "voice": lang_config["voice"],
                "input_audio_transcription": {
                    "model": "whisper-1",
                    "language": lang
                },
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.5,
                    "prefix_padding_ms": 300,
                    "silence_duration_ms": lang_config["turn_detection_silence_ms"]
                }
            }
        })
        logger.info(
            "language_router.session_updated",
            session_id=session_id,
            new_language=lang
        )
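The LanguageRouter docstring mentions priority ordering across detection signals, and LanguageDetectionResult records where each signal came from, but the class does not yet combine them. A minimal sketch of that step follows. The priority order and the 0.7 confidence floor are assumptions. The order is: explicit user preference first, then a sufficiently confident transcription result, then the phone-number prefix, then the default. The dataclass is redefined here so the example runs on its own.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class LanguageDetectionResult:
    language_code: str
    confidence: float
    detected_from: str  # "transcription", "user_preference", "phone_number_prefix"


# Lower number = higher priority (an assumed policy, not an API rule)
SIGNAL_PRIORITY = {"user_preference": 0, "transcription": 1, "phone_number_prefix": 2}


def resolve_language(
    signals: Iterable[LanguageDetectionResult],
    default_language: str = "en",
    min_transcription_confidence: float = 0.7,
) -> str:
    """Pick the language from the highest-priority usable signal, normalized to a base code."""
    usable = [
        s for s in signals
        if s.detected_from in SIGNAL_PRIORITY
        and not (s.detected_from == "transcription" and s.confidence < min_transcription_confidence)
    ]
    if not usable:
        return default_language
    # Ties within a source are broken by higher confidence
    best = min(usable, key=lambda s: (SIGNAL_PRIORITY[s.detected_from], -s.confidence))
    return best.language_code.split("-")[0].lower()
```

The resolved code can then be passed to update_session_language(), which returns early when the session is already in that language.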
Building the Complete Agent Layer
With the core infrastructure in place, we can build the actual agent logic. The base agent class wires together the RealtimeClient, EmotionHandler, LanguageRouter, and tool integrations into a cohesive conversational system.
Base Agent Class
# agents/base_agent.py
import asyncio
import json
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

import structlog

from core.realtime_client import RealtimeClient, SessionConfig
from core.audio_processor import AudioProcessor
from core.emotion_handler import EmotionHandler
from core.language_router import LanguageRouter
from core.session_manager import SessionManager

logger = structlog.get_logger(__name__)


class BaseVoiceAgent(ABC):
    """
    Abstract base class for voice agents.
    Subclass this to create domain-specific agents.
    """

    def __init__(
        self,
        api_key: str,
        session_id: str,
        session_manager: SessionManager,
        config: Dict[str, Any]
    ):
        self.api_key = api_key
        self.session_id = session_id
        self.session_manager = session_manager
        self.config = config
        self.audio_processor = AudioProcessor()
        self.emotion_handler = EmotionHandler(
            escalation_callback=self._handle_escalation_request
        )
        self.language_router = LanguageRouter(
            default_language=config.get("default_language", "en")
        )
        self.realtime_client: Optional[RealtimeClient] = None
        self._call_start_time = time.time()
        self._tools = self._build_tools()

    @abstractmethod
    def _get_system_instructions(self) -> str:
        """Return the system prompt for this agent type."""
        pass

    @abstractmethod
    def _build_tools(self) -> List[Dict]:
        """Return the tool definitions for this agent."""
        pass

    @abstractmethod
    async def _execute_tool(self, tool_name: str, arguments: Dict) -> str:
        """Execute a tool call and return the result as a string."""
        pass

    async def initialize(self, caller_phone: Optional[str] = None) -> None:
        """Initialize the agent, detect language, and connect to the Realtime API."""
        # Detect initial language from the caller's phone prefix if available
        initial_language = self.config.get("default_language", "en")
        if caller_phone:
            detected = self.language_router.detect_from_phone_prefix(caller_phone)
            if detected:
                initial_language = detected
        lang_config = self.language_router.get_config_for_language(initial_language)
        instructions = self.language_router.build_multilingual_instructions(
            self._get_system_instructions(),
            initial_language
        )
        session_config = SessionConfig(
            instructions=instructions,
            voice=lang_config["voice"],
            tools=self._tools,
            emotion_detection=True,
            emotion_sensitivity="high",
            turn_detection={
                "type": "server_vad",
                "threshold": 0.5,
                "prefix_padding_ms": 300,
                "silence_duration_ms": lang_config["turn_detection_silence_ms"]
            }
        )
        self.realtime_client = RealtimeClient(
            api_key=self.api_key,
            session_config=session_config,
            event_handlers={
                "emotion_detected": self._on_emotion_detected,
                "function_call": self._on_function_call,
                "transcript_done": self._on_transcript_done,
                "error": self._on_error
            }
        )
        await self.realtime_client.connect()
        # Store session metadata
        await self.session_manager.create_session(self.session_id, {
            "agent_type": self.__class__.__name__,
            "language": initial_language,
            "caller_phone": caller_phone,
            "start_time": self._call_start_time
        })
        logger.info(
            "voice_agent.initialized",
            session_id=self.session_id,
            language=initial_language,
            agent_type=self.__class__.__name__
        )

    async def process_audio_input(self, audio_bytes: bytes, source_format: str = "pcm16") -> None:
        """Process incoming audio from telephony/WebRTC."""
        if self.realtime_client is None:
            raise RuntimeError("initialize() must be called before sending audio")
        # Convert format if necessary
        if source_format == "mulaw":
            audio_bytes = AudioProcessor.pcm_mulaw_to_pcm16(audio_bytes)
        # Resample if needed (Twilio sends 8 kHz, we need 24 kHz)
        source_rate = 8000 if source_format == "mulaw" else 24000
        if source_rate != 24000:
            audio_bytes = AudioProcessor.resample_pcm16(audio_bytes, source_rate)
        # Stream in small chunks to keep latency low
        for chunk in AudioProcessor.chunk_audio(audio_bytes):
            await self.realtime_client.send_audio_chunk(chunk)

    async def get_audio_output(self):
        """Async generator yielding audio output chunks."""
        async for chunk in self.realtime_client.stream_audio_output():
            yield chunk

    async def _on_emotion_detected(self, emotion_data: Dict) -> None:
        """Handle emotion detection events."""
        adjustments = await self.emotion_handler.process_emotion_event(emotion_data)
        if adjustments and adjustments.get("instructions_addendum"):
            # Inject adaptive instructions into the conversation
            await self.realtime_client.inject_text(
                f"[SYSTEM CONTEXT UPDATE: {adjustments['instructions_addendum']}]",
                role="system"
            )

    async def _on_function_call(self, event: Dict) -> None:
        """Handle tool/function call requests from the model."""
        tool_name = event.get("name", "")
        call_id = event.get("call_id", "")
        try:
            # Arguments may arrive as a JSON string or an already-parsed dict
            arguments = event.get("arguments", {})
            if isinstance(arguments, str):
                arguments = json.loads(arguments)
            logger.info(
                "voice_agent.tool_call",
                tool=tool_name,
                session_id=self.session_id
            )
            output = await self._execute_tool(tool_name, arguments)
        except Exception as e:
            logger.error(
                "voice_agent.tool_call_error",
                tool=tool_name,
                error=str(e),
                session_id=self.session_id
            )
            output = f"Error executing {tool_name}: {str(e)}"
        # Return the tool result (or the error) to the model, then request a new
        # response so the caller is never left in silence after a tool call
        await self.realtime_client._enqueue_event({
            "type": "conversation.item.create",
            "item": {
                "type": "function_call_output",
                "call_id": call_id,
                "output": output
            }
        })
        await self.realtime_client.create_response()

    async def _on_transcript_done(self, transcript: str) -> None:
        """Store completed transcript segment."""
        await self.session_manager.append_transcript(
            self.session_id,
            {"role": "assistant", "content": transcript, "timestamp": time.time()}
        )

    async def _on_error(self, error: Dict) -> None:
        """Handle API errors."""
        logger.error(
            "voice_agent.api_error",
            code=error.get("code"),
            message=error.get("message"),
            session_id=self.session_id
        )

    async def _handle_escalation_request(self, escalation_data: Dict) -> None:
        """Handle requests for human escalation."""
        logger.warning(
            "voice_agent.escalation_requested",
            reason=escalation_data.get("reason"),
            session_id=self.session_id
        )
        # Subclasses implement domain-specific escalation routing
        await self._escalate_to_human(escalation_data)

    @abstractmethod
    async def _escalate_to_human(self, escalation_data: Dict) -> None:
        """Implement human escalation for this agent type."""
        pass

    async def end_session(self) -> Dict:
        """Gracefully end the session and return summary metrics."""
        duration = time.time() - self._call_start_time
        sentiment_trend = self.emotion_handler.get_sentiment_trend()
        summary = {
            "session_id": self.session_id,
            "duration_seconds": duration,
            "sentiment_trend": sentiment_trend,
            "escalated": self.emotion_handler._escalation_triggered,
            "emotion_history_count": len(self.emotion_handler.emotion_history)
        }
        await self.session_manager.close_session(self.session_id, summary)
        # Guard against end_session() being called before initialize() connected
        if self.realtime_client is not None:
            await self.realtime_client.close()
        logger.info("voice_agent.session_ended", **summary)
        return summary
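Subclasses supply `_build_tools` and `_execute_tool`. One way to keep `_execute_tool` manageable in a concrete agent is a name-to-handler registry. The sketch below is a hypothetical, standalone illustration: `ToolRegistry` and the `lookup_order` tool are invented for the example and are not part of the article's codebase.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, Union

ToolHandler = Callable[..., Awaitable[str]]


class ToolRegistry:
    """Hypothetical helper mapping tool names to async handlers."""

    def __init__(self) -> None:
        self._handlers: Dict[str, ToolHandler] = {}

    def register(self, name: str) -> Callable[[ToolHandler], ToolHandler]:
        """Decorator that registers an async handler under a tool name."""
        def decorator(func: ToolHandler) -> ToolHandler:
            self._handlers[name] = func
            return func
        return decorator

    async def execute(self, name: str, arguments: Union[str, Dict[str, Any]]) -> str:
        """Run the named tool; always return a string for function_call_output."""
        # Accept both a JSON string and an already-parsed dict, like the base class
        if isinstance(arguments, str):
            arguments = json.loads(arguments or "{}")
        handler = self._handlers.get(name)
        if handler is None:
            return f"Unknown tool: {name}"
        return await handler(**arguments)


registry = ToolRegistry()


@registry.register("lookup_order")
async def lookup_order(order_id: str) -> str:
    # Stand-in for a real backend call; returns JSON text the model can read
    return json.dumps({"order_id": order_id, "status": "shipped"})


if __name__ == "__main__":
    print(asyncio.run(registry.execute("lookup_order", '{"order_id": "A123"}')))
```

A subclass could then implement `_execute_tool` as `return await registry.execute(tool_name, arguments)`. Returning plain text for unknown tools, and letting other exceptions propagate, fits the base class, which turns any raised error into a `function_call_output` message for the model.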
Telephony Integration with Twilio Media Streams
For enterprise deployments handling inbound calls, Twilio Media Streams is a common integration path. It provides a WebSocket bridge between the PSTN (public switched telephone network) and your application, streaming bidirectional μ-law audio at 8 kHz. Your application acts as the WebSocket server that Twilio connects to; in the bridge below, a Node.js service terminates the stream and relays audio to the Python agent service over HTTP.
Understanding the nuances of telephony integration is critical for production deployments.
Authentication patterns and rate-limiting strategies deserve equal attention, since they apply directly to high-volume telephony use cases.
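Because Twilio delivers 8 kHz G.711 μ-law while `process_audio_input` feeds 24 kHz PCM16 onward, the `AudioProcessor` helpers it calls must decode and resample. Their implementations are not shown in this section, so the following is a hypothetical pure-Python sketch of both steps (the standard-library `audioop` module was removed in Python 3.13). The function names are assumptions, and linear interpolation without an anti-imaging filter is a deliberate simplification; a production resampler would normally apply a proper filter.

```python
import array
import sys


def _mulaw_decode_byte(u: int) -> int:
    """Decode one G.711 mu-law byte to a signed 16-bit linear sample."""
    u = ~u & 0xFF  # mu-law bytes are transmitted bit-inverted
    sign = u & 0x80
    exponent = (u >> 4) & 0x07
    mantissa = u & 0x0F
    magnitude = (((mantissa << 3) + 0x84) << exponent) - 0x84  # remove the 132 bias
    return -magnitude if sign else magnitude


def mulaw_to_pcm16(data: bytes) -> bytes:
    """Convert mu-law bytes to little-endian PCM16 at the same sample rate."""
    samples = array.array("h", (_mulaw_decode_byte(b) for b in data))
    if sys.byteorder == "big":
        samples.byteswap()  # emit little-endian regardless of host byte order
    return samples.tobytes()


def upsample_pcm16_linear(pcm: bytes, factor: int = 3) -> bytes:
    """Upsample little-endian PCM16 by an integer factor (8 kHz -> 24 kHz for 3)."""
    src = array.array("h")
    src.frombytes(pcm)
    if sys.byteorder == "big":
        src.byteswap()
    out = array.array("h")
    for i, current in enumerate(src):
        nxt = src[i + 1] if i + 1 < len(src) else current  # hold the last sample
        for k in range(factor):
            # Linearly interpolate between neighbouring input samples
            out.append(current + (nxt - current) * k // factor)
    if sys.byteorder == "big":
        out.byteswap()
    return out.tobytes()


if __name__ == "__main__":
    frame = bytes([0xFF] * 160)  # one 20 ms frame of 8 kHz mu-law silence
    pcm8k = mulaw_to_pcm16(frame)
    pcm24k = upsample_pcm16_linear(pcm8k)
    print(len(frame), len(pcm8k), len(pcm24k))  # 160 320 960
```

The byte counts show the size change per 20 ms frame: 160 μ-law bytes become 320 bytes of 8 kHz PCM16, then 960 bytes at 24 kHz.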
Twilio Media Streams Bridge (Node.js)
// telephony/twilio_bridge.js
const WebSocket = require('ws');
const express = require('express');
const { createClient } = require('redis');
// node-fetch v2 is CommonJS; on Node 18+ the built-in global fetch can be used instead
const fetch = require('node-fetch');

const app = express();
app.use(express.urlencoded({ extended: false }));
const PORT = process.env.PORT || 3000;
const AGENT_SERVICE_URL = process.env.AGENT_SERVICE_URL || 'http://localhost:8000';

// Redis for session coordination
const redis = createClient({ url: process.env.REDIS_URL });
redis.on('error', (err) => console.error('Redis error:', err));
redis.connect().catch((err) => console.error('Redis connection failed:', err));

// TwiML endpoint - Twilio calls this when a call arrives
app.post('/incoming-call', (req, res) => {
  const callSid = req.body.CallSid;
  const callerPhone = req.body.From;
  console.log(`Incoming call: ${callSid} from ${callerPhone}`);
  // Respond with TwiML that opens a bidirectional Media Stream to /media-stream
  // and passes the caller's number through as a custom parameter
  const twiml = `<?xml version="1.0" encoding="UTF-8"?>
<Response>
  <Connect>
    <Stream url="wss://${req.headers.host}/media-stream">
      <Parameter name="callerPhone" value="${callerPhone}" />
    </Stream>
  </Connect>
</Response>`;
  res.type('text/xml');
  res.send(twiml);
});

// WebSocket server for Twilio Media Streams
const wss = new WebSocket.Server({ noServer: true });

wss.on('connection', (twilioWs) => {
  let callSid = null;
  let callerPhone = null;
  let streamSid = null;
  let pollInterval = null;
  console.log('Twilio Media Stream connected');

  // Send audio back to Twilio
  function sendAudioToTwilio(audioBase64) {
    if (twilioWs.readyState === WebSocket.OPEN && streamSid) {
      twilioWs.send(JSON.stringify({
        event: 'media',
        streamSid: streamSid,
        media: {
          payload: audioBase64 // Base64 μ-law audio
        }
      }));
    }
  }

  // Poll for agent audio output. Polling must start after the 'start' event,
  // because callSid is still null when the socket first connects.
  function startAudioOutputPolling() {
    let inFlight = false;
    pollInterval = setInterval(async () => {
      if (inFlight) return; // Don't overlap polls if the agent service is slow
      inFlight = true;
      try {
        const response = await fetch(
          `${AGENT_SERVICE_URL}/sessions/${callSid}/audio-output`
        );
        if (response.ok) {
          const data = await response.json();
          for (const chunk of data.audio_chunks || []) {
            sendAudioToTwilio(chunk);
          }
        }
      } catch (err) {
        // Polling errors are non-fatal
      } finally {
        inFlight = false;
      }
    }, 20); // Poll every 20ms
  }

  function stopAudioOutputPolling() {
    if (pollInterval) {
      clearInterval(pollInterval);
      pollInterval = null;
    }
  }

  twilioWs.on('message', async (data) => {
    const message = JSON.parse(data);
    switch (message.event) {
      case 'start':
        // Extract call metadata
        callSid = message.start.callSid;
        streamSid = message.start.streamSid;
        callerPhone = message.start.customParameters?.callerPhone;
        console.log(`Stream started: ${streamSid} for call ${callSid}`);
        // Initialize agent session via HTTP
        try {
          const response = await fetch(`${AGENT_SERVICE_URL}/sessions/create`, {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({
              session_id: callSid,
              caller_phone: callerPhone,
              agent_type: 'customer_service'
            })
          });
          if (!response.ok) {
            throw new Error(`Agent init failed: ${response.status}`);
          }
          // Store session mapping (expires after 8 hours)
          await redis.set(`stream:${streamSid}`, callSid, { EX: 28800 });
        } catch (err) {
          console.error('Failed to initialize agent session:', err);
          twilioWs.close();
          return;
        }
        startAudioOutputPolling();
        break;

      case 'media':
        // Forward audio to agent service
        if (callSid) {
          try {
            await fetch(`${AGENT_SERVICE_URL}/sessions/${callSid}/audio`, {
              method: 'POST',
              headers: { 'Content-Type': 'application/json' },
              body: JSON.stringify({
                audio_payload: message.media.payload, // Base64 μ-law
                timestamp: message.media.timestamp,
                track: message.media.track // 'inbound' or 'outbound'
              })
            });
          } catch (err) {
            // Non-fatal: log and continue
            console.warn('Audio forward error:', err.message);
          }
        }
        break;

      case 'stop':
        console.log(`Stream stopped: ${streamSid}`);
        stopAudioOutputPolling();
        if (callSid) {
          await fetch(`${AGENT_SERVICE_URL}/sessions/${callSid}/end`, {
            method: 'POST'
          }).catch(console.error);
          await redis.del(`stream:${streamSid}`);
        }
        break;
    }
  });

  twilioWs.on('error', (err) => {
    console.error('Twilio WebSocket error:', err);
  });

  twilioWs.on('close', () => {
    stopAudioOutputPolling();
    console.log(`Twilio WebSocket closed for call ${callSid}`);
  });
});

// HTTP server with WebSocket upgrade
const server = app.listen(PORT, () => {
  console.log(`Twilio bridge listening on port ${PORT}`);
});

server.on('upgrade', (request, socket, head) => {
  if (request.url === '/media-stream') {
    wss.handleUpgrade(request, socket, head, (ws) => {
      wss.emit('connection', ws, request);
    });
  } else {
    socket.destroy();
  }
});
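On the agent-service side, the bridge POSTs each inbound frame as `audio_payload` (base64 μ-law) and expects the `/audio-output` poll to return base64 strings in `audio_chunks`. A hypothetical sketch of that payload handling follows; the function names are assumptions, and slicing output into 20 ms frames (160 bytes of 8 kHz μ-law) is a design choice for evenly paced polls, not a documented Twilio requirement.

```python
import base64
from typing import Any, Dict, List

MULAW_FRAME_BYTES = 160  # 20 ms of 8 kHz, 8-bit mu-law audio


def decode_audio_payload(body: Dict[str, Any]) -> bytes:
    """Extract raw mu-law bytes from the JSON body the bridge POSTs to /audio."""
    return base64.b64decode(body["audio_payload"])


def encode_audio_chunks(mulaw: bytes, frame_bytes: int = MULAW_FRAME_BYTES) -> List[str]:
    """Split mu-law audio into fixed-size frames and base64-encode each one.

    The result is shaped for the `audio_chunks` list the bridge reads from
    the /audio-output poll response; the final frame may be shorter.
    """
    return [
        base64.b64encode(mulaw[i:i + frame_bytes]).decode("ascii")
        for i in range(0, len(mulaw), frame_bytes)
    ]


if __name__ == "__main__":
    body = {"audio_payload": base64.b64encode(b"\xff" * 160).decode("ascii"), "track": "inbound"}
    raw = decode_audio_payload(body)
    chunks = encode_audio_chunks(raw * 3)
    print(len(raw), len(chunks))
```

Small frames let the bridge's 20 ms poll forward audio as soon as it is available, rather than waiting for a whole response to finish synthesizing.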
Production Deployment Architecture
Deploying voice agents at enterprise scale requires careful attention to connection management, fault tolerance, observability, and cost optimization. A single GPT-5.5 Advanced Voice session consumes approximately 0.06 audio tokens per millisecond of audio (input + output combined), which translates to meaningful infrastructure cost at high call volumes.
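To make that rate concrete: 0.06 tokens per millisecond is 60 tokens per second, or 3,600 tokens per minute of audio, so the 10,000 concurrent calls mentioned in the introduction would be on the order of 600,000 audio tokens per second at peak. The back-of-the-envelope estimator below applies the stated rate; the per-million-token price is a placeholder parameter rather than a published figure, and the call volumes in the example are illustrative assumptions.

```python
from dataclasses import dataclass

TOKENS_PER_MS = 0.06  # rate stated above (input + output combined)


@dataclass
class VoiceCostEstimate:
    tokens_per_call: float
    tokens_per_day: float
    cost_per_day: float
    peak_tokens_per_second: float


def estimate_voice_cost(
    avg_call_minutes: float,
    calls_per_day: int,
    peak_concurrent_calls: int,
    price_per_million_tokens: float,
) -> VoiceCostEstimate:
    """Rough audio-token volume and daily spend for a voice deployment."""
    tokens_per_call = TOKENS_PER_MS * avg_call_minutes * 60_000  # minutes -> ms
    tokens_per_day = tokens_per_call * calls_per_day
    return VoiceCostEstimate(
        tokens_per_call=tokens_per_call,
        tokens_per_day=tokens_per_day,
        cost_per_day=tokens_per_day / 1_000_000 * price_per_million_tokens,
        peak_tokens_per_second=TOKENS_PER_MS * 1_000 * peak_concurrent_calls,
    )


if __name__ == "__main__":
    # Placeholder price of 1.0 per million tokens: substitute your actual rate.
    estimate = estimate_voice_cost(
        avg_call_minutes=5,
        calls_per_day=50_000,
        peak_concurrent_calls=10_000,
        price_per_million_tokens=1.0,
    )
    print(estimate)
```

Because cost scales linearly with the price parameter, the example's daily spend can be rescaled directly once the real rate is known; the peak tokens-per-second figure is the number to compare against any throughput limits.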
Kubernetes Deployment Configuration
# deployment/k8s/voice-agent-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: voice-agent-service
  namespace: ai-services
  labels:
    app: voice-agent
    version: "2.0"
spec:
  replicas: 10
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 2
      maxUnavailable: 0 # Zero-downtime: critical for active calls
  selector:
    matchLabels:
      app: voice-agent
  template:
    metadata:
      labels:
        app: voice-agent
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9090"
    spec:
      # Prefer spreading pods across nodes to avoid noisy-neighbor issues
      # (a soft preference: the scheduler may still co-locate pods if needed)
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchExpressions:
                    - key: app
                      operator: In
                      values:
                        - voice-agent
                topologyKey: kubernetes.io/hostname
      containers:
        - name: voice-agent
          image: your-registry/voice-agent:2.0.0
          ports:
            - containerPort: 8000
              name: http
            - container


