Production-Grade Agent Architecture: Implementing Long-Term Memory and Human-in-the-Loop
As a CSE student fascinated by the human brain and the potential of AI, my ultimate goal is to build intelligent systems that truly think. We're talking about agents capable of complex reasoning, adaptation, and continuous learning, much like a scaled-up, specialized version of our own cortex – perhaps even an approximation of a cortical column, driven by a Mixture-of-Experts (MoE) architecture.
Moving from research prototypes to production-ready agents isn't just about bigger models or faster GPUs. It's about engineering robustness. Two critical pillars stand out for any agent destined for the real world: persistent, semantic long-term memory and robust Human-in-the-Loop (HITL) safety mechanisms. Without these, your agent is either an amnesiac child or a dangerous loose cannon.
This post dives deep into architecting these components. You won't find us relying on bloated, high-level frameworks where you lose control and performance. We're going direct, leveraging raw APIs and TypeScript to build efficient, scalable systems.
Why Bother? From Prototype to Persistent Intelligence
Imagine an agent tasked with managing complex projects, learning from past failures, or even contributing to scientific discovery. A stateless agent is useless. It’s like a human waking up every day with complete amnesia. True intelligence requires the ability to recall, synthesize, and apply past experiences – our hippocampus equivalent. This isn't just about storing data; it's about semantic retrieval, understanding what is relevant to the current context.
Equally important is the prefrontal cortex analogue: executive oversight. While we strive for autonomous agents, allowing them to execute high-impact actions (e.g., deploying code, sending critical emails, performing financial transactions) without supervision is irresponsible. We need safety rails, a mechanism for human review and approval at critical junctures. This is where HITL comes in, ensuring the agent’s actions align with human intent and ethical boundaries.
My vision for an MoE-driven agent is that each "expert" doesn't just process information; it learns from its past interactions, storing and retrieving relevant knowledge. Furthermore, a dedicated "Safety Expert" within the MoE orchestrates the interaction with the human, pausing execution when necessary.
Part 1: The Agent's Hippocampus – Semantic Long-Term Memory
Our goal is not just a key-value store. We want semantic memory, where the agent can query concepts and retrieve related experiences, even if the exact keywords aren't present. This mirrors how our brains make associations.
Architecture: Embeddings + Vector Database
The core idea is simple:
- Embed: Transform textual experiences (thoughts, observations, conversations) into dense numerical vectors using an embedding model. These vectors capture the semantic meaning.
- Store: Persist these vectors in a specialized database optimized for similarity search – a vector database.
- Retrieve: When the agent needs to recall, it embeds its query and searches the vector database for semantically similar vectors.
For performance, choosing the right embedding model and an efficient vector store is paramount. Forget the abstraction layers that add latency; we're talking direct API calls.
// interfaces/memory.ts
export interface MemoryRecord {
id: string;
content: string;
timestamp: number;
metadata?: Record<string, any>;
embedding?: number[]; // Stored for retrieval if necessary, but primarily for DB
}
export interface IVectorStoreClient {
/**
* Stores a batch of memory records after embedding their content.
* @param records Records to store.
* @returns Promise resolving to void.
*/
add(records: Omit<MemoryRecord, 'id' | 'embedding'>[]): Promise<void>;
/**
* Retrieves the top_k most semantically similar records to the query.
* @param query The text query to search for.
* @param topK The number of similar records to retrieve.
* @returns Promise resolving to an array of MemoryRecord.
*/
search(query: string, topK: number): Promise<MemoryRecord[]>;
}
Implementation: A Direct Vector Store Example
We'll assume a Cohere or OpenAI embeddings API and a vector database like Qdrant or even a local Faiss index for high-throughput, low-latency applications.
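To make the contract tangible before introducing external services, here is a minimal, illustrative sketch of an in-memory IVectorStoreClient using brute-force cosine similarity. The InMemoryVectorStore name and the injected embed function are assumptions for this example; it stands in for a local index during development, not a replacement for Faiss or Qdrant at scale.

```typescript
// services/inMemoryVectorStore.ts
// Illustrative sketch only: brute-force cosine similarity over an in-memory array.
// `embed` is assumed to be any function returning one vector per input text.
import { v4 as uuidv4 } from 'uuid';
import { IVectorStoreClient, MemoryRecord } from '../interfaces/memory';

type EmbedFn = (texts: string[]) => Promise<number[][]>;

const cosine = (a: number[], b: number[]): number => {
  let dot = 0, normA = 0, normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return dot / (Math.sqrt(normA) * Math.sqrt(normB) || 1);
};

export class InMemoryVectorStore implements IVectorStoreClient {
  private records: MemoryRecord[] = [];
  constructor(private readonly embed: EmbedFn) {}

  public async add(records: Omit<MemoryRecord, 'id' | 'embedding'>[]): Promise<void> {
    if (records.length === 0) return;
    const embeddings = await this.embed(records.map(r => r.content));
    records.forEach((record, i) => {
      this.records.push({ ...record, id: uuidv4(), embedding: embeddings[i] });
    });
  }

  public async search(query: string, topK: number): Promise<MemoryRecord[]> {
    if (!query || this.records.length === 0) return [];
    const [queryEmbedding] = await this.embed([query]);
    // Score every record against the query and return the top matches.
    const scored = this.records.map(r => ({ record: r, score: cosine(r.embedding!, queryEmbedding) }));
    scored.sort((a, b) => b.score - a.score);
    return scored.slice(0, topK).map(s => s.record);
  }
}
```
The same interface lets you swap this toy store for the Qdrant-backed client below without touching the rest of the agent.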
// services/embeddingService.ts
import axios from 'axios';
export class EmbeddingService {
private readonly apiKey: string;
private readonly apiUrl: string;
constructor(apiKey: string, apiUrl: string = "https://api.openai.com/v1/embeddings") {
this.apiKey = apiKey;
this.apiUrl = apiUrl;
}
/**
* Generates embeddings for a batch of texts.
* Emphasize batching for efficiency!
*/
public async embedBatch(texts: string[]): Promise<number[][]> {
if (texts.length === 0) return [];
try {
const response = await axios.post(
this.apiUrl,
{
model: "text-embedding-3-small", // Or other performant model
input: texts,
},
{
headers: {
'Authorization': `Bearer ${this.apiKey}`,
'Content-Type': 'application/json',
},
}
);
return response.data.data.map((item: any) => item.embedding);
} catch (error) {
console.error("Error generating embeddings:", error);
throw new Error("Failed to generate embeddings.");
}
}
}
// services/vectorStoreClient.ts
import { v4 as uuidv4 } from 'uuid';
import { IVectorStoreClient, MemoryRecord } from '../interfaces/memory';
import { EmbeddingService } from './embeddingService';
import { QdrantClient } from '@qdrant/js-client-rest'; // Example: Using Qdrant
export class QdrantVectorStoreClient implements IVectorStoreClient {
private readonly embeddingService: EmbeddingService;
private readonly qdrantClient: QdrantClient;
private readonly collectionName: string;
constructor(embeddingService: EmbeddingService, qdrantHost: string, collectionName: string = "agent_memories") {
this.embeddingService = embeddingService;
this.qdrantClient = new QdrantClient({ url: `http://${qdrantHost}` }); // qdrantHost includes the port, e.g. 'localhost:6333'
this.collectionName = collectionName;
this.initializeCollection().catch(console.error); // Ensure collection exists
}
private async initializeCollection() {
const collectionExists = (await this.qdrantClient.getCollections()).collections.some(
(c) => c.name === this.collectionName
);
if (!collectionExists) {
await this.qdrantClient.createCollection(this.collectionName, {
vectors: { size: 1536, distance: 'Cosine' }, // Match embedding model dimension
});
console.log(`Qdrant collection '${this.collectionName}' created.`);
}
}
public async add(records: Omit<MemoryRecord, 'id' | 'embedding'>[]): Promise<void> {
if (records.length === 0) return;
const contents = records.map(r => r.content);
const embeddings = await this.embeddingService.embedBatch(contents);
const points = records.map((record, index) => ({
id: uuidv4(),
vector: embeddings[index],
payload: {
content: record.content,
timestamp: record.timestamp,
metadata: record.metadata,
},
}));
await this.qdrantClient.upsert(this.collectionName, {
wait: true,
batch: {
ids: points.map(p => p.id),
vectors: points.map(p => p.vector),
payloads: points.map(p => p.payload),
},
});
}
public async search(query: string, topK: number): Promise<MemoryRecord[]> {
if (!query) return [];
const [queryEmbedding] = await this.embeddingService.embedBatch([query]);
const searchResult = await this.qdrantClient.search(this.collectionName, {
vector: queryEmbedding,
limit: topK,
with_payload: true,
});
return searchResult.map(result => ({
id: result.id.toString(),
content: (result.payload as any).content,
timestamp: (result.payload as any).timestamp,
metadata: (result.payload as any).metadata,
// score: result.score, // Could include score if needed
}));
}
}
Within our MoE agent, a MemoryManager expert would be responsible for deciding when to store new memories (e.g., after significant observations, successful tool executions, or complex reasoning steps) and what to store (summarized insights, raw observations). It would also abstract the search calls for other experts. This keeps the MemoryManager focused and efficient.
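As a rough illustration of that division of labour, the sketch below wraps the vector store behind a MemoryManager expert. The class name, the significance heuristic, and the method names (rememberIfSignificant, recall) are assumptions for demonstration, not a prescribed design.

```typescript
// experts/memoryManager.ts
// Illustrative sketch of a 'MemoryManager' expert. The significance heuristic
// and method names are assumptions for demonstration, not a fixed API.
import { IVectorStoreClient, MemoryRecord } from '../interfaces/memory';

export class MemoryManager {
  constructor(private readonly store: IVectorStoreClient) {}

  // Decide *when* to store: a naive length/keyword heuristic stands in here
  // for a learned or LLM-driven significance check.
  public async rememberIfSignificant(observation: string, metadata?: Record<string, any>): Promise<void> {
    const significant = observation.length > 80 || /error|success|decision/i.test(observation);
    if (!significant) return;
    await this.store.add([{ content: observation, timestamp: Date.now(), metadata }]);
  }

  // Abstract retrieval for the other experts: they ask for context, not vectors.
  public async recall(query: string, topK = 3): Promise<string[]> {
    const memories: MemoryRecord[] = await this.store.search(query, topK);
    return memories.map(m => m.content);
  }
}
```
Other experts would call recall() for context and never touch embeddings or the vector store directly.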
Part 2: The Agent's Prefrontal Cortex – Human-in-the-Loop for Critical Decisions
Even with perfect memory and reasoning, agents need human oversight for high-stakes actions. This requires the agent to pause, externalize its proposed action, wait for human input, and then proceed or modify its plan based on that input.
While my preference leans towards raw APIs, managing complex, branching agent workflows often benefits from a graph-based state machine. LangGraph offers a decent way to define these workflows, although it can introduce some verbosity. We'll leverage its state management and checkpointing capabilities, while keeping the actual "nodes" as direct and performant as possible.
Architecture: LangGraph + Persistent State + External API
- Agent State: Define a comprehensive state that includes the agent's current thoughts, observations, planned actions, and crucially, flags for human approval.
- Workflow Graph: Define the agent's decision-making process as a graph using LangGraph.
- Approval Node: Introduce a special node that, when reached with a critical action, flags the agent's state as awaitingHumanApproval and saves a checkpoint.
- External API: A dedicated endpoint (e.g., a REST API or WebSocket) receives the proposed action, presents it to a human, and then sends back the human's decision.
- Resume: Upon receiving human approval/rejection, the agent's state is updated and the LangGraph execution is resumed from the checkpoint.
Implementation: State & Graph Definition
// types/agent.ts
export interface AgentState {
input: string; // User query
scratchpad: string[]; // Internal monologue/reasoning steps
tool_calls: { tool_name: string; args: Record<string, any>; }[]; // Tools agent intends to call
final_answer?: string; // If agent has a final answer
awaitingHumanApproval: boolean; // Flag for HITL
proposedAction?: {
type: 'tool_call' | 'final_answer';
details: any;
};
humanFeedback?: 'approve' | 'reject' | 'modify'; // What human decided
modificationDetails?: string; // If human requested modification
}
// Initial state factory
export const initialAgentState: () => AgentState = () => ({
input: '',
scratchpad: [],
tool_calls: [],
awaitingHumanApproval: false,
});
Now, let's sketch the LangGraph integration. This requires @langchain/core and @langchain/langgraph.
// agentGraph.ts
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
import { StateGraphArgs, END, StateGraph, MemorySaver } from '@langchain/langgraph';
import { AgentState, initialAgentState } from './types/agent';
import { IVectorStoreClient } from './interfaces/memory';
import { QdrantVectorStoreClient } from './services/vectorStoreClient';
import { EmbeddingService } from './services/embeddingService';
import { ChatOpenAI } from '@langchain/openai'; // Using a direct LLM client
// --- Mock Tools for demonstration ---
interface Tool {
name: string;
description: string;
schema: any; // Zod schema
func: (args: any) => Promise<string>;
isCritical?: boolean; // New flag for HITL
}
const safeTool: Tool = {
name: "read_database",
description: "Reads non-sensitive data from a simulated database.",
schema: { type: 'object', properties: { query: { type: 'string' } } },
func: async (args: { query: string }) => {
console.log(`Executing safe tool: read_database with query: ${args.query}`);
return `Data for "${args.query}" is: [itemA, itemB]`;
},
isCritical: false,
};
const criticalTool: Tool = {
name: "deploy_code",
description: "Deploys code to production. This is a critical operation.",
schema: { type: 'object', properties: { project_id: { type: 'string' }, version: { type: 'string' } } },
func: async (args: { project_id: string; version: string }) => {
console.warn(`CRITICAL: Deploying project ${args.project_id} version ${args.version}...`);
return `Project ${args.project_id} version ${args.version} deployed successfully.`;
},
isCritical: true,
};
const tools: Tool[] = [safeTool, criticalTool];
// --- End Mock Tools ---
const llm = new ChatOpenAI({
model: "gpt-4o-mini", // Or other performant model
temperature: 0.7,
});
// Bind tools to the LLM (LangChain specific)
const llmWithTools = llm.bindTools(tools);
// --- Agent Nodes ---
// The main agent 'think' node (our MoE orchestrator/expert)
const agentThinkNode = async (state: AgentState): Promise<Partial<AgentState>> => {
console.log("[AGENT] Thinking...");
const messages: BaseMessage[] = [
new HumanMessage(state.input),
...state.scratchpad.map(s => new AIMessage({ content: s, name: 'scratchpad_entry' })), // Example: add scratchpad entries to context
];
// Simulate retrieval from long-term memory for relevant context
// In a real MoE, a 'MemoryExpert' would handle this.
const relevantMemories = await memoryClient.search(state.input, 2);
if (relevantMemories.length > 0) {
messages.unshift(new HumanMessage(`Past relevant experiences: ${relevantMemories.map(m => m.content).join('\n')}`));
}
const response = await llmWithTools.invoke(messages);
const toolCalls = response.tool_calls || [];
const content = typeof response.content === 'string' ? response.content : ''; // Normalize possibly-complex message content to a string
// Simulate the 'SafetyExpert' determining if an action is critical
if (toolCalls.length > 0) {
// Check if any proposed tool call is critical
const criticalCall = toolCalls.find(tc => tools.find(t => t.name === tc.name)?.isCritical);
if (criticalCall) {
return {
scratchpad: [...state.scratchpad, `Proposed critical tool call: ${JSON.stringify(criticalCall)}`],
awaitingHumanApproval: true,
proposedAction: { type: 'tool_call', details: criticalCall },
};
}
}
if (content.includes("FINAL ANSWER")) { // Simple heuristic for final answer
return {
scratchpad: [...state.scratchpad, content],
final_answer: content.replace("FINAL ANSWER:", "").trim(),
awaitingHumanApproval: true, // Also require approval for final answer if critical enough
proposedAction: { type: 'final_answer', details: content },
};
}
return {
scratchpad: [...state.scratchpad, content],
tool_calls: toolCalls.map(tc => ({ tool_name: tc.name, args: tc.args })),
};
};
// Node to execute tools
const executeToolsNode = async (state: AgentState): Promise<Partial<AgentState>> => {
console.log("[AGENT] Executing tools...");
const toolResults: string[] = [];
for (const toolCall of state.tool_calls) {
const tool = tools.find(t => t.name === toolCall.tool_name);
if (tool) {
try {
const result = await tool.func(toolCall.args);
toolResults.push(`Tool ${toolCall.tool_name} returned: ${result}`);
} catch (e) {
toolResults.push(`Tool ${toolCall.tool_name} failed: ${e}`);
}
} else {
toolResults.push(`Unknown tool: ${toolCall.tool_name}`);
}
}
// Agent remembers the outcome of its actions
await memoryClient.add([{
content: `Agent executed tools: ${state.tool_calls.map(tc => tc.tool_name).join(', ')}. Results: ${toolResults.join('; ')}`,
timestamp: Date.now(),
metadata: { type: 'tool_execution' }
}]);
return {
scratchpad: [...state.scratchpad, ...toolResults],
tool_calls: [], // Clear tool calls after execution
};
};
// --- LangGraph Setup ---
const graphState: StateGraphArgs<AgentState>['channels'] = {
input: {
value: (x: string, y: string) => y, // Overwrite
default: () => '',
},
scratchpad: {
value: (x: string[], y: string[]) => x.concat(y), // Append
default: () => [],
},
tool_calls: {
value: (x: any[], y: any[]) => y, // Overwrite
default: () => [],
},
final_answer: {
value: (x: string | undefined, y: string | undefined) => y, // Overwrite
default: () => undefined,
},
awaitingHumanApproval: {
value: (x: boolean, y: boolean) => y, // Overwrite
default: () => false,
},
proposedAction: {
value: (x: any | undefined, y: any | undefined) => y, // Overwrite
default: () => undefined,
},
humanFeedback: {
value: (x: 'approve' | 'reject' | 'modify' | undefined, y: 'approve' | 'reject' | 'modify' | undefined) => y,
default: () => undefined,
},
modificationDetails: {
value: (x: string | undefined, y: string | undefined) => y,
default: () => undefined,
},
};
const workflow = new StateGraph({
channels: graphState,
})
.addNode("agent_think", agentThinkNode)
.addNode("execute_tools", executeToolsNode);
// Define conditional edges for dynamic workflow
workflow.addConditionalEdges(
"agent_think",
(state: AgentState) => {
if (state.awaitingHumanApproval) {
return "await_human_approval"; // Special state indicating pause
}
if (state.tool_calls.length > 0) {
return "execute_tools";
}
if (state.final_answer) {
return END;
}
return "agent_think"; // Loop back to think if no tools/final answer
}
);
workflow.addConditionalEdges(
"execute_tools",
(state: AgentState) => {
if (state.final_answer) {
return END;
}
return "agent_think"; // After tools, go back to thinking
}
);
workflow.setEntryPoint("agent_think");
// Compile the graph
const checkpointer = new MemorySaver(); // In production, swap for a persistent checkpointer (e.g., Redis- or SQL-backed)
const app = workflow.compile({ checkpointer });
// Initialize memory client (global or passed into nodes)
const embeddingSvc = new EmbeddingService(process.env.OPENAI_API_KEY!);
const memoryClient = new QdrantVectorStoreClient(embeddingSvc, process.env.QDRANT_HOST || 'localhost:6333');
// --- External API for Human-in-the-Loop Management ---
// This would typically be a separate microservice or API endpoint.
// It retrieves the paused agent's state, presents it, and sends feedback.
// Example of how human feedback would resume the agent
export async function provideHumanFeedback(threadId: string, feedback: 'approve' | 'reject' | 'modify', modification?: string) {
// In a real system, you'd load the current state of the thread/agent from LangGraph's checkpoint.
// For demonstration, we'll simulate updating the state and resuming.
// This is the critical part: LangGraph needs to be able to load the checkpoint
// and continue execution with the updated state.
// With a checkpointer configured, LangGraph persists the state after each step,
// so the paused state (awaitingHumanApproval = true) can be recovered later (e.g., from Redis or SQL).
// To resume, you'd call `app.invoke` again with the thread_id and the
// *updated* state for `humanFeedback` and `modificationDetails`.
// LangGraph's checkpointing automatically handles loading the paused state.
// We modify the *input* for the next step of the agent.
const feedbackState: Partial<AgentState> = {
awaitingHumanApproval: false, // Reset approval flag
humanFeedback: feedback,
modificationDetails: modification,
// The agent's next 'think' cycle will process this feedback
input: `Human feedback received: ${feedback}. ${modification ? `Modification requested: ${modification}` : ''}`,
};
// This is where you would call LangGraph's `invoke` with the `threadId`
// and the `feedbackState` as the updated state. LangGraph will then
// load the checkpointed state for `threadId`, merge `feedbackState`,
// and continue execution.
// (Actual call depends on how LangGraph's checkpointing is configured and exposed)
console.log(`[HITL] Human feedback for thread ${threadId}: ${feedback}. Modification: ${modification || 'none'}`);
console.log("This feedback will now be fed back into the agent's workflow.");
// In a production setup, you'd use a LangGraph `checkpoint_id` and resume
// from there. For example:
// const stream = app.stream(feedbackState, {
// configurable: { thread_id: threadId }
// });
// This would pick up from the last checkpoint and continue processing.
}
// --- Main execution flow (example) ---
async function runAgentConversation(threadId: string, prompt: string) {
let currentState: AgentState = initialAgentState();
currentState.input = prompt;
console.log(`[USER] ${prompt}`);
// LangGraph with configurable thread_id will manage checkpoints
const stream = app.stream(currentState, {
configurable: { thread_id: threadId },
});
for await (const s of stream) {
if (s.__end__) {
console.log("[AGENT] Workflow finished.");
currentState = s.__end__ as AgentState;
break;
}
const newNode = Object.keys(s).filter(key => key !== '__end__')[0];
console.log(`[GRAPH] Current node: ${newNode}`);
currentState = s[newNode] as AgentState;
if (currentState.awaitingHumanApproval) {
console.warn(`[HITL REQUIRED] Agent proposed action: ${JSON.stringify(currentState.proposedAction)}. Waiting for human approval for thread ${threadId}...`);
// Here, the system would pause.
// A separate UI/API would pick up this state, show it to a human.
// The human would then call `provideHumanFeedback(threadId, 'approve' | 'reject' | 'modify', modificationDetails)`.
// The `stream` would then continue from the checkpoint.
return currentState; // Exit here for simulation of waiting
}
}
if (currentState.final_answer) {
console.log(`[AGENT FINAL ANSWER] ${currentState.final_answer}`);
} else {
console.log("[AGENT] No final answer reached, current scratchpad:", currentState.scratchpad);
}
return currentState;
}
// Example Usage (requires setting OPENAI_API_KEY and QDRANT_HOST in .env or environment)
// import { config } from 'dotenv';
// config();
// (async () => {
// const thread1 = "user_123";
// let state;
// // First interaction: non-critical tool
// state = await runAgentConversation(thread1, "What is the capital of France? Also, read some database entries about 'users'.");
// // Agent should run read_database and return answer
// // Second interaction: critical tool
// // state = await runAgentConversation(thread1, "Deploy project 'frontend-app' version '1.2.0' to production.");
// // Agent should pause here, setting `awaitingHumanApproval: true`
// // To simulate approval (after the above call returns with awaitingHumanApproval)
// // if (state && state.awaitingHumanApproval) {
// // console.log("Simulating human approval after 5 seconds...");
// // await new Promise(resolve => setTimeout(resolve, 5000));
// // await provideHumanFeedback(thread1, 'approve');
// // console.log("Resuming agent after human approval...");
// // await runAgentConversation(thread1, "Continue with deployment."); // Agent will pick up from checkpoint
// // }
// })();
This is a simplified example. In a real production system:
- LangGraph's checkpointing would be configured with a persistent store (e.g., Redis).
- The app.stream call would run within an asynchronous worker (e.g., a message queue consumer).
- When awaitingHumanApproval is true, the worker would push the agent's state to a dedicated queue or API, notifying a human approval system (a minimal endpoint for this is sketched below).
- The human approval system would then trigger provideHumanFeedback, which would put a message back into the worker's queue, including the thread_id and the humanFeedback, prompting app.stream to resume.
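As a sketch of that approval surface, the snippet below assumes an Express server sitting in front of the provideHumanFeedback function defined earlier; the route shape, payload fields, and port are illustrative.

```typescript
// server/approvalApi.ts
// Illustrative sketch only: a minimal HTTP surface for human reviewers.
// Assumes the provideHumanFeedback export from agentGraph.ts; the route,
// port, and request body shape are invented for this example.
import express from 'express';
import { provideHumanFeedback } from '../agentGraph';

const server = express();
server.use(express.json());

// A reviewer (or an approval UI) posts their decision for a paused thread.
server.post('/threads/:threadId/feedback', async (req, res) => {
  const { feedback, modification } = req.body as {
    feedback: 'approve' | 'reject' | 'modify';
    modification?: string;
  };
  try {
    await provideHumanFeedback(req.params.threadId, feedback, modification);
    res.json({ status: 'ok', threadId: req.params.threadId });
  } catch (err) {
    res.status(500).json({ status: 'error', message: String(err) });
  }
});

server.listen(3000, () => console.log('HITL approval API listening on :3000'));
```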
What I Learned: Control, Complexity, and Cognitive Architectures
Building production-grade agents forces a shift in perspective. It's no longer just about chaining LLM calls; it's about robust state management, efficient data flow, and graceful failure modes.
- The Framework Tax vs. Raw Power: While LangGraph provides a structural advantage for complex state machines, its verbosity and overhead can be frustrating. For core logic like embedding and vector search, direct API calls are indispensable for performance and granular control. This confirms my bias: abstract where it benefits, but get down to the metal where performance or unique behavior is critical.
- State is Everything: Managing mutable state across asynchronous steps, especially with external human interaction, is inherently complex. A clear AgentState definition and reliable checkpointing are non-negotiable.
- The Necessity of Specialization: The concept of MoE isn't just for LLMs. It's a powerful architectural pattern for agents. A dedicated MemoryExpert and SafetyExpert (which triggers HITL) encapsulate distinct functionalities, making the system more modular, testable, and robust.
- The Human Factor: Implementing HITL isn't just about code; it requires designing effective user interfaces and clear communication protocols for human operators. The agent needs to present its reasoning and proposed action concisely so humans can make informed decisions quickly.
This journey, constantly comparing artificial systems to the elegant complexity of the human brain, reinforces my belief that we are only scratching the surface. Long-term memory and executive control are fundamental for any truly intelligent system. By building these foundational elements with performance and precision, we move one step closer to agents that don't just mimic intelligence, but embody it. The next frontiers involve even more dynamic memory consolidation, learning from human feedback loops, and truly adaptive MoE routing.