Skip to main content

How to use publishers

Publishers emit structured EventModel events during graph execution — node starts and ends, tool calls, streaming tokens, state updates, and errors. They are optional: graphs run without them. Add a publisher when you need to observe, audit, or forward execution events to external systems.


Publisher overview

ClassTransportInstall extra
ConsolePublisherprint() to stdoutnone (built-in)
RedisPublisherRedis Pub/Sub or Redis Streamspip install 10xscale-agentflow[redis]
KafkaPublisherKafka topic via aiokafkapip install 10xscale-agentflow[kafka]
RabbitMQPublisherRabbitMQ exchange via aio-pikapip install 10xscale-agentflow[rabbitmq]
CompositePublisherFan-out to multiple publishersnone (built-in)
OtelPublisherOpenTelemetry tracesinstall opentelemetry-* packages

All publishers extend BasePublisher. Pass a publisher to StateGraph(publisher=...).


ConsolePublisher

Prints every event to stdout. Good for debugging locally. This publisher is opt-in and writes to stdout by default. In a server context where stdout output is not desirable, pass {"use_logger": True} to route events through the agentflow.publisher logger at INFO level instead:

from agentflow.runtime.publisher import ConsolePublisher
from agentflow.core.graph import StateGraph

# Default — writes to stdout
publisher = ConsolePublisher()

# Route through the logging system
publisher = ConsolePublisher(config={"use_logger": True})

graph = StateGraph(publisher=publisher)
# ... add nodes, edges, compile, invoke

Do not use ConsolePublisher in production. Use a real transport (RedisPublisher, KafkaPublisher, RabbitMQPublisher) for any deployed environment.


RedisPublisher

Publishes events as JSON to a Redis channel or stream. Requires pip install 10xscale-agentflow[redis].

Pub/Sub mode (default)

from agentflow.runtime.publisher import RedisPublisher
from agentflow.core.graph import StateGraph

publisher = RedisPublisher({
"url": "redis://localhost:6379/0",
"mode": "pubsub",
"channel": "agentflow.events",
"max_connections": 10,
})

graph = StateGraph(publisher=publisher)

A subscriber on the same channel receives every event JSON:

import redis.asyncio as aioredis
import asyncio

async def listen():
r = aioredis.from_url("redis://localhost:6379/0")
pubsub = r.pubsub()
await pubsub.subscribe("agentflow.events")
async for msg in pubsub.listen():
if msg["type"] == "message":
print(msg["data"])

asyncio.run(listen())

Redis Streams mode

publisher = RedisPublisher({
"url": "redis://localhost:6379/0",
"mode": "stream",
"stream": "agentflow.events",
"maxlen": 10000, # trim stream to last 10 000 entries
})

RedisPublisher config reference

KeyDefaultNotes
url"redis://localhost:6379/0"Redis connection URL.
mode"pubsub""pubsub" or "stream".
channel"agentflow.events"Pub/Sub channel name.
stream"agentflow.events"Redis Stream name.
maxlenNoneMax length cap for streams.
max_connections10Connection pool size.
socket_timeout5.0Socket timeout in seconds.
socket_connect_timeout5.0Connection timeout in seconds.
socket_keepaliveTrueTCP keepalive.
health_check_interval30Health-check interval in seconds.

KafkaPublisher

Publishes events to a Kafka topic. Requires pip install 10xscale-agentflow[kafka].

from agentflow.runtime.publisher import KafkaPublisher
from agentflow.core.graph import StateGraph

publisher = KafkaPublisher({
"bootstrap_servers": "localhost:9092",
"topic": "agentflow.events",
"client_id": "my-agent-service",
"compression_type": "gzip",
})

graph = StateGraph(publisher=publisher)

KafkaPublisher config reference

KeyDefaultNotes
bootstrap_servers"localhost:9092"Comma-separated broker list.
topic"agentflow.events"Kafka topic to publish to.
client_idNoneProducer client ID.
max_batch_size16384Max batch size in bytes.
linger_ms0Time to wait for batching in ms.
compression_typeNone"gzip", "snappy", "lz4", "zstd", or None.
request_timeout_ms30000Request timeout in milliseconds.

RabbitMQPublisher

Publishes events to a RabbitMQ exchange. Requires pip install 10xscale-agentflow[rabbitmq].

from agentflow.runtime.publisher import RabbitMQPublisher
from agentflow.core.graph import StateGraph

