How to build a graph
StateGraph is the core orchestration primitive in AgentFlow. You construct a workflow by adding nodes (functions, Agent instances, or ToolNode instances), connecting them with edges, and compiling to get a runnable CompiledGraph.
Prerequisites
pip install 10xscale-agentflow
Set your provider API key:
export OPENAI_API_KEY=sk-... # for OpenAI
export GOOGLE_API_KEY=... # for Google
Step 1: Import the essentials
from agentflow.core.graph import StateGraph, Agent, ToolNode
from agentflow.core.state import AgentState, Message
from agentflow.utils import START, END
Step 2: Define tools
Tools are plain Python functions. Type-annotate parameters so the LLM receives an accurate schema.
def get_weather(city: str) -> str:
"""Return current weather for a city."""
return f"Weather in {city}: 22°C, partly cloudy."
def calculate(expression: str) -> str:
"""Evaluate a safe math expression."""
try:
return str(eval(expression, {"__builtins__": {}}, {}))
except Exception as e:
return f"Error: {e}"
Step 3: Create a ToolNode
Group tools in a ToolNode. The node registers every function by its __name__.
tool_node = ToolNode([get_weather, calculate])
Step 4: Create an Agent
Agent wraps the LLM call as a graph node.
agent = Agent(
model="gpt-4o",
system_prompt=[{"role": "system", "content": "You are a helpful assistant."}],
tool_node=tool_node,
)
Step 5: Build and wire the graph
graph = StateGraph()
graph.add_node("MAIN", agent)
graph.add_node("TOOL", tool_node)
# Route: if agent produced tool calls, go to TOOL, else END
def should_use_tools(state: AgentState) -> str:
last = state.context[-1] if state.context else None
if last and last.role == "assistant" and getattr(last, "tools_calls", None):
return "TOOL"
return END
graph.add_conditional_edges("MAIN", should_use_tools, {"TOOL": "TOOL", END: END})
graph.add_edge("TOOL", "MAIN") # loop back after tool execution
graph.set_entry_point("MAIN") # also adds START → MAIN edge
Graph methods
| Method | Signature | Purpose |
|---|---|---|
add_node | (name_or_func, func=None) | Register a node. Pass a function (name inferred) or an explicit name + callable/Agent/ToolNode. |
add_edge | (from_node, to_node) | Static route between nodes. add_edge(START, "X") sets the entry point. |
add_conditional_edges | (from_node, condition, path_map=None) | Dynamic routing. condition(state) returns a key; path_map maps keys to node names. Without path_map, condition must return the node name directly. |
set_entry_point | (node_name) | Shorthand for add_edge(START, node_name). |
override_node | (name, func) | Replace an existing node (useful in tests). |
compile | (checkpointer, store, interrupt_before, interrupt_after, ...) | Returns a CompiledGraph. |
Step 6: Compile
app = graph.compile()
Without a checkpointer argument, compilation defaults to InMemoryCheckpointer. For persistent state see how-to/python/set-up-checkpointing.
Step 7: Invoke
Synchronous
result = app.invoke(
{"messages": [Message.text_message("What is the weather in Paris?")]},
config={"thread_id": "session-1", "user_id": "user-42"},
)
for msg in result["messages"]:
print(msg.role, msg.content)
Asynchronous
import asyncio
async def main():
result = await app.ainvoke(
{"messages": [Message.text_message("Calculate 123 * 456")]},
config={"thread_id": "session-2"},
)
for msg in result["messages"]:
print(msg.role, msg.content)
asyncio.run(main())
Config keys
Use config for runtime metadata. For most cases, you only need these keys:
| Key | Default | Notes |
|---|---|---|
thread_id | auto (UUID) | Conversation/thread identifier used by the checkpointer. |
user_id | "test-user-id" | Passed to tools and publisher events. If not provided, it is set automatically. |
recursion_limit | 25 | Maximum node-execution steps before GraphRecursionError. |
Other reserved keys are auto-populated by the runtime and should not be set manually:
run_idis_streamtimestamp
When API authentication is enabled, two values are injected into config automatically:
user_iduser(whatever object/value your auth layer returns)
These are reserved keys; beyond them, you can add any custom keys you want in config.
For partial state updates, return a dictionary with only the fields you want to change.
State is available inside input_data, which is the first dictionary passed into node functions.
result = app.invoke(
{"messages": [Message.text_message("What is the weather in Paris?")], "state": {"location": "Paris"}},
config={"thread_id": "session-1", "user_id": "user-42"},
)
Step 8: Stream responses
from agentflow.utils import ResponseGranularity
async def stream_example():
async for chunk in app.astream(
{"messages": [Message.text_message("Tell me a short story.")]},
config={"thread_id": "stream-1"},
response_granularity=ResponseGranularity.LOW,
):
if chunk.content:
print(chunk.content, end="", flush=True)
asyncio.run(stream_example())
See how-to/python/stream-graph for the full streaming reference.
Response granularity
Pass response_granularity to invoke(), ainvoke(), or astream():
| Value | invoke() returns |
|---|---|
ResponseGranularity.LOW (default) | {"messages": [...]} — only the final messages |
ResponseGranularity.PARTIAL | {"messages": [...], "context": [...], "context_summary": ...} |
ResponseGranularity.FULL | Complete state dict including execution_meta |
Stop a running graph
# From another coroutine or task
await app.astop({"thread_id": "session-1"})
stop() is the sync wrapper. The graph checks the stop flag between nodes and halts before the next node.
Override a node for testing
# Production graph
app = graph.compile()
# In tests: swap the real agent with a stub
def stub_agent(state, config, **deps):
return {"messages": [Message.text_message("stub response", role="assistant")]}
app.override_node("MAIN", stub_agent)
override_node is available on both StateGraph (before compile) and CompiledGraph (after compile).
Complete example
import asyncio
from agentflow.core.graph import StateGraph, Agent, ToolNode
from agentflow.core.state import AgentState, Message
from agentflow.utils import START, END
def get_weather(city: str) -> str:
"""Return current weather for a city."""
return f"22°C, partly cloudy in {city}."
tool_node = ToolNode([get_weather])
agent = Agent(
model="gpt-4o",
system_prompt=[{"role": "system", "content": "You are a helpful assistant."}],
tool_node=tool_node,
)
graph = StateGraph()
graph.add_node("MAIN", agent)
graph.add_node("TOOL", tool_node)
def should_use_tools(state: AgentState) -> str:
last = state.context[-1] if state.context else None
if last and last.role == "assistant" and getattr(last, "tools_calls", None):
return "TOOL"
return END
graph.add_conditional_edges("MAIN", should_use_tools, {"TOOL": "TOOL", END: END})
graph.add_edge("TOOL", "MAIN")
graph.set_entry_point("MAIN")
app = graph.compile()
result = app.invoke(
{"messages": [Message.text_message("What is the weather in Tokyo?")]},
config={"thread_id": "demo-1"},
)
for msg in result["messages"]:
print(f"[{msg.role}]", msg.content)
What you learned
- Use
StateGraphto build the workflow, then callcompile()to get a runnableCompiledGraph. add_node,add_edge,add_conditional_edges,set_entry_pointwire the graph topology.invoke()/ainvoke()run the graph;astream()streams chunks token by token.configdict controlsthread_id,user_id,recursion_limit, and more.