
Callbacks and Command

Callbacks and Command are two advanced control surfaces:

  • Callbacks observe, validate, transform, or recover around model, tool, MCP, validation, and skill invocations.
  • Command lets a node return both a state update and a runtime routing decision.

CallbackManager

Pass a CallbackManager when compiling the graph:

from agentflow.utils import CallbackManager, InvocationType

callback_manager = CallbackManager()
app = graph.compile(callback_manager=callback_manager)

Hook families:

| Hook | Purpose |
| --- | --- |
| register_before_invoke | Validate or transform input before an invocation. |
| register_after_invoke | Inspect, log, or transform output after an invocation. |
| register_on_error | Recover from an error or let it re-raise. |
| register_input_validator | Add a structured message validator. |

Invocation types include AI, TOOL, MCP, INPUT_VALIDATION, and SKILL.
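
To make the ordering of the before, after, and error hook families concrete, here is a minimal, library-free sketch. `MiniCallbackManager`, `InvocationKind`, and the callback signatures are hypothetical stand-ins written for illustration; they are not agentflow's `CallbackManager`, whose real callback signatures should be checked against the API reference.

```python
from collections import defaultdict
from enum import Enum


class InvocationKind(Enum):  # hypothetical stand-in for InvocationType
    AI = "ai"
    TOOL = "tool"


class MiniCallbackManager:
    """Toy dispatcher: before hooks -> call -> after hooks, with error hooks."""

    def __init__(self):
        self._before = defaultdict(list)
        self._after = defaultdict(list)
        self._on_error = defaultdict(list)

    def register_before_invoke(self, kind, fn):
        self._before[kind].append(fn)

    def register_after_invoke(self, kind, fn):
        self._after[kind].append(fn)

    def register_on_error(self, kind, fn):
        self._on_error[kind].append(fn)

    def invoke(self, kind, func, data):
        # Each before hook may validate or transform the input.
        for hook in self._before[kind]:
            data = hook(data)
        try:
            result = func(data)
        except Exception as exc:
            # The first error hook that returns a non-None value recovers;
            # if none does, the original error is re-raised.
            for hook in self._on_error[kind]:
                recovered = hook(exc)
                if recovered is not None:
                    return recovered
            raise
        # Each after hook may inspect or transform the output.
        for hook in self._after[kind]:
            result = hook(result)
        return result


def flaky_tool(text):
    if not text:
        raise ValueError("empty input")
    return text.upper()


manager = MiniCallbackManager()
manager.register_before_invoke(InvocationKind.TOOL, str.strip)
manager.register_after_invoke(InvocationKind.TOOL, lambda out: f"[tool] {out}")
manager.register_on_error(InvocationKind.TOOL, lambda exc: f"recovered: {exc}")

ok = manager.invoke(InvocationKind.TOOL, flaky_tool, "  hello ")
failed = manager.invoke(InvocationKind.TOOL, flaky_tool, "   ")
```

In this sketch a recovered value skips the after hooks. Whether agentflow does the same is not stated here, so treat that as a design choice of the toy dispatcher only.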

Validators

Validators are useful for input policy, prompt-injection protection, and business rules.

from agentflow.utils import CallbackManager
from agentflow.utils.validators import PromptInjectionValidator

callback_manager = CallbackManager()
callback_manager.register_input_validator(PromptInjectionValidator(strict_mode=True))

app = graph.compile(callback_manager=callback_manager)
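
A business-rule validator might look roughly like the sketch below. It is plain Python: `BusinessRuleValidator`, its `validate` method, and its parameters are hypothetical names chosen for illustration, and a real validator passed to `register_input_validator` would need to follow whatever base class and method signature agentflow expects.

```python
class BusinessRuleValidator:
    """Hypothetical validator: enforces a length limit and a blocked-term list."""

    def __init__(self, max_chars=2000, blocked_terms=()):
        self.max_chars = max_chars
        # Normalize once so matching is case-insensitive.
        self.blocked_terms = tuple(term.lower() for term in blocked_terms)

    def validate(self, texts):
        """Return True if every text passes; raise ValueError on the first violation."""
        for text in texts:
            if len(text) > self.max_chars:
                raise ValueError(f"message exceeds {self.max_chars} characters")
            lowered = text.lower()
            for term in self.blocked_terms:
                if term in lowered:
                    raise ValueError(f"message contains blocked term: {term!r}")
        return True


validator = BusinessRuleValidator(max_chars=50, blocked_terms=("wire transfer",))
passed = validator.validate(["What is my current balance?"])

try:
    validator.validate(["Please start a WIRE TRANSFER now"])
    rejected_reason = None
except ValueError as exc:
    rejected_reason = str(exc)
```

Raising on the first violation keeps the failure explicit, which suits policy checks that should fail closed.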

Command

Use Command when a node needs to update state and choose the next node at runtime.

from agentflow.utils import Command, END

def router_node(state, config):
    last = state.context[-1].text() if state.context else ""

    if "billing" in last.lower():
        return Command(update={"route": "billing"}, goto="BILLING")

    return Command(goto=END)

Prefer conditional edges for normal graph routing because they are easier to visualize and test. Use Command for dynamic jumps, recovery branches, handoffs, or routing that depends on side effects inside the node.
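
Because a Command destination is only resolved at runtime, one low-cost safeguard is to run the router against sample inputs and check every destination against the set of node names. The sketch below is library-free: `Command` and `END` are hypothetical stand-ins that mirror only the `update` and `goto` fields used above, the router takes a plain string rather than real state, and `check_routes` is an illustrative helper, not part of agentflow.

```python
from dataclasses import dataclass, field

END = "__end__"  # stand-in sentinel; agentflow exports its own END


@dataclass
class Command:  # hypothetical stand-in with the two fields used above
    update: dict = field(default_factory=dict)
    goto: str = END


def router_node(last_message: str) -> Command:
    if "billing" in last_message.lower():
        return Command(update={"route": "billing"}, goto="BILLING")
    return Command(goto=END)


def check_routes(router, samples, known_nodes):
    """Return (sample, destination) pairs whose destination is not a known node."""
    valid = set(known_nodes) | {END}
    problems = []
    for sample in samples:
        destination = router(sample).goto
        if destination not in valid:
            problems.append((sample, destination))
    return problems


samples = ["Billing question", "hello"]
missing = check_routes(router_node, samples, known_nodes={"BILLING"})
missing_without_billing = check_routes(router_node, samples, known_nodes=set())
```

An empty result means every sampled route lands on a registered node; it does not prove that unsampled inputs do.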

Graph Lifecycle Hooks

While CallbackManager observes invocation-level events (before/after each LLM, tool, or MCP call), graph lifecycle hooks observe graph-level orchestration events that fire once per graph run (or once per node transition).

Register a GraphLifecycleHook to react to structural events:

from agentflow.utils.callbacks import GraphLifecycleHook, GraphLifecycleContext
from agentflow.state import AgentState
from agentflow.state.message import Message

class MyLifecycleHook(GraphLifecycleHook):
    async def on_graph_start(self, context: GraphLifecycleContext, state: AgentState) -> AgentState | None:
        """Initialize trace, observability, or state enrichment."""
        print(f"Graph starting: thread_id={context.thread_id}")
        return None

    async def on_graph_end(self, context: GraphLifecycleContext, final_state: AgentState,
                           messages: list[Message], total_steps: int) -> AgentState | None:
        """Record metrics, send notifications, or perform cleanup."""
        print(f"Graph completed in {total_steps} steps")
        return None

    async def on_graph_error(self, context: GraphLifecycleContext, error: Exception,
                             partial_state: AgentState, messages: list[Message],
                             step: int, node_name: str) -> tuple[AgentState, str] | None:
        """Alert on failures and mask sensitive data before persistence."""
        print(f"Graph failed at {node_name}: {error}")
        return None

    async def on_interrupt(self, context: GraphLifecycleContext, interrupted_node: str,
                           interrupt_type: str, state: AgentState) -> AgentState | None:
        """React when execution pauses waiting for user input."""
        print(f"Graph paused at {interrupted_node}")
        return None

    async def on_resume(self, context: GraphLifecycleContext, resumed_node: str,
                        state: AgentState, resume_data: dict) -> AgentState | None:
        """Validate and log when paused execution resumes."""
        print(f"Graph resuming from {resumed_node}")
        return None

    async def on_checkpoint(self, context: GraphLifecycleContext, state: AgentState,
                            messages: list[Message], is_context_trimmed: bool) -> tuple[AgentState, list[Message]] | AgentState | None:
        """React before state is persisted: redact PII, replicate to cache, etc."""
        print(f"Checkpoint: {len(messages)} messages")
        return None

    async def on_state_update(self, context: GraphLifecycleContext, node_name: str,
                              old_state: AgentState, new_state: AgentState, step: int) -> AgentState | None:
        """Observe each node transition; the most granular graph-level hook."""
        print(f"Step {step}: {node_name}")
        return None

app = graph.compile(lifecycle_hook=MyLifecycleHook())

Lifecycle hooks are useful for:

  • Observability: Start/stop OpenTelemetry spans, send metrics to Datadog or Prometheus
  • Human-in-the-loop: Coordinate interrupts, approvals, and resume workflows
  • Compliance: Redact PII before persistence, write audit logs at checkpoint time
  • Notifications: Send Slack/email when graph completes, fails, or needs approval
  • Debugging: Observe state mutations per node, detect infinite loops
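
As one illustration of the debugging use case, the sketch below counts node visits in `on_state_update` and records a warning when a node is visited more often than a threshold. `LoopDetectorHook`, `max_visits`, and the `SimpleNamespace` context are hypothetical; in real use the class would subclass `GraphLifecycleHook`, but it is a plain class here so the sketch runs without the library installed.

```python
import asyncio
from collections import Counter
from types import SimpleNamespace


class LoopDetectorHook:
    """Counts node visits per thread and records suspected loops."""

    def __init__(self, max_visits=3):
        self.max_visits = max_visits
        self.visits = Counter()
        self.warnings = []

    async def on_graph_start(self, context, state):
        # Drop counters left over from an earlier run on the same thread.
        stale = [key for key in self.visits if key[0] == context.thread_id]
        for key in stale:
            del self.visits[key]
        return None

    async def on_state_update(self, context, node_name, old_state, new_state, step):
        key = (context.thread_id, node_name)
        self.visits[key] += 1
        if self.visits[key] > self.max_visits:
            self.warnings.append(
                f"step {step}: {node_name} visited {self.visits[key]} times"
            )
        return None  # observe only, mirroring the hooks shown above


async def demo():
    hook = LoopDetectorHook(max_visits=2)
    context = SimpleNamespace(thread_id="t-1")  # stand-in for GraphLifecycleContext
    await hook.on_graph_start(context, state={})
    path = ["MAIN", "TOOL", "MAIN", "TOOL", "MAIN"]
    for step, node in enumerate(path, start=1):
        await hook.on_state_update(context, node, {}, {}, step)
    return hook.warnings


loop_warnings = asyncio.run(demo())
```

Keying the counter on thread ID and node name keeps concurrent runs from inflating each other's counts.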

Key difference from CallbackManager:

| Aspect | CallbackManager | GraphLifecycleHook |
| --- | --- | --- |
| Fires | Once per LLM/tool/MCP invocation | Once per graph run (or once per node) |
| Context | Which function was called, function name | Thread ID, run ID, graph state |
| Use case | Validate/transform invocations | Monitor/coordinate entire execution |

Rules

| Rule | Why it matters |
| --- | --- |
| Keep callbacks bounded | They run inside graph execution paths. |
| Avoid global mutable request state | Use context metadata and config instead. |
| Return the expected shape from transforming callbacks | Downstream invocations expect specific data. |
| Test Command routes | Missing destinations and recursion loops are runtime issues. |
| Don't suppress errors in lifecycle hooks | on_graph_error alerts but cannot recover; use node-level on_error for recovery. |
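
One way to keep a callback bounded is to give it an explicit time budget. The sketch below uses only the standard library's `asyncio.wait_for`; `run_bounded`, the two sample callbacks, and the timeout value are hypothetical, and nothing here implies that agentflow applies such a budget on its own.

```python
import asyncio


async def run_bounded(callback, payload, timeout_s=0.05):
    """Run an async callback with a time budget; fall back to the original payload."""
    try:
        return await asyncio.wait_for(callback(payload), timeout=timeout_s)
    except asyncio.TimeoutError:
        # The callback overran its budget, so its transformation is skipped.
        return payload


async def fast_callback(payload):
    return payload + ["checked"]


async def slow_callback(payload):
    await asyncio.sleep(1)  # simulates a slow network call
    return payload + ["never reached"]


async def demo():
    fast = await run_bounded(fast_callback, ["msg"])
    slow = await run_bounded(slow_callback, ["msg"])
    return fast, slow


fast_result, slow_result = asyncio.run(demo())
```

Falling back to the untouched payload suits optional transformations such as logging or enrichment. A validator should more likely fail closed and raise instead of letting unchecked input through.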