Skip to main content

SupervisorTeamAgent

A centralized multi-agent pattern where a dedicated supervisor LLM decides which specialist worker to invoke next.

Import path: agentflow.prebuilt.agent


Concept

Where a swarm has agents routing to each other directly, a supervisor pattern has a single coordinator that controls every routing decision. The supervisor's only job is to output a single word — the next worker's name, or FINISH.

Full graph — two-worker example

Worker without tools

A worker that has no tools routes directly back to PRE_SUPERVISOR after its LLM call:

Auto-generated supervisor prompt

SupervisorTeamAgent builds the supervisor's system prompt automatically from the worker registry:

Available workers:
- RESEARCHER: Searches the web for factual information.
- CODER: Writes and runs Python code.
- FINISH: All tasks fully completed.

Respond with only the name of the next worker, or FINISH.
Rules:
- Respond with a single word — exactly one worker name or FINISH.
- Do NOT explain your choice.
- Do NOT include any other text.

Override this entirely with supervisor_system_prompt.

Supervisor routing logic

def _route(state: AgentState) -> str:
rounds = state.execution_meta.internal_data.get("sta_rounds", 0)

if rounds >= max_rounds:
return END # hard cap

raw = last.text().strip().upper()

if "FINISH" in raw:
return END # task complete

for name in worker_names:
if re.search(rf"\b{re.escape(name)}\b", raw):
return name # delegate to worker

return END # unrecognized — terminate

Word-boundary regex prevents "CODE" from matching a worker named "CODER". An unrecognized response logs a warning and terminates rather than looping forever.

PRE_SUPERVISOR node

Every time a worker finishes, it routes to PRE_SUPERVISOR — a lightweight node that increments execution_meta.internal_data["sta_rounds"] — before handing back to the supervisor. This keeps the hard-cap check simple and decoupled from the worker logic.

Mini ReAct loop per worker

Each worker that has tools gets WORKER → WORKER_TOOL → WORKER wired automatically, giving the worker its own tool loop before it returns to the supervisor.


WorkerConfig fields

FieldTypeDefaultDescription
agentBaseAgentrequiredPre-built agent instance, configured independently
descriptionstr""Injected into the supervisor's system prompt so the LLM knows when to delegate here

Constructor Parameters

ParameterTypeDefaultDescription
supervisor_modelstrrequiredLLM model for the supervisor agent
workersdict[str, WorkerConfig]requiredWorker registry — name → config (UPPER-CASE recommended)
supervisor_system_promptlist[dict] | Noneauto-generatedOverride the supervisor prompt
max_roundsint10Maximum supervisor→worker delegations before terminating
stateAgentState | NoneNoneOptional custom state subclass
context_managerBaseContextManager | NoneNoneOptional custom context manager
publisherBasePublisher | NoneNoneEvent publisher for streaming
containerInjectQ | NoneNoneDependency injection container
**supervisor_kwargsAnyForwarded to the supervisor Agent only (e.g. provider, temperature)

compile() Parameters

ParameterTypeDefaultDescription
checkpointerBaseCheckpointerNonePersist and restore conversation state
storeBaseStoreNoneLong-term cross-thread storage
interrupt_beforelist[str]NonePause before the named nodes
interrupt_afterlist[str]NonePause after the named nodes
callback_managerCallbackManagerdefaultLifecycle hooks
media_storeBaseMediaStoreNoneBinary/media file storage
shutdown_timeoutfloat30.0Seconds to wait for clean shutdown

Full Code

Two-worker team (researcher + coder)

import asyncio
from dotenv import load_dotenv
from agentflow.core.graph import Agent, ToolNode
from agentflow.prebuilt.agent import SupervisorTeamAgent
from agentflow.prebuilt.agent.supervisor_team import WorkerConfig
from agentflow.prebuilt.tools import google_web_search
from agentflow.core.state import Message

load_dotenv()


def run_python(code: str) -> str:
"""Execute Python code and return stdout (use a real sandbox in production)."""
import io, contextlib
buf = io.StringIO()
with contextlib.redirect_stdout(buf):
exec(code, {}) # noqa: S102
return buf.getvalue()


agent = SupervisorTeamAgent(
supervisor_model="gpt-4o",
provider="openai", # forwarded to the supervisor Agent
workers={
"RESEARCHER": WorkerConfig(
agent=Agent(
model="gpt-4o-mini",
provider="openai",
tool_node=ToolNode([google_web_search]),
system_prompt=[{"role": "system",
"content": "Search the web and return factual results."}],
),
description="Searches the web and returns factual information.",
),
"CODER": WorkerConfig(
agent=Agent(
model="gpt-4o",
provider="openai",
tool_node=ToolNode([run_python]),
system_prompt=[{"role": "system",
"content": "Write and run Python code to solve problems."}],
),
description="Writes and executes Python code to solve computational problems.",
),
},
max_rounds=8,
)

