Production AI Agents: Observability, Retries, and Graceful Shutdown

· 6 min read
AgentFlow Team
Building production AI agents in Python

The "hello world" agent works on day one. The "we have paying customers and a pager" agent works on day 200. The gap is operational: observability, retries, idempotency, graceful shutdown.

These are the patterns we see in production Python agent codebases, and the failure modes they prevent.

Observability: what to log

The minimum you should record per agent run:

  • thread_id: Find related events
  • user_id / tenant_id: Multi-tenant analysis
  • model and provider: Cost attribution
  • prompt_tokens, completion_tokens, tool_calls: Cost and performance
  • node (graph node): Debug routing
  • duration_ms per node: Find slow nodes
  • outcome (success / error / interrupt): SLOs
  • error_class and error_message: Triage

Emit one log line per node boundary, plus one summary line per request. Anything more granular goes to traces, not logs.

Correlation

Use thread_id as your trace correlation ID. Every downstream call (tool HTTP request, vector query, database write) should carry it. When a user reports "the agent went weird at 3pm," you grep thread_id and reconstruct the entire run.
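
One way to make the ID travel with every downstream call is a context variable plus a client factory. A minimal sketch, assuming httpx for tool HTTP calls; bind_thread_id and the X-Thread-Id header name are our own conventions, not part of any library:

import contextvars
import httpx

# Holds the correlation ID for the current agent run.
_thread_id: contextvars.ContextVar[str] = contextvars.ContextVar("thread_id", default="unknown")

def bind_thread_id(thread_id: str) -> None:
    """Call once at the start of a run so downstream calls can pick up the ID."""
    _thread_id.set(thread_id)

def tool_http_client() -> httpx.Client:
    # Every tool HTTP request carries the correlation ID as a header.
    return httpx.Client(headers={"X-Thread-Id": _thread_id.get()})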

Structured logging

import structlog

logger = structlog.get_logger()

def log_node(state, node):
    logger.info("agent.node",
        thread_id=state.config.get("thread_id"),
        node=node,
        message_count=len(state.context),
        last_role=state.context[-1].role if state.context else None,
    )

JSON logs go to your platform of choice (Loki, Datadog, OpenSearch). Avoid pretty-printed logs in production. They cost CPU and lose searchability.
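
A minimal production configuration along those lines, using structlog's bundled processors (the exact processor chain is a starting point, not a prescription):

import structlog

structlog.configure(
    processors=[
        structlog.processors.add_log_level,           # include "level" in every event
        structlog.processors.TimeStamper(fmt="iso"),  # ISO-8601 timestamps
        structlog.processors.JSONRenderer(),          # one JSON object per line, no pretty-printing
    ],
)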

Tracing with OpenTelemetry

For deeper insight (per-tool latency, model timing, span trees):

from opentelemetry import trace

tracer = trace.get_tracer("agentflow.app")

def get_weather(location: str) -> str:
    with tracer.start_as_current_span("tool.get_weather") as span:
        span.set_attribute("location", location)
        result = weather_client.get(location)
        span.set_attribute("weather.summary", result.summary)
        return result.text()

Pipe to Tempo, Jaeger, Honeycomb, or whatever you have. The agent runtime is one boundary; the tools are the other. Trace both.
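
Wiring the exporter is a one-time setup. A sketch using the OTLP gRPC exporter; the collector endpoint is a placeholder for whatever backend you run:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

provider = TracerProvider()
# Batch spans and ship them to the collector; Tempo, Jaeger, and Honeycomb all accept OTLP.
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4317")))
trace.set_tracer_provider(provider)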

Cost tracking

Tokens are money. Add a per-run total:

async for chunk in app.astream(...):
    if chunk.event == StreamEvent.MESSAGE and chunk.message.usage:
        total_prompt_tokens += chunk.message.usage.prompt_tokens
        total_completion_tokens += chunk.message.usage.completion_tokens

logger.info("agent.cost",
    thread_id=tid,
    prompt_tokens=total_prompt_tokens,
    completion_tokens=total_completion_tokens,
    model=model,
)

A daily SQL query against this gives you cost per user, per feature, per model. Invaluable when finance asks why the bill jumped.
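
For example, if the agent.cost lines are loaded into a warehouse table, the rollup is a single aggregate. The table name and columns below are hypothetical; map them to however your log pipeline lands the fields:

# Assumed table: one row per agent.cost log line.
DAILY_COST_SQL = """
SELECT user_id,
       model,
       SUM(prompt_tokens)     AS prompt_tokens,
       SUM(completion_tokens) AS completion_tokens
FROM agent_cost_events
WHERE created_at >= NOW() - INTERVAL '1 day'
GROUP BY user_id, model
ORDER BY SUM(prompt_tokens + completion_tokens) DESC;
"""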

Retries: where to put them

LLM calls and tool calls fail. There are three failure layers, each with different retry rules:

Layer 1: provider transient errors

Rate limits (429), gateway timeouts (502/504), connection resets. Always retry with exponential backoff:

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from openai import RateLimitError, APIConnectionError

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=30),
    retry=retry_if_exception_type((RateLimitError, APIConnectionError)),
)
def call_llm(...):
    ...

Most provider SDKs (OpenAI, Anthropic, Google) have built-in retry; configure it once.
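
For the OpenAI Python SDK, that configuration is a constructor argument (the Anthropic SDK exposes the same max_retries parameter); a sketch:

from openai import OpenAI

# Backoff-and-retry for 429s, connection errors, and 5xx responses is built into
# the client; set the retry budget and a request timeout once, at construction.
client = OpenAI(max_retries=5, timeout=30.0)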

Layer 2: tool failures

Your tool calls a real API. The API returns 500. What do you do?

The wrong answer: retry inside the tool silently and let the agent think the call succeeded.

The right answer: surface the error to the agent as a tool result, and let the agent decide:

def get_weather(location: str) -> str:
    """Get current weather for a location."""
    try:
        return weather_client.get(location).text()
    except Exception as e:
        return f"Error fetching weather: {e}. Try a different city or fall back to a general estimate."

The agent, seeing the error, can retry the tool, ask the user, or move on. This is far more robust than swallowing errors.

Layer 3: graph-level retries

For a critical action (a payment, an email send), wrap the node in an idempotency-aware retry:

def charge_node(state):
    idempotency_key = f"{state.config['thread_id']}-{state.config.get('attempt', 0)}"
    return payment_client.charge(amount=state.amount, key=idempotency_key)

Every external write needs an idempotency key. Without it, retries become double-billing.

Idempotency

Two rules:

  1. Reads are free to retry. Search, look-up, fetch. Retry as much as you want.
  2. Writes need an idempotency key. Charges, emails, ticket creation, file uploads.

The key is usually thread_id + node_name + attempt_count. Provide it to the upstream API; if the API does not support keys, use a small dedup table:

CREATE TABLE agent_idempotency (
    key VARCHAR PRIMARY KEY,
    result JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

INSERT ... ON CONFLICT DO NOTHING, then check whether a row was actually inserted (in Postgres, RETURNING yields no row for a conflicting insert). If the key already existed, read the stored result and return it without re-doing the action.
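
A minimal sketch of that guard in Python, assuming psycopg-style parameter binding; run_action stands in for the external write, and handling of a crash between the insert and the update is left out:

import json

def run_once(conn, key: str, run_action):
    """Execute run_action at most once per idempotency key, backed by the dedup table."""
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO agent_idempotency (key, result) VALUES (%s, NULL) "
            "ON CONFLICT (key) DO NOTHING",
            (key,),
        )
        if cur.rowcount == 0:
            # The key already exists: return the stored result, do not repeat the write.
            cur.execute("SELECT result FROM agent_idempotency WHERE key = %s", (key,))
            return cur.fetchone()[0]
        result = run_action()
        cur.execute(
            "UPDATE agent_idempotency SET result = %s::jsonb WHERE key = %s",
            (json.dumps(result), key),
        )
        conn.commit()
        return result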

Graceful shutdown

When your container gets a SIGTERM (deploy, autoscale-down, spot reclaim), you have ~30 seconds to clean up. For an agent server, that means:

  1. Stop accepting new requests. Most ASGI servers do this automatically.
  2. Let in-flight requests finish. Or at least checkpoint their state.
  3. Close the database pool.
  4. Flush logs and traces.

AgentFlow's CLI handles 1, 3, and 4. For 2, letting in-flight runs finish, the checkpointer is your friend: even if the request is killed, the graph state is durable, and a new replica can resume the thread.

The pattern:

# In your FastAPI / agent entrypoint
import asyncio
import signal

shutdown = asyncio.Event()

def _on_term(*args):
    shutdown.set()

signal.signal(signal.SIGTERM, _on_term)
signal.signal(signal.SIGINT, _on_term)

# In long-running loops, periodically check shutdown.is_set()
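
What "periodically check" looks like for a streaming run; handle is a placeholder for your downstream consumer, and breaking mid-run is safe only because the checkpointer has persisted state at the last node boundary:

async def run_thread(app, inputs, config):
    async for chunk in app.astream(inputs, config):
        handle(chunk)  # hypothetical consumer: SSE write, websocket send, etc.
        if shutdown.is_set():
            # Stop between chunks; the checkpointer holds everything up to the
            # last completed node, so a new replica can resume this thread_id.
            break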

For a runnable example, see the graceful shutdown tutorial.

SLOs that actually mean something

For an agent service, the useful SLOs are:

  • TTFB (time-to-first-byte) p95 < 1.5 s: the first-token latency the user feels
  • Stream-success rate ≥ 99.5%: streams that complete without error
  • Tool-error rate < 2%: tool failures bubbling up to the user
  • Cost per request p95 < $X: pick a number for your model mix

Latency to completion is less useful for streaming agents. Users care about first token, not total time.
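
TTFB is cheap to measure from the same streaming loop; emit_metric below is a placeholder for your metrics client:

import time

async def timed_run(app, inputs, config):
    start = time.monotonic()
    first_token_seen = False
    async for chunk in app.astream(inputs, config):
        if not first_token_seen:
            first_token_seen = True
            emit_metric("agent.ttfb_ms", (time.monotonic() - start) * 1000)  # hypothetical metrics helper
        yield chunk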

Failure modes you will hit

  1. Recursion runaway. Always set recursion_limit (10–25 for most workflows).
  2. Token blowout. A long thread + verbose tool output can blow past the model's context. Trim aggressively (see the sketch after this list).
  3. Provider outage. Have a fallback model; provider abstractions make this a config change.
  4. Vector store unavailable. If long-term memory is down, degrade gracefully. Note in the logs and continue without it.
  5. Database lock contention. PgCheckpointer writes per node; under load, you need a connection pool sized appropriately.
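
For 2, the simplest trim keeps the system prompt plus the most recent turns. A sketch; the cap of 30 and the message-list shape are assumptions, not AgentFlow specifics:

MAX_MESSAGES = 30  # assumed budget; size it to your model's context window

def trim_context(messages):
    """Keep the leading system message (if any) plus the most recent messages."""
    if len(messages) <= MAX_MESSAGES:
        return messages
    head = [m for m in messages[:1] if m.role == "system"]
    return head + messages[-(MAX_MESSAGES - len(head)):]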

A quick checklist before you go live

  • Structured logs with thread_id correlation
  • OpenTelemetry traces for tool calls and LLM calls
  • Cost (token) tracking per run
  • Provider-level retries configured
  • Tool errors surfaced as results, not exceptions
  • Idempotency keys on every external write
  • recursion_limit set on every invoke
  • Fallback model for provider outages
  • Graceful shutdown with checkpointer-backed resume
  • SLOs defined and dashboards in place

This is the boring stuff that keeps the pager quiet.

Further reading

If you are starting from a prototype and shipping to prod, start with the Get started guide. The runtime gives you most of this checklist for free.