publisher = RabbitMQPublisher({
"url": "amqp://guest:guest@localhost/",
"exchange": "agentflow.events",
"routing_key": "agent.executions",
"exchange_type": "topic",
"durable": True,
})

graph = StateGraph(publisher=publisher)

RabbitMQPublisher config reference

KeyDefaultNotes
url"amqp://guest:guest@localhost/"AMQP connection URL.
exchange"agentflow.events"Exchange name.
routing_key"agentflow.events"Message routing key.
exchange_type"topic""topic", "direct", "fanout", "headers".
declareTrueDeclare the exchange if it doesn't exist.
durableTrueExchange survives broker restarts.
connection_timeout10Connection timeout in seconds.
heartbeat60Heartbeat interval in seconds.

CompositePublisher

Fan-out to multiple publishers simultaneously.

from agentflow.runtime.publisher import CompositePublisher, ConsolePublisher, RedisPublisher
from agentflow.core.graph import StateGraph

publisher = CompositePublisher([
ConsolePublisher(),
RedisPublisher({"url": "redis://localhost:6379/0"}),
])

graph = StateGraph(publisher=publisher)

Pass a list of publishers to StateGraph(publisher=[...]) and it is automatically wrapped in a CompositePublisher:

graph = StateGraph(
publisher=[
ConsolePublisher(),
KafkaPublisher({"bootstrap_servers": "kafka:9092"}),
]
)

OtelPublisher

Emits execution events as OpenTelemetry spans. Requires installing OpenTelemetry SDK packages manually.

from agentflow.runtime.publisher import OtelPublisher, setup_tracing

# Configure OTLP exporter (e.g. to Jaeger, Tempo, Honeycomb)
setup_tracing(service_name="my-agent-service")

publisher = OtelPublisher()
graph = StateGraph(publisher=publisher)

EventModel structure

Every event published carries an EventModel with these fields:

FieldTypeDescription
eventEventSource: GRAPH_EXECUTION, NODE_EXECUTION, LLM_CALL, TOOL_EXECUTION, STREAMING, REALTIME.
event_typeEventTypePhase: START, PROGRESS, RESULT, END, UPDATE, ERROR, INTERRUPTED.
content_typelist[ContentType]Content tags: TEXT, MESSAGE, TOOL_CALL, TOOL_RESULT, IMAGE, AUDIO, TRANSCRIPT, STATE, etc.
node_namestr | NoneNode that emitted the event.
datadictEvent payload (args, results, error messages, etc.).
content_blockslist[ContentBlock]Structured message blocks (tool calls, tool results, etc.).
metadatadictrun_id, thread_id, user_id, timestamp.
from agentflow.runtime.publisher import Event, EventType, ContentType

Complete example: graph with Redis event streaming

import asyncio
from agentflow.core.graph import StateGraph, Agent
from agentflow.core.state import AgentState, Message
from agentflow.storage.checkpointer import InMemoryCheckpointer
from agentflow.runtime.publisher import RedisPublisher
from agentflow.utils import END

publisher = RedisPublisher({
"url": "redis://localhost:6379/0",
"mode": "stream",
"stream": "my-agent.events",
"maxlen": 50000,
})

agent = Agent(
model="gpt-4o",
system_prompt=[{"role": "system", "content": "You are a helpful assistant."}],
)

graph = StateGraph(publisher=publisher)
graph.add_node("MAIN", agent)
graph.set_entry_point("MAIN")
graph.add_edge("MAIN", END)

app = graph.compile(checkpointer=InMemoryCheckpointer())

async def main():
result = await app.ainvoke(
{"messages": [Message.text_message("Hello!")]},
config={"thread_id": "demo", "user_id": "user-1"},
)
print(result["messages"][-1].content)
await publisher.close() # flush and close the connection

asyncio.run(main())

What you learned

  • Pass a publisher to StateGraph(publisher=...) to receive execution events.
  • ConsolePublisher is zero-config and prints to stdout.
  • RedisPublisher supports both Pub/Sub and Redis Streams; requires [redis] extra.
  • KafkaPublisher publishes to a Kafka topic; requires [kafka] extra.
  • RabbitMQPublisher publishes to a RabbitMQ exchange; requires [rabbitmq] extra.
  • CompositePublisher (or passing a list) fans out to multiple publishers.
  • Every event carries EventModel with source, phase, content type, node name, and metadata.