Skip to main content

StructuredOutputAgent

An agent that guarantees its output matches a Pydantic schema — with automatic validation and self-repair on failure.

Import path: agentflow.prebuilt.agent


Concept

Standard LLMs sometimes produce malformed JSON or output that does not conform to your schema. StructuredOutputAgent adds a validation-and-repair loop: if the output fails validation, it injects a correction message that includes the exact error and the full JSON Schema, then tries again.

Full graph — with tools

Without tools

When no tools are supplied, the TOOL node is omitted and the loop is purely generate → validate → repair:

Routing logic

def _route(state: AgentState) -> str:
last = state.context[-1]

# Tool calls take priority — run them before validating.
if has_tools and last.tools_calls:
return "TOOL"

attempts = state.execution_meta.internal_data.get("soa_attempts", 0)
is_valid, _ = _validate_message(last, adapter)

if is_valid:
return END

if attempts >= max_attempts:
return END # best-effort — return what we have

return "REPAIR"

Two-stage validation

The validator tries two paths in order:

  1. message.parsed_content — populated by the provider SDK when native structured-output mode is used. Validated directly as a Python object.
  2. message.text() — parsed as JSON (markdown code fences stripped first), then validated with pydantic.TypeAdapter.

Two repair modes

ModeWhenWhat happens
Lightweight (default)repair_system_prompt=NoneA small async function injects a user message containing the validation error + full JSON Schema, then increments the attempt counter. No extra LLM call.
LLM repairrepair_system_prompt=[...] setA second Agent instance receives the correction prompt and actively rewrites the output. Uses more tokens but can fix structural problems the lightweight mode cannot.

The repair message (lightweight mode) reads:

Your previous response did not conform to the required output schema.
Validation error:
<pydantic error>

Target JSON Schema:
<full JSON Schema>

Please produce a response that is valid JSON and strictly matches the schema above.
Output only the JSON object — no extra text or code fences.

Constructor Parameters

ParameterTypeDefaultDescription
modelstrrequiredLLM model identifier
providerstrrequiredLLM provider ("openai", "google", "anthropic")
output_schematyperequiredPydantic BaseModel or TypedDict subclass
toolsIterable[Callable]NoneOptional tools for the GENERATE↔TOOL loop
system_promptlist[dict]NoneSystem prompt for the generation agent
max_attemptsint2Max validation+repair cycles before returning best-effort
repair_system_promptlist[dict] | NoneNoneSet to enable a dedicated LLM repair agent
reasoning_configdict | boolTrueApplied to the generation (and repair) agent
memoryMemoryConfigNoneLong-term memory
retry_configAnyTrueRetry on LLM errors
fallback_modelslistNoneBackup models
trim_contextboolFalseTrim old messages when context grows long

compile() Parameters

ParameterTypeDefaultDescription
checkpointerBaseCheckpointerNonePersist and restore conversation state
storeBaseStoreNoneLong-term cross-thread storage
interrupt_beforelist[str]NonePause before the named nodes
interrupt_afterlist[str]NonePause after the named nodes
callback_managerCallbackManagerdefaultLifecycle hooks
media_storeBaseMediaStoreNoneBinary/media file storage
shutdown_timeoutfloat30.0Seconds to wait for clean shutdown

Full Code

Pydantic model output

import asyncio
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from agentflow.prebuilt.agent import StructuredOutputAgent
from agentflow.core.state import Message

load_dotenv()


class ProductAnalysis(BaseModel):
product_name: str
sentiment: str = Field(description="positive, negative, or neutral")
score: float = Field(ge=0.0, le=10.0)
key_points: list[str]


agent = StructuredOutputAgent(
model="gpt-4o-mini",
provider="openai",
output_schema=ProductAnalysis,
system_prompt=[{
"role": "system",
"content": "Analyze the given product review and return a structured analysis.",
}],
max_attempts=3,
)

app = agent.compile()


