Production AI Agents: Observability, Retries, and Graceful Shutdown
The "hello world" agent works on day one. The "we have paying customers and a pager" agent works on day 200. The gap is operational: observability, retries, idempotency, graceful shutdown.
These are the patterns we see in production Python agent codebases, and the failure modes they prevent.
Observability: what to log
The minimum you should record per agent run:
| Field | Why |
|---|---|
| thread_id | Find related events |
| user_id / tenant_id | Multi-tenant analysis |
| model and provider | Cost attribution |
| prompt_tokens, completion_tokens, tool_calls | Cost and performance |
| node (graph node) | Debug routing |
| duration_ms per node | Find slow nodes |
| outcome (success / error / interrupt) | SLOs |
| error_class and error_message | Triage |
Emit one log line per node boundary, plus one summary line per request. Anything more granular goes to traces, not logs.
Correlation
Use thread_id as your trace correlation ID. Every downstream call (tool HTTP request, vector query, database write) should carry it. When a user reports "the agent went weird at 3pm," you grep thread_id and reconstruct the entire run.
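One way to keep that correlation automatic, assuming you use structlog's contextvars support (merge_contextvars in the processor chain), is to bind the ID once per request. The handler name below is hypothetical:
import structlog

logger = structlog.get_logger()

async def handle_request(thread_id: str, user_input: str):
    # Bind once; every log line emitted while handling this request
    # (including from inside tools) now carries thread_id automatically.
    structlog.contextvars.bind_contextvars(thread_id=thread_id)
    try:
        ...  # invoke the agent, call tools, write to the database
    finally:
        structlog.contextvars.clear_contextvars()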
Structured logging
import structlog

logger = structlog.get_logger()

def log_node(state, node):
    logger.info(
        "agent.node",
        thread_id=state.config.get("thread_id"),
        node=node,
        message_count=len(state.context),
        last_role=state.context[-1].role if state.context else None,
    )
JSON logs go to your platform of choice (Loki, Datadog, OpenSearch). Avoid pretty-printed logs in production. They cost CPU and lose searchability.
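As an illustration, a minimal structlog configuration that emits one JSON object per line:
import logging
import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,    # pick up bound thread_id
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),        # one JSON object per line
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
)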
Tracing with OpenTelemetry
For deeper insight (per-tool latency, model timing, span trees):
from opentelemetry import trace

tracer = trace.get_tracer("agentflow.app")

def get_weather(location: str) -> str:
    with tracer.start_as_current_span("tool.get_weather") as span:
        span.set_attribute("location", location)
        result = weather_client.get(location)
        span.set_attribute("weather.summary", result.summary)
        return result.text()
Pipe to Tempo, Jaeger, Honeycomb, or whatever you have. The agent runtime is one boundary; the tools are the other. Trace both.
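Pointing the tracer at a collector is one-time setup. A sketch assuming the standard OpenTelemetry SDK and an OTLP endpoint (configured via OTEL_EXPORTER_OTLP_ENDPOINT):
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Batch spans and ship them to whatever backend sits behind your OTLP collector.
provider = TracerProvider(resource=Resource.create({"service.name": "agentflow-app"}))
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)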
Cost tracking
Tokens are money. Add a per-run total:
total_prompt_tokens = 0
total_completion_tokens = 0

async for chunk in app.astream(...):
    if chunk.event == StreamEvent.MESSAGE and chunk.message.usage:
        total_prompt_tokens += chunk.message.usage.prompt_tokens
        total_completion_tokens += chunk.message.usage.completion_tokens

logger.info(
    "agent.cost",
    thread_id=tid,
    prompt_tokens=total_prompt_tokens,
    completion_tokens=total_completion_tokens,
    model=model,
)
A daily SQL query against this gives you cost per user, per feature, per model. Invaluable when finance asks why the bill jumped.
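The query shape depends on where the agent.cost lines land. Assuming they end up in a table called agent_cost_logs with one row per run (both names are assumptions), a daily rollup might look like:
SELECT
    date_trunc('day', ts) AS day,
    model,
    sum(prompt_tokens)     AS prompt_tokens,
    sum(completion_tokens) AS completion_tokens
FROM agent_cost_logs
GROUP BY 1, 2               -- add user_id or feature here as needed
ORDER BY 1 DESC, prompt_tokens DESC;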
Retries: where to put them
LLM calls and tool calls fail. There are three failure layers, each with different retry rules:
Layer 1: provider transient errors
Rate limits (429), gateway timeouts (502/504), connection resets. Always retry with exponential backoff:
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from openai import RateLimitError, APIConnectionError

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=30),
    retry=retry_if_exception_type((RateLimitError, APIConnectionError)),
)
def call_llm(*args, **kwargs):
    ...
Most provider SDKs (OpenAI, Anthropic, Google) have built-in retry; configure it once.
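The OpenAI Python SDK, for example, takes a max_retries argument on the client, so the backoff above only needs to cover what the SDK does not:
from openai import OpenAI

# Retries connection errors, 429s, and 5xx responses with built-in exponential backoff.
client = OpenAI(max_retries=5, timeout=30.0)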
Layer 2: tool failures
Your tool calls a real API. The API returns 500. What do you do?
The wrong answer: retry inside the tool silently and let the agent think the call succeeded.
The right answer: surface the error to the agent as a tool result, and let the agent decide:
def get_weather(location: str) -> str:
    """Get current weather for a location."""
    try:
        return weather_client.get(location).text()
    except Exception as e:
        return f"Error fetching weather: {e}. Try a different city or fall back to a general estimate."
The agent, seeing the error, can retry the tool, ask the user, or move on. This is far more robust than swallowing errors.
Layer 3: graph-level retries
For a critical action (a payment, an email send), wrap the node in an idempotency-aware retry:
def charge_node(state):
    idempotency_key = f"{state.config['thread_id']}-{state.config.get('attempt', 0)}"
    return payment_client.charge(amount=state.amount, key=idempotency_key)
Every external write needs an idempotency key. Without it, retries become double-billing.
Idempotency
Two rules:
- Reads are free to retry. Search, look-up, fetch. Retry as much as you want.
- Writes need an idempotency key. Charges, emails, ticket creation, file uploads.
The key is usually thread_id + node_name + attempt_count. Provide it to the upstream API; if the API does not support keys, use a small dedup table:
CREATE TABLE agent_idempotency (
    key        VARCHAR PRIMARY KEY,
    result     JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
Try to claim the key with INSERT ... ON CONFLICT DO NOTHING. If the insert wins, do the work and store the result under the key; if the key already existed, SELECT the previous result and return it without re-doing the action.
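A sketch of that dedup flow against the table above, assuming psycopg 3; it omits crash recovery and the case where another worker is still mid-action (result still NULL):
from psycopg.types.json import Jsonb  # psycopg 3; swap for your driver's JSON adapter

def run_once(conn, key: str, action):
    """Run `action` at most once per idempotency key; return the stored result on repeats."""
    # Try to claim the key. rowcount == 0 means a previous attempt already claimed it.
    claimed = conn.execute(
        "INSERT INTO agent_idempotency (key) VALUES (%s) ON CONFLICT DO NOTHING",
        (key,),
    ).rowcount
    if not claimed:
        row = conn.execute(
            "SELECT result FROM agent_idempotency WHERE key = %s", (key,)
        ).fetchone()
        return row[0]
    result = action()
    conn.execute(
        "UPDATE agent_idempotency SET result = %s WHERE key = %s",
        (Jsonb(result), key),
    )
    conn.commit()
    return result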
Graceful shutdown
When your container gets a SIGTERM (deploy, autoscale-down, spot reclaim), you have ~30 seconds to clean up. For an agent server, that means:
1. Stop accepting new requests. Most ASGI servers do this automatically.
2. Let in-flight requests finish, or at least checkpoint their state.
3. Close the database pool.
4. Flush logs and traces.
AgentFlow's CLI handles 1, 3, and 4. For 2, letting in-flight runs finish, the checkpointer is your friend: even if the request is killed, the graph state is durable and a new replica can resume the thread.
The pattern:
# In your FastAPI / agent entrypoint
import asyncio
import signal

shutdown = asyncio.Event()

def _on_term(*args):
    shutdown.set()

signal.signal(signal.SIGTERM, _on_term)
signal.signal(signal.SIGINT, _on_term)

# In long-running loops, periodically check shutdown.is_set()
For a runnable example, see the graceful shutdown tutorial.
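A sketch of a loop that honors the flag; run_one_step here is hypothetical shorthand for advancing the graph by one node and checkpointing:
async def worker_loop(app, thread_id: str):
    while not shutdown.is_set():
        done = await run_one_step(app, thread_id)  # hypothetical: one node + checkpoint
        if done:
            break
    # On shutdown we simply stop between nodes; the checkpointer holds the
    # latest state, so another replica can resume this thread_id later.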
SLOs that actually mean something
For an agent service, the useful SLOs are:
- TTFB (time-to-first-byte) p95 < 1.5 s: the first-token latency the user feels
- Stream-success rate ≥ 99.5%: streams that complete without error
- Tool-error rate < 2%: tool failures that bubble up to the user
- Cost per request p95 < $X: pick a number for your model mix
Latency to completion is less useful for streaming agents. Users care about first token, not total time.
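If you happen to export metrics with prometheus_client (an assumption, not a requirement), the first two SLOs map onto a histogram and a counter:
from prometheus_client import Counter, Histogram

TTFB = Histogram(
    "agent_ttfb_seconds", "Time to first streamed token",
    buckets=(0.25, 0.5, 1.0, 1.5, 2.5, 5.0),
)
STREAMS = Counter("agent_streams_total", "Completed streams", ["outcome"])

# In the request handler:
# TTFB.observe(first_token_at - request_start)
# STREAMS.labels(outcome="success").inc()   # or outcome="error"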
Failure modes you will hit
- Recursion runaway. Always set recursion_limit (10–25 for most workflows).
- Token blowout. A long thread + verbose tool output can blow past the model's context window. Trim aggressively.
- Provider outage. Have a fallback model; provider abstractions make this a config change (a minimal sketch follows this list).
- Vector store unavailable. If long-term memory is down, degrade gracefully: note it in the logs and continue without it.
- Database lock contention. PgCheckpointer writes per node; under load, size your connection pool accordingly.
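A minimal fallback sketch, assuming the call_llm from the retry example accepts a model argument (its exact signature is up to you):
PRIMARY_MODEL = "primary-model-id"    # hypothetical identifiers; use your real model IDs
FALLBACK_MODEL = "fallback-model-id"

def call_with_fallback(messages):
    try:
        return call_llm(model=PRIMARY_MODEL, messages=messages)
    except Exception:  # narrow this to your provider's outage / availability errors
        logger.warning("agent.fallback", from_model=PRIMARY_MODEL, to_model=FALLBACK_MODEL)
        return call_llm(model=FALLBACK_MODEL, messages=messages)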
A quick checklist before you go live
- Structured logs with thread_id correlation
- OpenTelemetry traces for tool calls and LLM calls
- Cost (token) tracking per run
- Provider-level retries configured
- Tool errors surfaced as results, not exceptions
- Idempotency keys on every external write
- recursion_limit set on every invoke
- Fallback model for provider outages
- Graceful shutdown with checkpointer-backed resume
- SLOs defined and dashboards in place
This is the boring stuff that keeps the pager quiet.
Further reading
- Production runtime concept
- Auth and authorization
- Production checkpointing
- Deployment guide
- Graceful shutdown example
If you are starting from a prototype and shipping to prod, start with Get started. The runtime gives you most of this checklist for free.