
Serving Agents

This page covers how the API/CLI layer exposes your compiled graph over HTTP, how authentication and authorization protect it, how publishers route execution events to external systems, and what a production deployment looks like.


agentflow.json — the project config

agentflow.json is the single file that wires everything together. The CLI and API server read it at startup.

{
  "agent": "graph/agent.py:get_compiled_graph",
  "auth": "auth/agent_auth.py:MyAuth",
  "injectq": "graph/agent.py:container",
  "evaluation": {
    "evals_dir": "evals/",
    "threshold": 0.8
  }
}

Key         Purpose
agent       module:callable that returns a CompiledGraph
auth        Custom BaseAuth subclass (optional; omit to disable auth)
injectq     Services registered in the DI container
evaluation  Eval directory and pass threshold
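
To make the "file.py:attribute" convention concrete, here is a minimal sketch of how such a value could be resolved to a Python object. The helper name resolve_spec and its project_root parameter are hypothetical; this illustrates the shape of the config values and is not the CLI's actual loader.

```python
import importlib.util
from pathlib import Path
from typing import Any


def resolve_spec(spec: str, project_root: str = ".") -> Any:
    """Resolve a "path/to/file.py:attribute" string to the named object.

    Hypothetical helper: it mirrors the shape of the values in
    agentflow.json, not the real loader used by the CLI.
    """
    # Split on the last colon so the file part may itself contain colons.
    file_part, sep, attr = spec.rpartition(":")
    if not sep or not file_part or not attr:
        raise ValueError(f"expected 'file.py:attribute', got {spec!r}")

    path = Path(project_root) / file_part
    module_spec = importlib.util.spec_from_file_location(path.stem, path)
    if module_spec is None or module_spec.loader is None:
        raise ImportError(f"cannot load module from {path}")

    module = importlib.util.module_from_spec(module_spec)
    module_spec.loader.exec_module(module)  # runs the file's top-level code
    return getattr(module, attr)
```

With a project laid out as in the config above, resolve_spec("graph/agent.py:get_compiled_graph") would return the factory function, which the caller then invokes to obtain the compiled graph.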

Starting the server

agentflow api                                # starts with auto-reload (development default)
agentflow api --host 0.0.0.0 --port 8000     # bind address
agentflow api --config agentflow.json        # explicit config path
agentflow play                               # API + hosted playground in browser

The server loads the compiled graph once at startup and keeps it in memory. All requests share the same graph instance; per-request isolation comes from the thread_id each request carries. In development, --reload is on by default, so any change to your source files restarts the server automatically. In production, omit --reload and run with multiple workers (see Production deployment).


REST endpoints

Router        Prefix        Key endpoints
Graph         /v1/graph     POST /invoke, POST /stream, WebSocket /ws, POST /stop, GET /
Checkpointer  /v1/threads   Thread state CRUD, message CRUD
Store         /v1/store     Memory store, search, get, update, delete, list, forget
Media         /v1/media     File upload / download
A2A           /a2a          Agent-to-Agent protocol (Coming soon)
Health        /ping         Health check
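
The health route is the simplest one to script against, for example as a readiness probe. The sketch below uses only the standard library; the function name is_healthy is hypothetical, and treating HTTP 200 as "healthy" is an assumption, since the response body of /ping is not specified here.

```python
import urllib.error
import urllib.request


def is_healthy(base_url: str, timeout: float = 2.0) -> bool:
    """Return True if GET {base_url}/ping answers with HTTP 200.

    Assumption: a 200 status means the server is up; the body is ignored.
    """
    url = base_url.rstrip("/") + "/ping"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, DNS failure, or an HTTP error status.
        return False


if __name__ == "__main__":
    # Assumes a server started with: agentflow api --port 8000
    print(is_healthy("http://localhost:8000"))
```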

Authentication

Authentication is pluggable via BaseAuth. The framework ships with JwtAuth; you can replace it with any backend.

Built-in: JwtAuth

Point to the built-in class in agentflow.json using its importable path:

{
  "auth": "agentflow_cli.src.app.core.auth.jwt_auth:JwtAuth"
}

Then set the required environment variables:

export JWT_SECRET_KEY="your-secret"
export JWT_ALGORITHM="HS256" # default; optional

Custom auth — subclass BaseAuth and point agentflow.json to your class:

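# auth/agent_auth.py
from fastapi import Request
from firebase_admin import auth as firebase_auth  # call firebase_admin.initialize_app() once at startup

from agentflow_cli.src.app.core.auth.base_auth import BaseAuth


class FirebaseAuth(BaseAuth):
    async def authenticate(self, request: Request) -> dict | None:
        # Strip the "Bearer " scheme prefix from the Authorization header.
        token = request.headers.get("Authorization", "").removeprefix("Bearer ")
        try:
            # On success, the decoded token claims become the user dict.
            return firebase_auth.verify_id_token(token)
        except Exception:
            return None  # returning None → 401

{
  "auth": "auth/agent_auth.py:FirebaseAuth"
}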
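
The removeprefix call above passes the whole header through unchanged when the scheme is missing or spelled differently. A stricter parser is sketched below; the helper name extract_bearer_token is hypothetical and not part of the framework.

```python
from typing import Optional


def extract_bearer_token(authorization_header: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' header value.

    Returns None when the header is absent, uses another scheme,
    or carries an empty token.
    """
    if not authorization_header:
        return None
    scheme, _, token = authorization_header.strip().partition(" ")
    if scheme.lower() != "bearer":  # scheme names are case-insensitive
        return None
    token = token.strip()
    return token or None
```

Inside authenticate, returning None as soon as this helper returns None avoids handing an empty or malformed string to the token verifier.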

Authorization

Authorization is a separate extension point from authentication. After a user is identified, AuthorizationBackend decides whether they can perform a specific operation on a specific resource.

# auth/agent_auth.py
from agentflow_cli.src.app.core.auth.authorization import AuthorizationBackend


class TenantAuthorizationBackend(AuthorizationBackend):
    async def check(self, user: dict, operation: str, resource: str) -> bool:
        # operation: "invoke" | "read_thread" | "delete_thread" | "store_memory" | ...
        # resource: thread_id, memory_id, etc.
        # extract_tenant is your own helper (not shown) that maps a resource ID to its tenant.
        return user["tenant_id"] == extract_tenant(resource)

{
  "authorization": "auth/agent_auth.py:TenantAuthorizationBackend"
}

The default DefaultAuthorizationBackend allows all authenticated users. Override it for RBAC, tenant scoping, or fine-grained permission checks.
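
As an illustration of the RBAC case, here is a standalone sketch of a role-to-operation check with the same check(user, operation, resource) shape. It deliberately does not subclass AuthorizationBackend so that it runs without the CLI package installed. The role names and the "role" key in the user dict are assumptions; the operation names are the ones listed in the example above.

```python
import asyncio

# Hypothetical role table: which operations each role may perform.
ROLE_PERMISSIONS: dict[str, set[str]] = {
    "viewer": {"read_thread"},
    "editor": {"read_thread", "invoke", "store_memory"},
    "admin": {"read_thread", "invoke", "store_memory", "delete_thread"},
}


class RoleBasedAuthorization:
    """Allow an operation only if the user's role grants it."""

    async def check(self, user: dict, operation: str, resource: str) -> bool:
        # Unknown or missing roles get an empty permission set (deny by default).
        allowed = ROLE_PERMISSIONS.get(user.get("role", ""), set())
        return operation in allowed


if __name__ == "__main__":
    backend = RoleBasedAuthorization()
    print(asyncio.run(backend.check({"role": "viewer"}, "invoke", "thread-1")))
```

To use this logic with the API server, the body of check would move into an AuthorizationBackend subclass. Tenant scoping can be combined with it by also comparing the resource's tenant.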


Rate limiting

Rate limiting is pluggable via BaseRateLimitBackend. Two backends are built in; swap or extend via dependency injection.

Backend    When to use
In-memory  Single-process development
Redis      Multi-worker production; set REDIS_URL
Custom     Subclass BaseRateLimitBackend and register via injectq

# services/rate_limit.py
from agentflow_cli.src.app.core.middleware.rate_limit.base import BaseRateLimitBackend


class CustomRateLimitBackend(BaseRateLimitBackend):
    async def check(self, key: str, limit: int, window: int) -> bool:
        # return True to allow, False to rate-limit (→ 429)
        ...

    async def close(self) -> None:
        ...

{
  "injectq": {
    "BaseRateLimitBackend": "services/rate_limit.py:CustomRateLimitBackend"
  }
}
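
One way to fill in check(key, limit, window) is a sliding-window log. The sketch below is standalone and does not subclass BaseRateLimitBackend, so it runs without the CLI package. It assumes window is measured in seconds, and the clock parameter is a hypothetical addition that makes the logic testable. Being in-memory, it only suits a single process.

```python
import asyncio
import time
from collections import defaultdict, deque
from typing import Callable


class SlidingWindowLimiter:
    """Allow at most `limit` hits per key within the last `window` seconds."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._hits: dict[str, deque] = defaultdict(deque)

    async def check(self, key: str, limit: int, window: int) -> bool:
        now = self._clock()
        hits = self._hits[key]
        # Drop timestamps that have aged out of the window.
        while hits and now - hits[0] >= window:
            hits.popleft()
        if len(hits) >= limit:
            return False  # caller should answer with 429
        hits.append(now)
        return True

    async def close(self) -> None:
        # Nothing external to release; just forget the recorded hits.
        self._hits.clear()


if __name__ == "__main__":
    limiter = SlidingWindowLimiter()
    print(asyncio.run(limiter.check("user-123", limit=5, window=60)))
```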

Publishers

BasePublisher emits an EventModel on every execution event: node start/end, tool calls, state updates, errors. Pass a publisher to StateGraph at initialization; to send events to several destinations at once, combine publishers with CompositePublisher.

from agentflow.runtime.publisher import RedisPublisher, KafkaPublisher, CompositePublisher
from agentflow.core.graph import StateGraph

publisher = CompositePublisher([
    RedisPublisher(url="redis://localhost:6379", channel="agentflow.events"),
    KafkaPublisher(bootstrap_servers="kafka:9092", topic="agentflow"),
])

graph = StateGraph(publisher=publisher)
# ... add nodes and edges ...
compiled = graph.compile()

Publisher          Transport          Use case
ConsolePublisher   stdout             Development / debugging
RedisPublisher     Redis pub/sub      Real-time dashboards, fan-out
KafkaPublisher     Kafka topic        High-throughput event pipelines
RabbitMQPublisher  RabbitMQ exchange  Queue-based workflows, notifications
OtelPublisher      OpenTelemetry      Distributed tracing (Jaeger, Honeycomb, Langfuse)

Custom publisher — subclass BasePublisher:

from agentflow.runtime.publisher.base_publisher import BasePublisher
from agentflow.runtime.publisher.events import EventModel


class DatadogPublisher(BasePublisher):
    async def publish(self, event: EventModel) -> None:
        # `datadog` stands in for your own Datadog client; it is not defined here.
        datadog.send_event(event.dict())

    async def close(self) -> None:
        # Nothing to release in this example.
        pass
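
To show what composing publishers involves, here is a standalone fan-out sketch built on the same publish/close pair. It is an illustration only and is not CompositePublisher's actual implementation. The class names FanOutPublisher and ListPublisher are hypothetical, and events are plain dicts here rather than EventModel instances.

```python
import asyncio
from typing import Any


class ListPublisher:
    """Toy publisher that records events in memory (useful in tests)."""

    def __init__(self) -> None:
        self.events: list = []
        self.closed = False

    async def publish(self, event: Any) -> None:
        self.events.append(event)

    async def close(self) -> None:
        self.closed = True


class FanOutPublisher:
    """Forward each event to every child publisher concurrently."""

    def __init__(self, publishers: list) -> None:
        self._publishers = list(publishers)
        self.last_errors: list = []

    async def publish(self, event: Any) -> None:
        # return_exceptions=True keeps one failing sink from blocking the others.
        results = await asyncio.gather(
            *(p.publish(event) for p in self._publishers),
            return_exceptions=True,
        )
        self.last_errors = [r for r in results if isinstance(r, Exception)]

    async def close(self) -> None:
        await asyncio.gather(
            *(p.close() for p in self._publishers),
            return_exceptions=True,
        )
```

Collecting failures instead of raising is a design choice: a telemetry outage then never aborts a graph run, at the cost of needing to inspect last_errors (or log them) to notice dropped events.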

Dependency injection

InjectQ is the DI container shipped with 10xscale-agentflow. Register service instances into it once, pass it to StateGraph, and node functions receive their dependencies automatically.

Registering services

# graph/agent.py
from injectq import InjectQ
from services.db import DatabaseService

container = InjectQ.get_instance()
container.bind_instance(DatabaseService, DatabaseService())

# Named scalar values (retrieved by key, not by type)
container["api_version"] = "v2"

Pass the container to StateGraph at init time:

graph = StateGraph(container=container)

Consuming injected dependencies in nodes

Declare dependencies as default parameters using Inject[T]:

from injectq import Inject
from services.db import DatabaseService

async def my_node(
    state: AgentState,
    config: dict,
    db: DatabaseService = Inject[DatabaseService],
) -> Message:
    result = await db.query("SELECT ...")
    return Message.text_message(str(result), role="assistant")

To read named scalar values inside a node:

from injectq import InjectQ

async def my_node(state: AgentState, config: dict) -> Message:
    inq = InjectQ.get_instance()
    api_version = inq.get("api_version")  # raises if missing
    request_id = inq.try_get("request_id", "default-id")  # returns default if missing
    ...

Always-injected parameters — no annotation needed:

Parameter name  Value
state           Current AgentState
config          Run config dict (thread_id, user_id, etc.)
tool_call_id    ID of the tool call (inside ToolNode only)

Wiring the container via agentflow.json

When using agentflow api, point injectq to the exported InjectQ instance in your graph module. The server loads that object and activates it as the global singleton.

{
  "injectq": "graph/agent.py:container"
}

The value is a module:attribute path that resolves to an InjectQ instance — not a class, not a dict.


Thread name generator

By default the API generates an AI-powered name for each new thread. Override it by subclassing ThreadNameGenerator and pointing the thread_name_generator key in agentflow.json at your class:

# graph/thread_name_generator.py
from agentflow_cli.src.app.utils.thread_name_generator import ThreadNameGenerator


class SlugThreadNameGenerator(ThreadNameGenerator):
    async def generate_name(self, messages: list) -> str:
        # slugify is your own helper (not shown); the name comes from the first message.
        return slugify(messages[0].text[:40])

{
  "thread_name_generator": "graph.thread_name_generator:SlugThreadNameGenerator"
}
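
The example above calls a slugify helper that is not defined on this page. A minimal standard-library version might look like the following; the max_length parameter and the "untitled" fallback are assumptions made for illustration.

```python
import re
import unicodedata


def slugify(text: str, max_length: int = 40) -> str:
    """Turn free text into a lowercase, hyphen-separated ASCII slug."""
    # Decompose accented characters, then drop anything outside ASCII.
    normalized = unicodedata.normalize("NFKD", text)
    ascii_text = normalized.encode("ascii", "ignore").decode("ascii")
    # Collapse every run of non-alphanumeric characters into one hyphen.
    slug = re.sub(r"[^a-z0-9]+", "-", ascii_text.lower()).strip("-")
    # Truncate, avoid a trailing hyphen, and never return an empty name.
    return slug[:max_length].rstrip("-") or "untitled"
```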

Production deployment

agentflow build generates a production-ready Dockerfile (and optional docker-compose.yml):

agentflow build                         # Dockerfile only
agentflow build --docker-compose        # + docker-compose.yml
agentflow build --python-version 3.13

Key environment variables — set them in a .env file, via export, or as Docker ENV / --env-file:

# .env  (or export VAR=value, or Docker ENV in Dockerfile)
MODE=production # enables production guards (warns on ORIGINS=*, etc.)
REDIS_URL=redis://redis:6379
JWT_SECRET_KEY=your-secret-here
SENTRY_DSN=https://...@sentry.io/123
OTEL_ENABLED=true
OTEL_SERVICE_NAME=my-agent
OTEL_EXPORTER_OTLP_ENDPOINT=http://collector:4317
OTEL_LEVEL=standard

Variable                     Default        Purpose
MODE                         development    Set to production to enable security guards
REDIS_URL                    None           Redis for state cache, rate limiter, pub/sub
JWT_SECRET_KEY               None           Required for JwtAuth
JWT_ALGORITHM                HS256          JWT signing algorithm
SENTRY_DSN                   None           Sentry error tracking
OTEL_ENABLED                 false          Enable OpenTelemetry tracing (see OpenTelemetry)
OTEL_SERVICE_NAME            agentflow-api  Service name reported in all traces
OTEL_EXPORTER_OTLP_ENDPOINT  None           OTLP collector URL; omit to print spans to console
OTEL_LEVEL                   standard       Span detail level: spans | standard | full
ORIGINS                      *              CORS allowed origins; restrict in production
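
The defaults in the table, together with the production guard on ORIGINS, can be pictured as a small settings loader. This is a sketch under stated assumptions and is not how the API server reads its configuration: the Settings class, the load_settings function, and the rule that only the string "true" enables OTEL_ENABLED are all hypothetical.

```python
import os
import warnings
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    mode: str
    redis_url: Optional[str]
    jwt_secret_key: Optional[str]
    jwt_algorithm: str
    otel_enabled: bool
    otel_service_name: str
    otel_level: str
    origins: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read the documented variables, applying the defaults from the table."""
    settings = Settings(
        mode=env.get("MODE", "development"),
        redis_url=env.get("REDIS_URL"),
        jwt_secret_key=env.get("JWT_SECRET_KEY"),
        jwt_algorithm=env.get("JWT_ALGORITHM", "HS256"),
        # Assumption: only the literal "true" (any case) switches tracing on.
        otel_enabled=env.get("OTEL_ENABLED", "false").lower() == "true",
        otel_service_name=env.get("OTEL_SERVICE_NAME", "agentflow-api"),
        otel_level=env.get("OTEL_LEVEL", "standard"),
        origins=env.get("ORIGINS", "*"),
    )
    # Mirror the production guard: a wildcard CORS origin deserves a warning.
    if settings.mode == "production" and settings.origins.strip() == "*":
        warnings.warn(
            "ORIGINS=* allows any origin; restrict it in production",
            stacklevel=2,
        )
    return settings
```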

OpenTelemetry

AgentFlow has first-class OpenTelemetry support at two independent layers. You can use either or both.

API layer — automatic when OTEL_ENABLED=true

Setting OTEL_ENABLED=true in your environment is all that's required. The API server automatically:

  • Creates a TracerProvider with your OTEL_SERVICE_NAME
  • Instruments the FastAPI app with FastAPIInstrumentor (HTTP-level spans)
  • Wires OtelPublisher into the graph so every LLM call, tool call, and node transition becomes a child span
  • Exports via OTLP when OTEL_EXPORTER_OTLP_ENDPOINT is set; falls back to console output in non-production

OTEL_ENABLED=true
OTEL_SERVICE_NAME=my-agent
OTEL_EXPORTER_OTLP_ENDPOINT=http://collector:4317   # omit to print spans to console
OTEL_LEVEL=standard                                 # spans | standard | full

No code changes are needed. The SDK does not need to be configured separately — the API configures OtelPublisher automatically and merges it with any existing publisher (such as RedisPublisher) without replacing it.

Graph layer — OtelPublisher and ObservabilityLevel

When running the graph directly (without agentflow api), pass OtelPublisher to StateGraph at init time:

from agentflow.core.graph import StateGraph
from agentflow.runtime.publisher import OtelPublisher
from agentflow.runtime.publisher.otel_publisher import ObservabilityLevel

graph = StateGraph(publisher=OtelPublisher(level=ObservabilityLevel.STANDARD))
# ... add nodes and edges ...
compiled = graph.compile()

ObservabilityLevel controls how much data is emitted as span attributes:

Level     What it includes
STANDARD  Token counts, model name, request params, finish reason (default)
FULL      All of STANDARD + prompt messages, completions, tool I/O; may contain PII

With an explicit TracerProvider (e.g. to export to Jaeger or Honeycomb):

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry import trace

from agentflow.core.graph import StateGraph
from agentflow.runtime.publisher import OtelPublisher
from agentflow.runtime.publisher.otel_publisher import ObservabilityLevel

provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint="http://collector:4317")))
trace.set_tracer_provider(provider)

graph = StateGraph(publisher=OtelPublisher(level=ObservabilityLevel.FULL))
# ... add nodes and edges ...
compiled = graph.compile()

Span hierarchy

Every graph run produces a consistent span tree:

agentflow.graph            ← one per ainvoke / astream call
  agentflow.node           ← one per node execution (e.g. "MAIN", "TOOL")
    agentflow.llm          ← one per LLM call (tokens, model, finish reason)
    agentflow.tool         ← one per tool call (name, type: local | mcp)

The agentflow.graph span carries thread_id as session.id so tools like Langfuse automatically group multi-turn conversations.

Install:

pip install "10xscale-agentflow[otel]"        # graph-level spans (OtelPublisher)
pip install "10xscale-agentflow-cli[otel]"    # API layer (FastAPIInstrumentor + OTLP exporter)

What's next

Page                     What it covers
Connecting Clients       TypeScript SDK, streaming, remote tools
Memory                   PgCheckpointer, Redis cache, long-term vector store
Extensibility            BaseAuth, AuthorizationBackend, BasePublisher and all other ABCs
Quality & Observability  GraphLifecycleHook with OpenTelemetry, evaluation, testing