async def main():
result = await app.ainvoke(
{"messages": [Message.text_message(
"Review: 'Amazing build quality but the battery life is terrible.'"
)]},
config={"thread_id": "struct-1"},
)
print(result["context"][-1].text())
# {"product_name": "...", "sentiment": "neutral", "score": 6.5, "key_points": [...]}


asyncio.run(main())

TypedDict output

from typing import TypedDict
from agentflow.prebuilt.agent import StructuredOutputAgent

class WeatherReport(TypedDict):
city: str
temperature_celsius: float
conditions: str

agent = StructuredOutputAgent(
model="gpt-4o-mini",
provider="openai",
output_schema=WeatherReport,
system_prompt=[{"role": "system", "content": "Extract weather data as JSON."}],
)
app = agent.compile()

With tools and structured output

Tools run inside the GENERATE↔TOOL loop before validation is attempted. The final response must still match the schema:

from agentflow.prebuilt.agent import StructuredOutputAgent
from agentflow.prebuilt.tools import google_web_search
from pydantic import BaseModel

class ProductAnalysis(BaseModel):
product_name: str
sentiment: str
score: float
key_points: list[str]

agent = StructuredOutputAgent(
model="gpt-4o-mini",
provider="openai",
output_schema=ProductAnalysis,
tools=[google_web_search],
system_prompt=[{
"role": "system",
"content": (
"Search for reviews of the given product, then return a structured analysis. "
"Output must be valid JSON matching the required schema."
),
}],
max_attempts=2,
)
app = agent.compile()

With a dedicated LLM repair agent

Use this when the lightweight repair prompt is not enough — for example when the schema is complex or the model frequently produces structurally broken JSON:

from agentflow.prebuilt.agent import StructuredOutputAgent
from pydantic import BaseModel

class ProductAnalysis(BaseModel):
product_name: str
sentiment: str
score: float
key_points: list[str]

agent = StructuredOutputAgent(
model="gpt-4o",
provider="openai",
output_schema=ProductAnalysis,
max_attempts=2,
repair_system_prompt=[{
"role": "system",
"content": (
"You are a JSON repair expert. Fix the JSON to match the schema exactly. "
"Output only valid JSON — no explanation, no code fences."
),
}],
)

Google Gemini

from agentflow.prebuilt.agent import StructuredOutputAgent
from pydantic import BaseModel

class Summary(BaseModel):
title: str
body: str
tags: list[str]

agent = StructuredOutputAgent(
model="google/gemini-2.5-flash",
provider="google",
output_schema=Summary,
system_prompt=[{
"role": "system",
"content": "Summarize the given text into a structured JSON object.",
}],
max_attempts=3,
trim_context=True,
)
app = agent.compile()

Streaming

import asyncio
from agentflow.prebuilt.agent import StructuredOutputAgent
from pydantic import BaseModel
from agentflow.core.state import Message

class MovieReview(BaseModel):
title: str
rating: float
summary: str

agent = StructuredOutputAgent(
model="gpt-4o-mini",
provider="openai",
output_schema=MovieReview,
)
app = agent.compile()


async def main():
async for event in app.astream(
{"messages": [Message.text_message("Review the film Inception.")]},
config={"thread_id": "stream-struct-1"},
):
print(event)


asyncio.run(main())

Running with agentflow play

graph.py

from pydantic import BaseModel
from agentflow.prebuilt.agent import StructuredOutputAgent


class SummaryOutput(BaseModel):
title: str
summary: str
tags: list[str]


agent = StructuredOutputAgent(
model="gpt-4o-mini",
provider="openai",
output_schema=SummaryOutput,
system_prompt=[{
"role": "system",
"content": "Summarize the given text. Return a JSON object with title, summary, and tags.",
}],
max_attempts=3,
)

app = agent.compile()

agentflow.json

{
"agent": "graph:app",
"env": ".env",
"auth": null,
"checkpointer": null,
"injectq": null,
"store": null,
"redis": null,
"thread_name_generator": null
}

.env

OPENAI_API_KEY=sk-...
agentflow play