app = agent.compile()


async def main():
result = await app.ainvoke(
{"messages": [Message.text_message(
"Find the current price of Bitcoin, then calculate how much $1000 "
"would be worth if BTC doubles."
)]},
config={"thread_id": "supervisor-1"},
)
print(result["context"][-1].text())


asyncio.run(main())

With a custom supervisor prompt

from agentflow.prebuilt.agent import SupervisorTeamAgent
from agentflow.prebuilt.agent.supervisor_team import WorkerConfig

agent = SupervisorTeamAgent(
supervisor_model="gpt-4o",
provider="openai",
workers={
"RESEARCHER": WorkerConfig(agent=..., description="..."),
"CODER": WorkerConfig(agent=..., description="..."),
},
supervisor_system_prompt=[{
"role": "system",
"content": (
"You manage a RESEARCHER and CODER. "
"Always research before coding. "
"Respond with only one word: RESEARCHER, CODER, or FINISH."
),
}],
max_rounds=6,
)

With a checkpointer (persistent conversations)

import asyncio
from agentflow.core.graph import Agent, ToolNode
from agentflow.prebuilt.agent import SupervisorTeamAgent
from agentflow.prebuilt.agent.supervisor_team import WorkerConfig
from agentflow.storage.checkpointer import PostgresCheckpointer
from agentflow.prebuilt.tools import google_web_search, safe_calculator
from agentflow.core.state import Message

agent = SupervisorTeamAgent(
supervisor_model="gpt-4o-mini",
provider="openai",
workers={
"RESEARCHER": WorkerConfig(
agent=Agent(model="gpt-4o-mini", provider="openai",
tool_node=ToolNode([google_web_search])),
description="Searches the web for facts.",
),
"CALCULATOR": WorkerConfig(
agent=Agent(model="gpt-4o-mini", provider="openai",
tool_node=ToolNode([safe_calculator])),
description="Performs arithmetic and numeric calculations.",
),
},
max_rounds=6,
)

checkpointer = PostgresCheckpointer(dsn="postgresql://user:pass@localhost/db")
app = agent.compile(checkpointer=checkpointer)


async def main():
result = await app.ainvoke(
{"messages": [Message.text_message(
"What is the compound interest on $5000 at 7% over 10 years?"
)]},
config={"thread_id": "user-10-finance"},
)
print(result["context"][-1].text())


asyncio.run(main())

Google Gemini supervisor with OpenAI workers

**supervisor_kwargs goes to the supervisor only — each worker Agent is configured independently:

from agentflow.core.graph import Agent, ToolNode
from agentflow.prebuilt.agent import SupervisorTeamAgent
from agentflow.prebuilt.agent.supervisor_team import WorkerConfig
from agentflow.prebuilt.tools import google_web_search

agent = SupervisorTeamAgent(
supervisor_model="google/gemini-2.5-flash",
provider="google", # supervisor uses Gemini
workers={
"RESEARCHER": WorkerConfig(
agent=Agent(
model="gpt-4o-mini",
provider="openai", # worker uses OpenAI
tool_node=ToolNode([google_web_search]),
),
description="Researches topics on the web.",
),
},
max_rounds=5,
)

Running with agentflow play

graph.py

from agentflow.core.graph import Agent, ToolNode
from agentflow.prebuilt.agent import SupervisorTeamAgent
from agentflow.prebuilt.agent.supervisor_team import WorkerConfig
from agentflow.prebuilt.tools import google_web_search, safe_calculator

agent = SupervisorTeamAgent(
supervisor_model="gpt-4o-mini",
provider="openai",
workers={
"RESEARCHER": WorkerConfig(
agent=Agent(model="gpt-4o-mini", provider="openai",
tool_node=ToolNode([google_web_search])),
description="Searches the web for facts.",
),
"CALCULATOR": WorkerConfig(
agent=Agent(model="gpt-4o-mini", provider="openai",
tool_node=ToolNode([safe_calculator])),
description="Performs arithmetic and numeric calculations.",
),
},
max_rounds=6,
)

app = agent.compile()

agentflow.json

{
"agent": "graph:app",
"env": ".env",
"auth": null,
"checkpointer": null,
"injectq": null,
"store": null,
"redis": null,
"thread_name_generator": null
}
agentflow play