
Agents, Tools & Control

This page covers how agents and tools interact in the ReAct loop, how to write tools, which prebuilt agents are available, and the three surfaces for controlling and observing execution: Command, CallbackManager, and GraphLifecycleHook.


The ReAct loop

An Agent and a ToolNode alternate: the LLM decides what to call, the tool node runs it, the result flows back to the LLM. This continues until the LLM produces a final answer.
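The alternation can be sketched in plain Python with a scripted stand-in for the LLM. This is a minimal illustration, not agentflow code. The dict-based message shape, `fake_llm`, `TOOLS`, and `run_react_loop` are all hypothetical. The `max_steps` cap is an assumption added to guard against a model that never stops calling tools.

```python
from typing import Callable

# Hypothetical message shape: dicts with "role", "content", and an optional "tool_call".
Message = dict


def get_weather(city: str) -> str:
    return f"It is 22°C and sunny in {city}."


TOOLS: dict[str, Callable[..., str]] = {"get_weather": get_weather}


def fake_llm(messages: list[Message]) -> Message:
    """Scripted stand-in for a model: request one tool call, then answer."""
    last = messages[-1]
    if last["role"] == "user":
        return {"role": "assistant", "content": "",
                "tool_call": {"name": "get_weather", "args": {"city": "Paris"}}}
    # The last message is a tool result, so turn it into a final answer.
    return {"role": "assistant", "content": f"Final answer: {last['content']}"}


def run_react_loop(messages: list[Message], max_steps: int = 10) -> list[Message]:
    for _ in range(max_steps):
        reply = fake_llm(messages)            # Agent step: the model decides
        messages.append(reply)
        call = reply.get("tool_call")
        if call is None:                      # no tool requested: final answer
            return messages
        result = TOOLS[call["name"]](**call["args"])  # ToolNode step
        messages.append({"role": "tool", "content": result})
    raise RuntimeError("max_steps reached without a final answer")


history = run_react_loop([{"role": "user", "content": "Weather in Paris?"}])
print(history[-1]["content"])  # Final answer: It is 22°C and sunny in Paris.
```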


Writing tools

Any Python function decorated with @tool becomes a tool. The docstring becomes the description sent to the LLM. The decorator can be used with or without arguments:

from agentflow.utils import tool

# Without arguments: name and description are inferred from the function
@tool
def get_weather(city: str) -> str:
    """Return current weather for the given city."""
    return f"It is 22°C and sunny in {city}."

# With arguments: explicit name, description, and tags
@tool(name="web_search", description="Search the web for a query.", tags=["search"])
def search(query: str) -> str:
    ...
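A decorator that works both bare and with arguments usually checks whether it received the function directly. The sketch below shows one way a name, description, and parameter list could be inferred with `inspect`. `tool_sketch` and the `__tool_spec__` attribute are hypothetical, and this is not agentflow's implementation of `@tool`.

```python
import inspect
from typing import Callable, Optional


def tool_sketch(func: Optional[Callable] = None, *, name: Optional[str] = None,
                description: Optional[str] = None,
                tags: Optional[list[str]] = None):
    """Hypothetical decorator usable both as @tool_sketch and @tool_sketch(...)."""
    def wrap(f: Callable) -> Callable:
        # Attach a spec that a framework could later send to the LLM.
        f.__tool_spec__ = {
            "name": name or f.__name__,
            # Fall back to the cleaned-up docstring when no description is given.
            "description": description or inspect.getdoc(f) or "",
            "parameters": list(inspect.signature(f).parameters),
            "tags": list(tags or []),
        }
        return f

    # Bare @tool_sketch: Python passes the decorated function as `func`.
    if func is not None:
        return wrap(func)
    # @tool_sketch(...): return the actual decorator.
    return wrap


@tool_sketch
def get_weather(city: str) -> str:
    """Return current weather for the given city."""
    return f"It is 22°C and sunny in {city}."


@tool_sketch(name="web_search", description="Search the web for a query.", tags=["search"])
def search(query: str) -> str:
    return f"Results for {query}"  # placeholder body
```

The decorated functions stay ordinary callables. Only the attached spec differs between the two styles.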

Pass tools to ToolNode:

from agentflow.core.graph import ToolNode

tool_node = ToolNode([get_weather, search])

Or attach them inline to an Agent:

from agentflow.core.graph import Agent, ToolNode

agent = Agent(
    model="gpt-4o",
    tool_node=ToolNode([get_weather]),
)

To reference a named graph node instead of an inline ToolNode, pass the node's name as a string. The name is resolved at compile time:

agent = Agent(model="gpt-4o", tool_node="TOOL")

Prebuilt agents

For common patterns, use a prebuilt agent instead of wiring the graph manually.

| Class | Pattern |
|---|---|
| ReactAgent | Single agent + tool loop; auto-wires MAIN + TOOL nodes and routing |
| RAGAgent | ReAct + retrieval-augmented generation |
| PlanActReflectAgent | Plan → Act → Reflect loop |
| StructuredOutputAgent | Guaranteed Pydantic-model output |
| SupervisorTeamAgent | Supervisor routes to named worker agents |
| SwarmAgent | Peer-to-peer handoff via transfer_to_<name> tools |
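The "routing" that ReactAgent auto-wires between MAIN and TOOL comes down to a pair of route functions. The sketch below makes two assumptions: messages are dicts, and a tool request appears as a non-empty `tool_calls` list. `route_after_main`, `route_after_tool`, and the local `END` sentinel are hypothetical stand-ins, not agentflow's code.

```python
END = "__end__"  # hypothetical sentinel; agentflow exports its own END


def route_after_main(messages: list[dict]) -> str:
    """Decide where to go after the MAIN (LLM) node."""
    last = messages[-1] if messages else {}
    if last.get("role") == "assistant" and last.get("tool_calls"):
        return "TOOL"  # the model asked for tools: run them
    return END         # plain answer: stop


def route_after_tool(messages: list[dict]) -> str:
    """Tool results always flow back to the LLM."""
    return "MAIN"


# Hypothetical routing table: node name -> route function.
EDGES = {"MAIN": route_after_main, "TOOL": route_after_tool}
```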

ReactAgent takes model and tools directly — no separate Agent or ToolNode construction needed:

from agentflow.prebuilt.agent import ReactAgent

compiled = ReactAgent(
    model="gpt-4o",
    tools=[get_weather, search],
    system_prompt=[{"role": "system", "content": "You are a helpful assistant."}],
).compile()

All prebuilt agents accept checkpointer, store, interrupt_before, and interrupt_after in .compile().


Prebuilt tools

| Import path | Callable name | What it does |
|---|---|---|
| agentflow.prebuilt.tools | google_web_search | Google web search |
| agentflow.prebuilt.tools | vertex_ai_search | Vertex AI search |
| agentflow.prebuilt.tools | fetch_url | Fetch and extract text from a URL |
| agentflow.prebuilt.tools | file_read | Read a file from the local filesystem |
| agentflow.prebuilt.tools | file_write | Write a file to the local filesystem |
| agentflow.prebuilt.tools | file_search | Search files by glob pattern |
| agentflow.prebuilt.tools | safe_calculator | Safe arithmetic expression evaluator |
| agentflow.prebuilt.tools | memory_tool | Agent long-term memory (store / search / delete) |
| agentflow.prebuilt.tools | create_handoff_tool | Factory for swarm handoff tools |

from agentflow.core.graph import ToolNode
from agentflow.prebuilt.tools import fetch_url, safe_calculator, file_read

tool_node = ToolNode([fetch_url, safe_calculator, file_read])
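The table describes safe_calculator only as a "safe arithmetic expression evaluator". One common way to build such a tool parses the expression with `ast` and walks only whitelisted node types, so names, calls, and attribute access are rejected rather than handed to `eval`. The sketch below takes that approach. It is an assumption about the technique, not agentflow's implementation. The `calculate` function and the exponent cap are hypothetical.

```python
import ast
import operator

_BIN_OPS = {
    ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
    ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod, ast.Pow: operator.pow,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def calculate(expression: str) -> float:
    """Evaluate +, -, *, /, //, %, ** over numeric literals only."""
    def walk(node: ast.AST) -> float:
        if isinstance(node, ast.Expression):
            return walk(node.body)
        if (isinstance(node, ast.Constant) and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            left, right = walk(node.left), walk(node.right)
            if isinstance(node.op, ast.Pow) and abs(right) > 100:
                raise ValueError("Exponent too large")  # avoid runaway computations
            return _BIN_OPS[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](walk(node.operand))
        # Names, calls, attributes, subscripts, etc. are all rejected here.
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")

    return walk(ast.parse(expression, mode="eval"))
```

Whitelisting node types is safer than sanitizing strings. Anything the walker does not recognize fails closed.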

Control: Command

Return a Command from a node to jump to a specific node and optionally update state in one step. Use this when the routing target is determined by data computed inside the node — not by a separate route function.

from agentflow.utils import Command, END
from agentflow.core.state import AgentState

class MyState(AgentState):
    score: float = 0.0
    attempts: int = 0

async def review_node(state: MyState) -> Command:
    if state.score < 0.5:
        # goto a node name, optionally carry a state update
        return Command(goto="RETRY", update={"attempts": state.attempts + 1})
    return Command(goto=END)
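The sketch below makes the semantics concrete with a stdlib-only executor that applies a Command. It first merges `update` into state, then jumps to `goto`. The `Command` dataclass, the `END` sentinel, `retry_node`, `NODES`, and `run` are hypothetical stand-ins that mirror the example, not agentflow's classes. The update-then-jump order is an assumption.

```python
import asyncio
from dataclasses import dataclass, field, replace
from typing import Any, Awaitable, Callable

END = "__end__"  # hypothetical sentinel standing in for agentflow's END


@dataclass
class Command:
    goto: str
    update: dict[str, Any] = field(default_factory=dict)


@dataclass
class MyState:
    score: float = 0.0
    attempts: int = 0


async def review_node(state: MyState) -> Command:
    if state.score < 0.5:
        return Command(goto="RETRY", update={"attempts": state.attempts + 1})
    return Command(goto=END)


async def retry_node(state: MyState) -> Command:
    # Hypothetical retry: pretend each attempt improves the score.
    return Command(goto="REVIEW", update={"score": state.score + 0.25})


NODES: dict[str, Callable[[MyState], Awaitable[Command]]] = {
    "REVIEW": review_node,
    "RETRY": retry_node,
}


async def run(state: MyState, start: str = "REVIEW", max_steps: int = 20) -> MyState:
    node = start
    for _ in range(max_steps):
        cmd = await NODES[node](state)
        state = replace(state, **cmd.update)  # apply the state update first...
        if cmd.goto == END:                   # ...then follow the jump
            return state
        node = cmd.goto
    raise RuntimeError("max_steps exceeded")


final = asyncio.run(run(MyState()))
print(final)  # MyState(score=0.5, attempts=2)
```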

Command vs conditional edges:

| | Command | add_conditional_edges |
|---|---|---|
| Routing logic location | Inside the node | Separate route function |
| Can update state while routing | Yes | No |
| Best for | Data-driven jumps, retry loops | Structural branching |

Observability: CallbackManager

CallbackManager fires on every individual LLM call, tool call, MCP call, or skill call. Register callbacks per InvocationType using register_before_invoke, register_after_invoke, and register_on_error.

InvocationType values:

| Value | Fires |
|---|---|
| InvocationType.AI | Before/after each LLM call |
| InvocationType.TOOL | Before/after each local tool function |
| InvocationType.MCP | Before/after each MCP server call |
| InvocationType.INPUT_VALIDATION | Before messages are sent to the LLM |
| InvocationType.SKILL | Before/after each skill invocation |

Each callback receives a CallbackContext (invocation_type, node_name, function_name, metadata) as its first argument:

from agentflow.utils.callbacks import (
    CallbackManager,
    CallbackContext,
    InvocationType,
)

cb = CallbackManager()

# before_invoke(context, input_data) → return input_data (optionally modified)
async def log_before(context: CallbackContext, input_data):
    print(f"→ {context.invocation_type} on node '{context.node_name}'")
    return input_data

# after_invoke(context, input_data, output_data) → return output_data (optionally modified)
async def log_after(context: CallbackContext, input_data, output_data):
    print(f"← {context.invocation_type}")
    return output_data

# on_error(context, input_data, error) → return a recovery value or None to re-raise
async def handle_error(context: CallbackContext, input_data, error: Exception):
    print(f"✗ {context.invocation_type}: {error}")
    return None

cb.register_before_invoke(InvocationType.AI, log_before)
cb.register_after_invoke(InvocationType.AI, log_after)
cb.register_on_error(InvocationType.TOOL, handle_error)

compiled = graph.compile(callback_manager=cb)
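The before/after/error contract in the comments above can be modelled in a few lines. The sketch assumes three behaviors. Before-callbacks run in registration order and may rewrite the input. After-callbacks may rewrite the output. The first non-None `on_error` result becomes the return value, and if every handler returns None the error is re-raised. `MiniCallbackManager`, `Ctx`, `invoke`, and the plain string kinds (used in place of `InvocationType`) are hypothetical, not agentflow's internals.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class Ctx:
    invocation_type: str
    node_name: str
    function_name: str
    metadata: dict = field(default_factory=dict)


class MiniCallbackManager:
    def __init__(self) -> None:
        self.before: dict[str, list] = {}
        self.after: dict[str, list] = {}
        self.errors: dict[str, list] = {}

    def register_before_invoke(self, kind: str, cb) -> None:
        self.before.setdefault(kind, []).append(cb)

    def register_after_invoke(self, kind: str, cb) -> None:
        self.after.setdefault(kind, []).append(cb)

    def register_on_error(self, kind: str, cb) -> None:
        self.errors.setdefault(kind, []).append(cb)

    async def invoke(self, ctx: Ctx, fn: Callable[[Any], Awaitable[Any]], data: Any) -> Any:
        for cb in self.before.get(ctx.invocation_type, []):
            data = await cb(ctx, data)           # may rewrite the input
        try:
            out = await fn(data)
        except Exception as exc:
            for cb in self.errors.get(ctx.invocation_type, []):
                recovered = await cb(ctx, data, exc)
                if recovered is not None:        # first recovery value wins
                    return recovered
            raise                                # no recovery: re-raise
        for cb in self.after.get(ctx.invocation_type, []):
            out = await cb(ctx, data, out)       # may rewrite the output
        return out


async def demo() -> list[str]:
    log: list[str] = []
    cb = MiniCallbackManager()

    async def upper_before(ctx, data):
        log.append(f"before {ctx.function_name}")
        return data.upper()

    async def tag_after(ctx, data, out):
        log.append("after")
        return f"[{out}]"

    async def recover(ctx, data, err):
        return f"fallback for {data}"

    cb.register_before_invoke("tool", upper_before)
    cb.register_after_invoke("tool", tag_after)
    cb.register_on_error("tool", recover)

    async def echo(x):
        return f"echo {x}"

    async def boom(x):
        raise RuntimeError("tool failed")

    ok = await cb.invoke(Ctx("tool", "TOOL", "echo"), echo, "hi")
    bad = await cb.invoke(Ctx("tool", "TOOL", "boom"), boom, "hi")
    return [ok, bad] + log
```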

Register input validators on CallbackManager to screen messages before they reach the LLM:

from agentflow.utils.validators import PromptInjectionValidator

cb.register_input_validator(PromptInjectionValidator())

GraphLifecycleHook can also be registered via CallbackManager rather than passed directly to compile():

cb.register_lifecycle_hook(MyHook())

Observability: GraphLifecycleHook

GraphLifecycleHook fires on graph-level structural events — once per run start/end, on each node transition, checkpoint, interrupt, or error. All methods are optional; only override what you need.

Each method receives a GraphLifecycleContext as its first argument after self. The context carries .thread_id and .run_id.

from agentflow.utils.callbacks import (
    CallbackManager,
    GraphLifecycleContext,
    GraphLifecycleHook,
)

# tracer, metrics, and notify_human are placeholders for your own
# tracing client, metrics recorder, and notification function.

class MyHook(GraphLifecycleHook):

    async def on_graph_start(self, ctx: GraphLifecycleContext, state):
        # Called after state is loaded, before execution loop starts.
        # Return modified state to replace the initial state, or None to keep it.
        self.span = tracer.start_span(ctx.run_id)
        return None

    async def on_graph_end(self, ctx: GraphLifecycleContext, final_state, messages, total_steps):
        # Called after successful completion, before final state sync.
        # Return modified state to persist, or None to keep current state.
        self.span.finish()
        return None

    async def on_graph_error(self, ctx: GraphLifecycleContext, error, partial_state, messages, step, node_name):
        # Called when an unhandled error escapes the execution loop.
        # Return (modified_state, error_message) or None. Exception is always re-raised.
        self.span.set_error(error)
        return None

    async def on_checkpoint(self, ctx: GraphLifecycleContext, state, messages, is_context_trimmed):
        # Called before every durable checkpoint write.
        # Return (state, messages), state only, or None.
        return None

    async def on_state_update(self, ctx: GraphLifecycleContext, node_name, old_state, new_state, step):
        # Called after each node result is merged into state.
        # Return modified state to replace new_state, or None.
        metrics.record(node_name)
        return None

    async def on_interrupt(self, ctx: GraphLifecycleContext, interrupted_node, interrupt_type, state):
        # Called when execution pauses at an interrupt node.
        # Return modified state to persist at interrupt, or None.
        notify_human(ctx.thread_id)
        return None

    async def on_resume(self, ctx: GraphLifecycleContext, resumed_node, state, resume_data):
        # Called when a previously interrupted execution is about to resume.
        # Return modified state to continue with, or None.
        return None

cb = CallbackManager()
cb.register_lifecycle_hook(MyHook())
compiled = graph.compile(callback_manager=cb)
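Most of these hooks share one convention: return None to keep the current value, or return a value to replace it. The small driver below sketches that convention for three of the hooks. It assumes hooks run in registration order and that the state is a plain dict. `SketchHook`, `keep_or_replace`, `run_graph`, `RedactHook`, `CountHook`, and `answer` are hypothetical, not agentflow classes.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

State = dict[str, Any]
Node = Callable[[State], Awaitable[State]]


class SketchHook:
    """Every method is optional: the defaults return None, meaning "keep as-is"."""

    async def on_graph_start(self, state: State) -> Optional[State]:
        return None

    async def on_state_update(self, node_name: str, old_state: State,
                              new_state: State) -> Optional[State]:
        return None

    async def on_graph_end(self, final_state: State) -> Optional[State]:
        return None


def keep_or_replace(current: State, returned: Optional[State]) -> State:
    # None keeps the current state; any other return value replaces it.
    return current if returned is None else returned


async def run_graph(state: State, nodes: list[tuple[str, Node]],
                    hooks: list[SketchHook]) -> State:
    for hook in hooks:
        state = keep_or_replace(state, await hook.on_graph_start(state))
    for name, node in nodes:
        new_state = {**state, **(await node(state))}  # merge the node's result
        for hook in hooks:                             # registration order
            new_state = keep_or_replace(
                new_state, await hook.on_state_update(name, state, new_state))
        state = new_state
    for hook in hooks:
        state = keep_or_replace(state, await hook.on_graph_end(state))
    return state


class RedactHook(SketchHook):
    async def on_state_update(self, node_name, old_state, new_state):
        if "@" in new_state.get("reply", ""):  # crude PII check for illustration
            return {**new_state, "reply": "[redacted]"}
        return None


class CountHook(SketchHook):
    def __init__(self) -> None:
        self.updates: list[str] = []

    async def on_state_update(self, node_name, old_state, new_state):
        self.updates.append(node_name)
        return None


async def answer(state: State) -> State:
    return {"reply": "Mail me at a@b.com"}


counter = CountHook()
final = asyncio.run(run_graph({"reply": ""}, [("MAIN", answer)], [RedactHook(), counter]))
print(final)            # {'reply': '[redacted]'}
print(counter.updates)  # ['MAIN']
```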

Validators

Validators run before each LLM call to screen messages. Two built-ins ship with the framework:

| Class | What it checks |
|---|---|
| PromptInjectionValidator | Detects common injection and jailbreak patterns in user messages |
| MessageContentValidator | Enforces allowed roles and content block schemas |

from agentflow.utils.validators import PromptInjectionValidator, MessageContentValidator

cb.register_input_validator(PromptInjectionValidator(strict_mode=True))
cb.register_input_validator(MessageContentValidator(allowed_roles=["user", "assistant", "system"]))
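PromptInjectionValidator is described as detecting "common injection and jailbreak patterns". A naive pattern-based check in that spirit looks roughly like the sketch below. The pattern list, `looks_like_injection`, and `check_messages` are hypothetical. A regex list like this is far less thorough than a production validator and is shown only to illustrate the raise-to-block idea.

```python
import re

# Hypothetical, deliberately small pattern list; real detectors use many more signals.
_INJECTION_PATTERNS = [
    r"ignore (all |any )?(previous|prior|above) instructions",
    r"disregard (the |your )?system prompt",
    r"you are now (in )?developer mode",
    r"reveal (the |your )?(system prompt|hidden instructions)",
]
_COMPILED = [re.compile(p, re.IGNORECASE) for p in _INJECTION_PATTERNS]


def looks_like_injection(text: str) -> bool:
    return any(p.search(text) for p in _COMPILED)


def check_messages(messages: list[dict]) -> None:
    """Raise on the first user message that matches a pattern (raise to block)."""
    for msg in messages:
        if msg.get("role") == "user" and looks_like_injection(msg.get("content", "")):
            raise ValueError("Possible prompt injection detected")
```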

Implement custom validation by extending BaseValidator. validate is async and returns bool — raise to block, return True to pass:

from agentflow.utils.callbacks import BaseValidator
from agentflow.core.state import Message

class ProfanityValidator(BaseValidator):
    async def validate(self, messages: list[Message]) -> bool:
        for msg in messages:
            # contains_profanity is a placeholder for your own check
            if contains_profanity(msg.text()):
                raise ValueError("Message rejected by profanity filter")
        return True

cb.register_input_validator(ProfanityValidator())
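This page does not say how multiple registered validators are combined. One plausible model, sketched here as an assumption, runs each async `validate` in registration order and lets the first exception abort the LLM call. The sketch also treats a `False` return as a block, which is a further assumption. `SketchValidator`, `LengthValidator`, and `run_validators` are hypothetical, and messages are plain dicts.

```python
import asyncio
from abc import ABC, abstractmethod


class SketchValidator(ABC):
    @abstractmethod
    async def validate(self, messages: list[dict]) -> bool: ...


class LengthValidator(SketchValidator):
    def __init__(self, max_chars: int) -> None:
        self.max_chars = max_chars

    async def validate(self, messages: list[dict]) -> bool:
        for msg in messages:
            if len(msg.get("content", "")) > self.max_chars:
                raise ValueError(f"Message longer than {self.max_chars} characters")
        return True


async def run_validators(validators: list[SketchValidator], messages: list[dict]) -> None:
    for v in validators:                    # registration order
        if not await v.validate(messages):  # assumption: False also blocks
            raise ValueError(f"{type(v).__name__} rejected the messages")
```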

CallbackManager vs GraphLifecycleHook

| | CallbackManager | GraphLifecycleHook |
|---|---|---|
| Fires | Every LLM / tool / MCP / skill call | Every graph run or node transition |
| Input validation | Yes (register_input_validator) | No |
| State access | Input/output data + CallbackContext | Full AgentState |
| Best for | Per-call logging, validation, cost tracking | Tracing, metrics, PII redaction, interrupt handling |

Background tasks

Use BackgroundTaskManager to fire-and-forget async work from inside a node without blocking the graph — logging, analytics writes, cache warming, notifications.

from agentflow.core.state import Message
from agentflow.utils.background_task_manager import BackgroundTaskManager
from injectq import Inject

# MyState is the state class from the Command example above;
# log_to_analytics is a placeholder for your own async function.
async def my_node(
    state: MyState,
    task_manager: Inject[BackgroundTaskManager],
) -> Message:
    # Fire and forget: does not block the graph
    task_manager.create_task(
        log_to_analytics(state),
        name="analytics_log",
        timeout=5.0,
    )
    return Message.assistant_message("Done")

BackgroundTaskManager is automatically registered in the DI container at compile time — inject it in any node with Inject[BackgroundTaskManager].

| Method | Purpose |
|---|---|
| create_task(coro, name, timeout) | Schedule a coroutine to run in the background |
| wait_for_all(timeout) | Block until all tracked tasks finish |
| cancel_all() | Cancel all running tasks |
| get_task_count() | Number of currently active tasks |
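A minimal asyncio-based manager with the same four methods can be sketched as follows. This is an assumption about the behavior, not agentflow's class. `SketchTaskManager`, its `errors` list, and the `demo` coroutine are hypothetical. `create_task` wraps the coroutine in `asyncio.wait_for` when a timeout is given. Each task keeps a strong reference until it finishes, because the event loop holds only weak references to tasks. Failures are recorded rather than propagated into the graph.

```python
import asyncio
from typing import Any, Coroutine, Optional


class SketchTaskManager:
    """Hypothetical stand-in with the four methods listed above."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()
        self.errors: list[BaseException] = []

    def create_task(self, coro: Coroutine[Any, Any, Any], name: Optional[str] = None,
                    timeout: Optional[float] = None) -> asyncio.Task:
        if timeout is not None:
            coro = asyncio.wait_for(coro, timeout)  # raises TimeoutError when exceeded
        task = asyncio.create_task(coro, name=name)
        self._tasks.add(task)  # strong reference: the loop only keeps weak ones
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task: asyncio.Task) -> None:
        self._tasks.discard(task)
        # Record failures instead of letting them reach the graph.
        if not task.cancelled() and task.exception() is not None:
            self.errors.append(task.exception())

    async def wait_for_all(self, timeout: Optional[float] = None) -> None:
        if self._tasks:
            await asyncio.wait(set(self._tasks), timeout=timeout)

    def cancel_all(self) -> None:
        for task in list(self._tasks):
            task.cancel()

    def get_task_count(self) -> int:
        return len(self._tasks)


async def demo() -> tuple[int, int, int]:
    mgr = SketchTaskManager()
    written: list[str] = []

    async def log_to_analytics(label: str) -> None:  # hypothetical analytics write
        await asyncio.sleep(0.01)
        written.append(label)

    mgr.create_task(log_to_analytics("run-1"), name="analytics_log", timeout=5.0)
    mgr.create_task(asyncio.sleep(10), name="too_slow", timeout=0.05)
    started = mgr.get_task_count()  # both tasks are still pending here
    await mgr.wait_for_all(timeout=2.0)
    return started, len(written), len(mgr.errors)
```

In `demo`, one task finishes normally and the other hits its timeout. The timeout ends up in `errors` instead of surfacing in the node that scheduled it.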

Always call aclose() on shutdown — it flushes all pending background tasks (including async checkpointer writes) before the process exits:

# In a FastAPI lifespan, signal handler, or test teardown
await compiled.aclose()

Skipping aclose() on a server with a PgCheckpointer can cause the last state write to be lost.


Streaming

Streaming is how you interact with a compiled graph in real time. It is independent of memory — chunks arrive while state is being built and checkpointed in the background.

Execution methods compared

| Method | Returns | Use when |
|---|---|---|
| compiled.invoke(input, config) | Final AgentState | You only need the end result |
| compiled.stream(input, config) | Sync generator of StreamChunk | Sync context, real-time display |
| compiled.astream(input, config) | Async generator of StreamChunk | Async context (FastAPI, WebSocket) |
| compiled.ainvoke(input, config) | Awaitable AgentState | Async context, no streaming needed |

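Inside an async context, always use ainvoke / astream. invoke and stream are sync wrappers over asyncio.run(), and asyncio.run() raises RuntimeError when called from a thread that already has a running event loop (for example, inside a FastAPI handler).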
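The failure mode is a general asyncio rule and can be reproduced without agentflow. The sketch uses a hypothetical `sync_invoke` wrapper built on `asyncio.run()`, in the way the text describes. It calls the wrapper from inside a running loop and then shows the async-native call working. `ainvoke`, `sync_invoke`, and `handler` are illustrative stand-ins, not the library's methods.

```python
import asyncio


async def ainvoke(query: str) -> str:
    """Stand-in for an async graph call."""
    await asyncio.sleep(0)
    return f"answer to {query}"


def sync_invoke(query: str) -> str:
    """Hypothetical sync wrapper built on asyncio.run()."""
    coro = ainvoke(query)
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def handler() -> tuple[str, str]:
    # Simulates an async request handler: an event loop is already running here.
    try:
        sync_invoke("q1")
        outcome = "no error"
    except RuntimeError:
        outcome = "RuntimeError"
    return outcome, await ainvoke("q2")


print(asyncio.run(handler()))  # ('RuntimeError', 'answer to q2')
print(sync_invoke("q3"))       # answer to q3 (fine: no loop is running here)
```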

StreamChunk fields

| Field | Type | Description |
|---|---|---|
| event | StreamEvent | One of MESSAGE, STATE, ERROR, UPDATES |
| delta | bool | Whether this chunk is an incremental text fragment (MESSAGE events only) |
| message | Message \| None | The new message produced in this chunk |
| state | AgentState \| None | Full state snapshot (STATE events only) |
| data | dict \| None | Event-specific metadata |
| thread_id | str | The thread this chunk belongs to |
| run_id | str | The run this chunk belongs to |
| timestamp | datetime | When the chunk was emitted |

async for chunk in compiled.astream({"messages": [...]}, {"thread_id": "abc"}):
    # delta is a bool flag; the fragment text lives on chunk.message
    if chunk.event == StreamEvent.MESSAGE and chunk.delta and chunk.message:
        print(chunk.message.text(), end="", flush=True)
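Because `delta` is a flag, a consumer concatenates the text carried by delta chunks and ignores everything else. The sketch below models that pattern with stdlib stand-ins. `Chunk`, `fake_astream`, `collect`, and the string event values are hypothetical, and only the field names follow the table. It also assumes the stream ends with a complete, non-delta message and a state snapshot, both of which the consumer skips.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional


@dataclass
class Chunk:
    event: str                  # stand-in for StreamEvent
    delta: bool = False
    text: Optional[str] = None  # stand-in for chunk.message.text()


async def fake_astream() -> AsyncIterator[Chunk]:
    for piece in ["Hel", "lo", "!"]:
        yield Chunk(event="message", delta=True, text=piece)
        await asyncio.sleep(0)
    yield Chunk(event="message", delta=False, text="Hello!")  # assumed final full message
    yield Chunk(event="state")                                # assumed state snapshot


async def collect() -> str:
    parts: list[str] = []
    async for chunk in fake_astream():
        if chunk.event == "message" and chunk.delta and chunk.text:
            parts.append(chunk.text)  # a UI would print(..., end="", flush=True) here
    return "".join(parts)
```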

What's next

| Page | What it covers |
|---|---|
| Memory | Three memory layers: running state, per-thread checkpointing, long-term vector store |
| Serving Agents | FastAPI server, CLI, auth, authorization, publishers |
| Extensibility | BaseAgent, BaseValidator and every other ABC you can subclass |
| Quality & Observability | Using GraphLifecycleHook with OpenTelemetry, testing, evaluation |