Skip to main content

AudioAgent

A prebuilt realtime audio-to-audio agent backed by Gemini Live — mirrors ReactAgent's construction surface while wrapping a LiveAgent as the graph root.

Import path: agentflow.prebuilt.agent


Concept

AudioAgent opens a persistent WebSocket to Gemini Live and runs a duplex audio session. Unlike ReactAgent, the provider owns the turn loop. Agentflow wraps it in a graph node (LiveAgent) and exposes a separate execution path — arealtime() — that drives the session.

Session graph

The graph is a single node. The LIVE -> END edge exists only to satisfy graph validation; it is never traversed during a realtime session because LiveAgent holds the WebSocket for the full session lifetime.

Tool loop

Tools are registered through a ToolNode at build time and advertised to the Gemini Live session at connect time. When the model requests a tool call, LiveAgent dispatches it through the same ToolNode path used by ReactAgent (parallel execution, callbacks, publisher events), then feeds the result back over the socket without interrupting the audio stream. Barge-in works normally — an interrupted event discards any in-flight tool result accumulation.

System prompt and skills

system_prompt, skills, and memory are all supported. At connect time, the agent flattens them — including any {field} placeholder interpolation from AgentState — into a single system_instruction string sent to Gemini Live. This snapshot is fixed for the session; the model cannot receive a new system prompt mid-session. Dynamic behavior after connect goes through set_skill or memory tools.

Execution path

A graph containing a LiveAgent must be driven with arealtime() (async generator) or realtime() (sync wrapper). Calling invoke, ainvoke, stream, or astream raises a RuntimeError.

Transcripts and storage

Audio is never stored at rest. Finished speech turns are persisted as Message objects with metadata={"modality": "audio"} — both user (input transcription) and model (output transcription).

Provider requirement

LiveAgent v1 resolves the provider from the model string and requires "google". Passing any other provider string raises a ValueError at construction time.


Installation

pip install "10xscale-agentflow[realtime]"

Set your credentials:

export GEMINI_API_KEY=your-api-key

For Vertex AI, set GOOGLE_GENAI_USE_VERTEXAI=1 with standard ADC credentials.


Constructor Parameters

ParameterTypeDefaultDescription
modelstrrequiredGemini Live model identifier (e.g. "gemini-live-2.5-flash-preview")
realtime_configRealtimeConfig | NoneNoneVoice, modalities, VAD, reconnect policy, and other session settings. If None, a RealtimeConfig(model=model) is created automatically.
system_promptlist[dict] | NoneNoneSystem-role messages. Flattened into system_instruction at connect time; supports {field} placeholders interpolated from state.
toolsIterable[Callable] | NoneNoneTool functions exposed to the model. Executed by a ToolNode during the session.
clientAnyNoneFastMCP client for MCP-hosted tools.
pass_user_info_to_mcpboolFalseForward user_id / config to MCP tool calls.
skillsSkillConfig | NoneNoneDynamic skill injection; flattened into system_instruction at connect.
memoryMemoryConfig | NoneNoneLong-term semantic memory; preloaded into system_instruction at connect.
realtime_client_factoryCallable[[], RealtimeClient] | NoneNoneOverride the default GeminiLiveClient factory. Useful for testing or custom transport.
live_node_namestr"LIVE"Graph node name for the LiveAgent step.
stateAgentState | NoneNoneCustom AgentState subclass.
context_managerBaseContextManager | NoneNoneCustom context manager (e.g. for context trimming across reseed).
publisherBasePublisher | list[BasePublisher] | NoneNoneEvent publisher(s) for observability.
id_generatorBaseIDGeneratorDefaultIDGenerator()Snowflake / custom ID generator.
containerAny | NoneNoneInjectQ DI container.

compile() Parameters

ParameterTypeDefaultDescription
checkpointerBaseCheckpointerNonePersist and restore transcripts and the session resumption handle across sessions.
storeBaseStoreNoneLong-term cross-thread storage.
callback_managerCallbackManagerdefaultLifecycle hooks (on_graph_start, on_graph_end, on_turn_start, on_turn_end).
shutdown_timeoutfloat30.0Seconds to wait for clean shutdown.

compile() does not accept media_store, interrupt_before, or interrupt_after. Realtime media (images, video frames) is sent directly to the model via LiveInputQueue.send_image() — it is never routed through a media store. Interrupt hooks are not applicable to the realtime execution path.


Full Code

Minimal example

import asyncio
from agentflow.core.realtime.base import RealtimeConfig
from agentflow.core.realtime.queue import LiveInputQueue
from agentflow.prebuilt.agent import AudioAgent

MODEL = "gemini-live-2.5-flash-preview"

app = AudioAgent(
MODEL,
realtime_config=RealtimeConfig(model=MODEL, voice="Puck"),
system_prompt=[{"role": "system", "content": "You are a concise voice assistant."}],
).compile()


async def main():
queue = LiveInputQueue()
queue.send_text("Hello, what can you do?")

async for event in app.arealtime(queue, {"thread_id": "demo-1"}):
if event.type == "output_transcript" and event.finished:
print(f"agent: {event.text}")
elif event.type == "turn_complete":
queue.close()

await app.aclose()


asyncio.run(main())

With tools

import asyncio
from agentflow.core.realtime.base import RealtimeConfig
from agentflow.core.realtime.queue import LiveInputQueue
from agentflow.prebuilt.agent import AudioAgent

