Skip to main content

Memory

AgentFlow has three distinct memory layers. Each solves a different problem; all three can run together in production.


Layer 1 — Running Memory

What it is: the live message list carried through every node in a single run.

AgentState.context holds all messages for the current execution. Every node reads from it, every Agent sends it to the LLM, every new message is appended via the add_messages reducer.

from typing import Annotated
from agentflow.core.state import AgentState, add_messages, Message

class MyState(AgentState):
context: Annotated[list[Message], add_messages]

BaseContextManager — keeping the window in budget

Before each LLM call the graph passes AgentState.context through the active BaseContextManager. It trims or summarises messages so the token count stays within the model's window.

from agentflow.core.state import MessageContextManager, SummaryContextManager

# drop oldest messages beyond a count limit
graph = StateGraph(
state=MyState(),
context_manager=MessageContextManager(max_messages=50),
)

# summarise old messages with a secondary LLM call
graph = StateGraph(
state=MyState(),
context_manager=SummaryContextManager(model="gpt-4o-mini", max_tokens=4096),
)

To write your own strategy extend BaseContextManager:

from agentflow.core.state import BaseContextManager

class PriorityContextManager(BaseContextManager):
def trim_context(self, state: AgentState) -> AgentState: # sync
...
return state
async def atrim_context(self, state: AgentState) -> AgentState: # async
return self.trim_context(state)

Layer 2 — Conversational Memory

What it is: per-thread state persistence. Pass the same thread_id on the next call and the graph resumes exactly where it left off.

How it works

Every compiled graph has a checkpointer. After each node completes the runtime calls aput_state to persist the full AgentState. On the next call it calls aget_state to restore it.

from agentflow.storage.checkpointer import InMemoryCheckpointer, PgCheckpointer

# dev / test — state lives in process memory, lost on restart
compiled = graph.compile(checkpointer=InMemoryCheckpointer())

# production — state survives restarts and load-balanced workers
compiled = graph.compile(
checkpointer=PgCheckpointer(
postgres_dsn="postgresql+asyncpg://...",
redis_url="redis://...", # optional hot cache
)
)

Redis hot cache

PgCheckpointer accepts an optional Redis URL. When present it adds a two-level lookup:

Redis serves read-heavy deployments from memory; Postgres is the durable source of truth. A cache miss falls through to Postgres automatically.

BaseCheckpointer — full interface

MethodPurpose
asetup()Create tables / indexes on first run
aput_state(thread_id, state)Persist full AgentState
aget_state(thread_id)Restore AgentState
aclear_state(thread_id)Wipe state for a thread
aput_state_cache / aget_state_cacheRedis hot-cache layer
aput_messages / aget_message / alist_messages / adelete_messageMessage-level CRUD
aput_thread / aget_thread / alist_threads / aclean_threadThread management
arelease()Release connections on shutdown

Extend BaseCheckpointer to plug in any storage backend:

from agentflow.storage.checkpointer import BaseCheckpointer

class MyCheckpointer(BaseCheckpointer[MyState]):
async def aput_state(self, thread_id, state): ...
async def aget_state(self, thread_id): ...
# ... implement all abstract methods

Thread config keys

KeyTypePurpose
thread_idstrIdentifies the conversation; required for persistence
user_idstrScopes threads to a user; used by auth and stores
run_idstrIdentifies a single invoke/stream call
recursion_limitintMax node hops per run (default 25)

Layer 3 — Long-Term Memory

What it is: cross-thread, cross-user facts stored as vector embeddings. Survives beyond any single conversation.

How the LLM writes and reads memory

The memory_tool is injected into the agent's tool list when you configure MemoryConfig. The LLM decides when to call it:

from agentflow.storage.store.memory_config import MemoryConfig
from agentflow.storage.store import QdrantStore, ReadMode
from agentflow.storage.store.embedding import OpenAIEmbedding

store = QdrantStore(
url="http://localhost:6333",
collection="agent-memory",
embedding=OpenAIEmbedding(model="text-embedding-3-small"),
)

agent = Agent(
model="gpt-4o",
memory=MemoryConfig(
store=store,
retrieval_mode=ReadMode.PRELOAD, # inject relevant memories before each LLM call
),
)

The LLM calls memory_tool(action="store", content="...") to write facts and memory_tool(action="search", query="...") to recall them. Writes are async and non-blocking.

Retrieval modes

ModeBehaviour
ReadMode.NO_RETRIEVALLLM must explicitly call memory_tool(action="search")
ReadMode.PRELOADFramework searches and injects relevant memories into the system prompt before each LLM call
ReadMode.POSTLOADFramework retrieves memories after the LLM call and appends them for the next turn

Backends

ClassBackendNotes
QdrantStoreQdrant (local or cloud)pip install "10xscale-agentflow[qdrant]"
Mem0Storemem0 managed servicepip install "10xscale-agentflow[mem0]"

BaseStore and BaseEmbedding

from agentflow.storage.store import BaseStore
from agentflow.storage.store.embedding import BaseEmbedding

class MyStore(BaseStore):
async def astore(self, user_id, content, metadata): ...
async def asearch(self, user_id, query, top_k): ...
async def adelete(self, user_id, memory_id): ...

class MyEmbedding(BaseEmbedding):
async def aembed(self, text) -> list[float]: ...
async def aembed_batch(self, texts) -> list[list[float]]: ...
dimension: int = 1536

Putting all three layers together


Go deeper

GuideLink
Add per-thread checkpointingHow-To: Checkpointing
Add long-term memory to an agentHow-To: Memory Store
Custom storage backendsExtensibility
Thread management via APIServing Agents