MODEL = "gemini-live-2.5-flash-preview"


def get_weather(city: str) -> str:
"""Return the current weather for a city."""
return f"Sunny, 24°C in {city}"


app = AudioAgent(
MODEL,
realtime_config=RealtimeConfig(model=MODEL, voice="Aoede"),
system_prompt=[{
"role": "system",
"content": "You are a helpful voice assistant. Use tools when they help you answer.",
}],
tools=[get_weather],
).compile()


async def main():
queue = LiveInputQueue()
queue.send_text("What's the weather in Tokyo?")

async for event in app.arealtime(queue, {"thread_id": "tools-1"}):
if event.type == "tool_call":
print(f"calling tool: {event.name}({event.args})")
elif event.type == "tool_result":
print(f"tool result: {event.result}")
elif event.type == "output_transcript" and event.finished:
print(f"agent: {event.text}")
elif event.type == "turn_complete":
queue.close()

await app.aclose()


asyncio.run(main())

With a checkpointer (persistent sessions)

A checkpointer stores both the transcript Message history and the session resumption handle. On reconnect, Agentflow either resumes via the provider handle (no reseed overhead) or falls back to replaying the full transcript into the new session.

import asyncio
from agentflow.core.realtime.base import RealtimeConfig
from agentflow.core.realtime.queue import LiveInputQueue
from agentflow.prebuilt.agent import AudioAgent
from agentflow.storage.checkpointer import InMemoryCheckpointer

MODEL = "gemini-live-2.5-flash-preview"

checkpointer = InMemoryCheckpointer()

app = AudioAgent(
MODEL,
realtime_config=RealtimeConfig(model=MODEL, voice="Puck"),
).compile(checkpointer=checkpointer)


async def main():
queue = LiveInputQueue()
queue.send_text("Remember that my name is Alex.")

async for event in app.arealtime(queue, {"thread_id": "persist-1"}):
if event.type == "output_transcript" and event.finished:
print(f"agent: {event.text}")
elif event.type == "turn_complete":
queue.close()

await app.aclose()


asyncio.run(main())

Handling barge-in

When the user speaks over the model, the provider emits an interrupted event. Discard any audio playback buffer on your side and resume listening.

async for event in app.arealtime(queue, {"thread_id": "barge-1"}):
if event.type == "audio_delta":
playback_buffer.extend(event.data)
elif event.type == "interrupted":
playback_buffer.clear() # flush in-flight model audio
elif event.type == "turn_complete":
pass # flush and play remaining buffer

VAD and push-to-talk

Voice activity detection (VAD) is enabled by default. To switch to push-to-talk (manual activity), disable VAD and signal boundaries explicitly:

from agentflow.core.realtime.base import RealtimeConfig, VADConfig
from agentflow.core.realtime.queue import LiveInputQueue

config = RealtimeConfig(
model="gemini-live-2.5-flash-preview",
vad=VADConfig(enabled=False),
)
app = AudioAgent("gemini-live-2.5-flash-preview", realtime_config=config).compile()

queue = LiveInputQueue()
queue.send_activity_start()
queue.send_audio(pcm_bytes, sample_rate=16000)
queue.send_activity_end()

RealtimeConfig key fields

FieldTypeDefaultDescription
modelstrrequiredGemini Live model name.
voicestr | NoneNoneVoice name (e.g. "Puck", "Aoede", "Charon").
response_modalitieslist[str]["AUDIO"]Must contain exactly one entry ("AUDIO" or "TEXT").
system_instructionstr | NoneNoneDirect string instruction; overridden if system_prompt / skills / memory are set on the agent.
input_audio_transcriptionboolTrueEmit input_transcript events for user speech.
output_audio_transcriptionboolTrueEmit output_transcript events for model speech.
vadVADConfigenabledVoice-activity-detection settings.
session_resumptionboolTrueEnable provider-level session resumption on reconnect.
context_window_compressionboolFalseAsk the provider to compress its context window.
reconnectReconnectConfigsee belowReconnect backoff policy for error-driven drops.
tools_tagslist[str] | NoneNoneFilter which ToolNode tools are advertised by tag.

ReconnectConfig defaults: base_delay=0.5, max_delay=10.0, max_attempts=5. Provider-initiated go_away rotations always reconnect immediately (no backoff).


RealtimeEvent types

Event typeWhen emitted
audio_deltaA chunk of PCM16 model audio output (24 kHz).
input_transcriptStreamed text of the user's speech. finished=True carries the complete turn.
output_transcriptStreamed text of the model's speech. finished=True carries the complete turn.
tool_callModel requested a tool invocation. LiveAgent handles dispatch automatically; emit is for observability.
tool_resultTool finished; result has been sent back to the model.
turn_completeModel finished generating a turn.
interruptedBarge-in detected; flush audio playback.
session_updateProvider issued or refreshed a resumption handle.
go_awayProvider will close the socket; reconnect is triggered automatically.
errorNormalized provider error. fatal=True means the session ended.

Further reading

  • How to build a realtime audio agent — end-to-end guide covering WAV file I/O, microphone streaming, image input, and the API WebSocket bridge.
  • Realtime reference — full API surface for RealtimeConfig, VADConfig, ReconnectConfig, LiveInputQueue, RealtimeClient, and all event types.