
Production-Grade Agent Architecture: Implementing Long-Term Memory and Human-in-the-Loop

April 19, 2024

As a CSE student fascinated by the human brain and the potential of AI, my ultimate goal is to build intelligent systems that truly think. We're talking about agents capable of complex reasoning, adaptation, and continuous learning, much like a scaled-up, specialized version of our own cortex – perhaps even an approximation of a cortical column, driven by a Mixture-of-Experts (MoE) architecture.

Moving from research prototypes to production-ready agents isn't just about bigger models or faster GPUs. It's about engineering robustness. Two critical pillars stand out for any agent destined for the real world: persistent, semantic long-term memory and robust Human-in-the-Loop (HITL) safety mechanisms. Without these, your agent is either an amnesiac child or a dangerous loose cannon.

This post dives deep into architecting these components. You won't find us relying on bloated, high-level frameworks where you lose control and performance. We're going direct, leveraging raw APIs and TypeScript to build efficient, scalable systems.

Why Bother? From Prototype to Persistent Intelligence

Imagine an agent tasked with managing complex projects, learning from past failures, or even contributing to scientific discovery. A stateless agent is useless. It’s like a human waking up every day with complete amnesia. True intelligence requires the ability to recall, synthesize, and apply past experiences – our hippocampus equivalent. This isn't just about storing data; it's about semantic retrieval, understanding what is relevant to the current context.

Equally important is the prefrontal cortex analogue: executive oversight. While we strive for autonomous agents, allowing them to execute high-impact actions (e.g., deploying code, sending critical emails, performing financial transactions) without supervision is irresponsible. We need safety rails, a mechanism for human review and approval at critical junctures. This is where HITL comes in, ensuring the agent’s actions align with human intent and ethical boundaries.

My vision for an MoE-driven agent is that each "expert" doesn't just process information; it learns from its past interactions, storing and retrieving relevant knowledge. Furthermore, a dedicated "Safety Expert" within the MoE orchestrates the interaction with the human, pausing execution when necessary.

Part 1: The Agent's Hippocampus – Semantic Long-Term Memory

Our goal is not just a key-value store. We want semantic memory, where the agent can query concepts and retrieve related experiences, even if the exact keywords aren't present. This mirrors how our brains make associations.

Architecture: Embeddings + Vector Database

The core idea is simple:

  1. Embed: Transform textual experiences (thoughts, observations, conversations) into dense numerical vectors using an embedding model. These vectors capture the semantic meaning.
  2. Store: Persist these vectors in a specialized database optimized for similarity search – a vector database.
  3. Retrieve: When the agent needs to recall, it embeds its query and searches the vector database for semantically similar vectors.

For performance, choosing the right embedding model and an efficient vector store is paramount. Forget the abstraction layers that add latency; we're talking direct API calls.
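
Before defining the storage interface, it helps to make "semantically similar" concrete: retrieval is just ranking stored embedding vectors by cosine similarity against the query embedding. A deliberately naive, in-memory sketch (a real vector database performs the same ranking at scale with approximate nearest-neighbor indexes; this helper is purely illustrative):

// utils/similarity.ts — illustrative only; the vector DB does this at scale with ANN indexes.
export function cosineSimilarity(a: number[], b: number[]): number {
    if (a.length !== b.length) throw new Error("Vector dimensions must match.");
    let dot = 0, normA = 0, normB = 0;
    for (let i = 0; i < a.length; i++) {
        dot += a[i] * b[i];
        normA += a[i] * a[i];
        normB += b[i] * b[i];
    }
    return dot / (Math.sqrt(normA) * Math.sqrt(normB) || 1);
}

// "Retrieve" then boils down to: embed the query, score every stored vector, keep the top-k.
export function naiveTopK(
    queryEmbedding: number[],
    stored: { id: string; embedding: number[] }[],
    topK: number
): { id: string; score: number }[] {
    return stored
        .map(s => ({ id: s.id, score: cosineSimilarity(queryEmbedding, s.embedding) }))
        .sort((a, b) => b.score - a.score)
        .slice(0, topK);
}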

// interfaces/memory.ts
export interface MemoryRecord {
    id: string;
    content: string;
    timestamp: number;
    metadata?: Record<string, any>;
    embedding?: number[]; // Stored for retrieval if necessary, but primarily for DB
}
 
export interface IVectorStoreClient {
    /**
     * Stores a batch of memory records after embedding their content.
     * @param records Records to store.
     * @returns Promise resolving to void.
     */
    add(records: Omit<MemoryRecord, 'id' | 'embedding'>[]): Promise<void>;
 
    /**
     * Retrieves the top_k most semantically similar records to the query.
     * @param query The text query to search for.
     * @param topK The number of similar records to retrieve.
     * @returns Promise resolving to an array of MemoryRecord.
     */
    search(query: string, topK: number): Promise<MemoryRecord[]>;
}

Implementation: A DirectVectorStore Example

We'll assume an OpenAI (or Cohere) embeddings API and a vector database such as Qdrant; for very high-throughput, low-latency setups, an in-process Faiss index is also an option.

// services/embeddingService.ts
import axios from 'axios';
 
export class EmbeddingService {
    private readonly apiKey: string;
    private readonly apiUrl: string;
 
    constructor(apiKey: string, apiUrl: string = "https://api.openai.com/v1/embeddings") {
        this.apiKey = apiKey;
        this.apiUrl = apiUrl;
    }
 
    /**
     * Generates embeddings for a batch of texts.
     * Emphasize batching for efficiency!
     */
    public async embedBatch(texts: string[]): Promise<number[][]> {
        if (texts.length === 0) return [];
        try {
            const response = await axios.post(
                this.apiUrl,
                {
                    model: "text-embedding-3-small", // Or other performant model
                    input: texts,
                },
                {
                    headers: {
                        'Authorization': `Bearer ${this.apiKey}`,
                        'Content-Type': 'application/json',
                    },
                }
            );
            return response.data.data.map((item: any) => item.embedding);
        } catch (error) {
            console.error("Error generating embeddings:", error);
            throw new Error("Failed to generate embeddings.");
        }
    }
}
 
// services/vectorStoreClient.ts
import { v4 as uuidv4 } from 'uuid';
import { IVectorStoreClient, MemoryRecord } from '../interfaces/memory';
import { EmbeddingService } from './embeddingService';
import { QdrantClient } from '@qdrant/js-client-rest'; // Example: Using Qdrant
 
export class QdrantVectorStoreClient implements IVectorStoreClient {
    private readonly embeddingService: EmbeddingService;
    private readonly qdrantClient: QdrantClient;
    private readonly collectionName: string;
 
    constructor(embeddingService: EmbeddingService, qdrantUrl: string, collectionName: string = "agent_memories") {
        this.embeddingService = embeddingService;
        this.qdrantClient = new QdrantClient({ url: qdrantUrl });
        this.collectionName = collectionName;
        this.initializeCollection().catch(console.error); // Ensure collection exists
    }
 
    private async initializeCollection() {
        const collectionExists = (await this.qdrantClient.getCollections()).collections.some(
            (c) => c.name === this.collectionName
        );
        if (!collectionExists) {
            await this.qdrantClient.createCollection(this.collectionName, {
                vectors: { size: 1536, distance: 'Cosine' }, // Match the embedding model's dimension (text-embedding-3-small = 1536)
            });
            console.log(`Qdrant collection '${this.collectionName}' created.`);
        }
    }
 
    public async add(records: Omit<MemoryRecord, 'id' | 'embedding'>[]): Promise<void> {
        if (records.length === 0) return;
 
        const contents = records.map(r => r.content);
        const embeddings = await this.embeddingService.embedBatch(contents);
 
        const points = records.map((record, index) => ({
            id: uuidv4(),
            vector: embeddings[index],
            payload: {
                content: record.content,
                timestamp: record.timestamp,
                metadata: record.metadata,
            },
        }));
 
        await this.qdrantClient.upsert(this.collectionName, {
            wait: true,
            points,
        });
    }
 
    public async search(query: string, topK: number): Promise<MemoryRecord[]> {
        if (!query) return [];
 
        const [queryEmbedding] = await this.embeddingService.embedBatch([query]);
 
        const searchResult = await this.qdrantClient.search(this.collectionName, {
            vector: queryEmbedding,
            limit: topK,
            with_payload: true,
        });
 
        return searchResult.map(result => ({
            id: result.id.toString(),
            content: (result.payload as any).content,
            timestamp: (result.payload as any).timestamp,
            metadata: (result.payload as any).metadata,
            // score: result.score, // Could include score if needed
        }));
    }
}

Within our MoE agent, a MemoryManager expert would be responsible for deciding when to store new memories (e.g., after significant observations, successful tool executions, or complex reasoning steps) and what to store (summarized insights, raw observations). It would also abstract the search calls for other experts. This keeps the MemoryManager focused and efficient.
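
A minimal sketch of what such an expert could look like on top of the IVectorStoreClient above (the MemoryManager name, the shouldStore heuristic, and the memory "type" tags are illustrative assumptions, not fixed APIs):

// experts/memoryManager.ts — illustrative sketch of a MemoryManager expert.
import { IVectorStoreClient, MemoryRecord } from '../interfaces/memory';

export class MemoryManager {
    constructor(private readonly store: IVectorStoreClient) {}

    // Heuristic gate: decide *when* something is worth persisting.
    private shouldStore(content: string, type: string): boolean {
        if (type === 'tool_execution' || type === 'reasoning_summary') return true;
        return content.length > 200; // crude proxy for "significant observation"; tune per use case
    }

    // Decide *what* to store: tag each memory so later retrieval can filter by type.
    public async remember(content: string, type: string, metadata: Record<string, any> = {}): Promise<void> {
        if (!this.shouldStore(content, type)) return;
        await this.store.add([{ content, timestamp: Date.now(), metadata: { ...metadata, type } }]);
    }

    // Other experts call this instead of hitting the vector store directly.
    public async recall(query: string, topK = 3): Promise<MemoryRecord[]> {
        return this.store.search(query, topK);
    }
}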

Part 2: The Agent's Prefrontal Cortex – Human-in-the-Loop for Critical Decisions

Even with perfect memory and reasoning, agents need human oversight for high-stakes actions. This requires the agent to pause, externalize its proposed action, wait for human input, and then proceed or modify its plan based on that input.

While my preference leans towards raw APIs, managing complex, branching agent workflows often benefits from a graph-based state machine. LangGraph offers a decent way to define these workflows, although it can introduce some verbosity. We'll leverage its state management and checkpointing capabilities, while keeping the actual "nodes" as direct and performant as possible.

Architecture: LangGraph + Persistent State + External API

  1. Agent State: Define a comprehensive state that includes the agent's current thoughts, observations, planned actions, and crucially, flags for human approval.
  2. Workflow Graph: Define the agent's decision-making process as a graph using LangGraph.
  3. Approval Node: Introduce a special node that, when reached with a critical action, flags the agent's state as awaitingHumanApproval and saves a checkpoint.
  4. External API: A dedicated endpoint (e.g., a REST API or WebSocket) receives the proposed action, presents it to a human, and then sends back the human's decision (a minimal endpoint sketch follows this list).
  5. Resume: Upon receiving human approval/rejection, the agent's state is updated, and the LangGraph execution is resumed from the checkpoint.
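
Here is a minimal sketch of the external approval endpoint from step 4, assuming Express; the in-memory pending map, the registerPendingApproval helper, and the route shapes are placeholders for whatever approval UI or service you actually run. It calls the provideHumanFeedback helper defined later in this post.

// api/approvalServer.ts — illustrative HITL endpoint (assumes Express; not production-hardened).
import express from 'express';
import { provideHumanFeedback } from '../agentGraph';

interface PendingApproval {
    threadId: string;
    proposedAction: unknown;
}

// Placeholder store of paused threads, keyed by thread id. A real system would persist this.
const pending = new Map<string, PendingApproval>();

export function registerPendingApproval(threadId: string, proposedAction: unknown): void {
    pending.set(threadId, { threadId, proposedAction });
}

const server = express();
server.use(express.json());

// A human operator (or an approval UI) lists everything waiting for review.
server.get('/approvals', (_req, res) => {
    res.json([...pending.values()]);
});

// The operator approves, rejects, or requests a modification for a specific thread.
server.post('/approvals/:threadId', async (req, res) => {
    const { threadId } = req.params;
    if (!pending.has(threadId)) {
        res.status(404).json({ error: 'No pending approval for this thread' });
        return;
    }
    const { decision, modification } = req.body as {
        decision: 'approve' | 'reject' | 'modify';
        modification?: string;
    };
    await provideHumanFeedback(threadId, decision, modification);
    pending.delete(threadId);
    res.json({ status: 'ok' });
});

server.listen(3000, () => console.log('HITL approval API listening on :3000'));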

Implementation: State & Graph Definition

// types/agent.ts
export interface AgentState {
    input: string; // User query
    scratchpad: string[]; // Internal monologue/reasoning steps
    tool_calls: { tool_name: string; args: Record<string, any>; }[]; // Tools agent intends to call
    final_answer?: string; // If agent has a final answer
    awaitingHumanApproval: boolean; // Flag for HITL
    proposedAction?: {
        type: 'tool_call' | 'final_answer';
        details: any;
    };
    humanFeedback?: 'approve' | 'reject' | 'modify'; // What human decided
    modificationDetails?: string; // If human requested modification
}
 
// Initial state factory
export const initialAgentState: () => AgentState = () => ({
    input: '',
    scratchpad: [],
    tool_calls: [],
    awaitingHumanApproval: false,
});

Now, let's sketch the LangGraph integration. This requires @langchain/core and @langchain/langgraph.

// agentGraph.ts
import { AIMessage, BaseMessage, HumanMessage } from '@langchain/core/messages';
import { StateGraphArgs, END, StateGraph } from '@langchain/langgraph';
import { AgentState, initialAgentState } from './types/agent';
import { IVectorStoreClient } from './interfaces/memory';
import { QdrantVectorStoreClient } from './services/vectorStoreClient';
import { EmbeddingService } from './services/embeddingService';
import { ChatOpenAI } from '@langchain/openai'; // Using a direct LLM client
 
// --- Mock Tools for demonstration ---
interface Tool {
    name: string;
    description: string;
    schema: any; // JSON Schema (or Zod) describing the tool's arguments
    func: (args: any) => Promise<string>;
    isCritical?: boolean; // New flag for HITL
}
 
const safeTool: Tool = {
    name: "read_database",
    description: "Reads non-sensitive data from a simulated database.",
    schema: { type: 'object', properties: { query: { type: 'string' } } },
    func: async (args: { query: string }) => {
        console.log(`Executing safe tool: read_database with query: ${args.query}`);
        return `Data for "${args.query}" is: [itemA, itemB]`;
    },
    isCritical: false,
};
 
const criticalTool: Tool = {
    name: "deploy_code",
    description: "Deploys code to production. This is a critical operation.",
    schema: { type: 'object', properties: { project_id: { type: 'string' }, version: { type: 'string' } } },
    func: async (args: { project_id: string; version: string }) => {
        console.warn(`CRITICAL: Deploying project ${args.project_id} version ${args.version}...`);
        return `Project ${args.project_id} version ${args.version} deployed successfully.`;
    },
    isCritical: true,
};
const tools: Tool[] = [safeTool, criticalTool];
// --- End Mock Tools ---
 
const llm = new ChatOpenAI({
    model: "gpt-4o-mini", // Or other performant model
    temperature: 0.7,
});
 
// Bind tools to the LLM (LangChain specific)
const llmWithTools = llm.bindTools(tools);
 
// --- Agent Nodes ---
 
// The main agent 'think' node (our MoE orchestrator/expert)
const agentThinkNode = async (state: AgentState): Promise<Partial<AgentState>> => {
    console.log("[AGENT] Thinking...");
    const messages: BaseMessage[] = [
        new HumanMessage(state.input),
        ...state.scratchpad.map(s => new AIMessage(s)), // prior reasoning steps added to context as assistant messages
    ];
 
    // Simulate retrieval from long-term memory for relevant context
    // In a real MoE, a 'MemoryExpert' would handle this.
    const relevantMemories = await memoryClient.search(state.input, 2);
    if (relevantMemories.length > 0) {
        messages.unshift(new HumanMessage(`Past relevant experiences: ${relevantMemories.map(m => m.content).join('\n')}`));
    }
 
    const response = await llmWithTools.invoke(messages);
    const toolCalls = response.tool_calls || [];
    const content = typeof response.content === 'string' ? response.content : JSON.stringify(response.content);
 
    // Simulate the 'SafetyExpert' determining if an action is critical
    if (toolCalls.length > 0) {
        // Check if any proposed tool call is critical
        const criticalCall = toolCalls.find(tc => tools.find(t => t.name === tc.name)?.isCritical);
        if (criticalCall) {
            return {
                scratchpad: [`Proposed critical tool call: ${JSON.stringify(criticalCall)}`], // the scratchpad channel appends, so return only new entries
                awaitingHumanApproval: true,
                proposedAction: { type: 'tool_call', details: criticalCall },
            };
        }
    }
 
    if (content.includes("FINAL ANSWER")) { // Simple heuristic for final answer
        return {
            scratchpad: [content], // append-only channel: return just the new entry
            final_answer: content.replace("FINAL ANSWER:", "").trim(),
            awaitingHumanApproval: true, // in this sketch, final answers also go through human approval
            proposedAction: { type: 'final_answer', details: content },
        };
    }
 
    return {
        scratchpad: [content],
        tool_calls: toolCalls.map(tc => ({ tool_name: tc.name, args: tc.args })),
    };
};
 
// Node to execute tools
const executeToolsNode = async (state: AgentState): Promise<Partial<AgentState>> => {
    console.log("[AGENT] Executing tools...");
    const toolResults: string[] = [];
    for (const toolCall of state.tool_calls) {
        const tool = tools.find(t => t.name === toolCall.tool_name);
        if (tool) {
            try {
                const result = await tool.func(toolCall.args);
                toolResults.push(`Tool ${toolCall.tool_name} returned: ${result}`);
            } catch (e) {
                toolResults.push(`Tool ${toolCall.tool_name} failed: ${e}`);
            }
        } else {
            toolResults.push(`Unknown tool: ${toolCall.tool_name}`);
        }
    }
 
    // Agent remembers the outcome of its actions
    await memoryClient.add([{
        content: `Agent executed tools: ${state.tool_calls.map(tc => tc.tool_name).join(', ')}. Results: ${toolResults.join('; ')}`,
        timestamp: Date.now(),
        metadata: { type: 'tool_execution' }
    }]);
 
    return {
        scratchpad: toolResults, // append-only channel: return just the new results
        tool_calls: [], // Clear tool calls after execution
    };
};
 
// --- LangGraph Setup ---
const graphState: StateGraphArgs<AgentState>['channels'] = {
    input: {
        value: (x: string, y: string) => y, // Overwrite
        default: () => '',
    },
    scratchpad: {
        value: (x: string[], y: string[]) => x.concat(y), // Append
        default: () => [],
    },
    tool_calls: {
        value: (x: any[], y: any[]) => y, // Overwrite
        default: () => [],
    },
    final_answer: {
        value: (x: string | undefined, y: string | undefined) => y, // Overwrite
        default: () => undefined,
    },
    awaitingHumanApproval: {
        value: (x: boolean, y: boolean) => y, // Overwrite
        default: () => false,
    },
    proposedAction: {
        value: (x: any | undefined, y: any | undefined) => y, // Overwrite
        default: () => undefined,
    },
    humanFeedback: {
        value: (x: 'approve' | 'reject' | 'modify' | undefined, y: 'approve' | 'reject' | 'modify' | undefined) => y,
        default: () => undefined,
    },
    modificationDetails: {
        value: (x: string | undefined, y: string | undefined) => y,
        default: () => undefined,
    },
};
 
const workflow = new StateGraph({
    channels: graphState,
})
    .addNode("agent_think", agentThinkNode)
    .addNode("execute_tools", executeToolsNode);
 
// Define conditional edges for dynamic workflow
workflow.addConditionalEdges(
    "agent_think",
    (state: AgentState) => {
        if (state.awaitingHumanApproval) {
            // Simplified: a production graph would register this as a real node (or use an
            // interrupt) so the checkpointer can pause the thread cleanly.
            return "await_human_approval"; // Special state indicating pause
        }
        if (state.tool_calls.length > 0) {
            return "execute_tools";
        }
        if (state.final_answer) {
            return END;
        }
        return "agent_think"; // Loop back to think if no tools/final answer
    }
);
 
workflow.addConditionalEdges(
    "execute_tools",
    (state: AgentState) => {
        if (state.final_answer) {
            return END;
        }
        return "agent_think"; // After tools, go back to thinking
    }
);
 
workflow.setEntryPoint("agent_think");
 
// Compile the graph.
// Note: to use the thread_id-scoped checkpoints relied on below, pass a checkpointer
// here (workflow.compile({ checkpointer })); see the production notes at the end.
const app = workflow.compile();
 
// Initialize memory client (global or passed into nodes)
const embeddingSvc = new EmbeddingService(process.env.OPENAI_API_KEY!);
const memoryClient = new QdrantVectorStoreClient(embeddingSvc, process.env.QDRANT_URL || 'http://localhost:6333');
 
// --- External API for Human-in-the-Loop Management ---
// This would typically be a separate microservice or API endpoint.
// It retrieves the paused agent's state, presents it, and sends feedback.
 
// Example of how human feedback would resume the agent
export async function provideHumanFeedback(threadId: string, feedback: 'approve' | 'reject' | 'modify', modification?: string) {
    // In a real system, you'd load the current state of the thread/agent from LangGraph's checkpoint.
    // For demonstration, we'll simulate updating the state and resuming.
 
    // This is the critical part: LangGraph needs to be able to load the checkpoint
    // and continue execution with the updated state. With a checkpointer configured,
    // LangGraph persists the state after every step, so the thread that paused with
    // awaitingHumanApproval can later be reloaded from its last checkpoint.
 
    // To resume, you'd call `app.invoke` again with the thread_id and the
    // *updated* state for `humanFeedback` and `modificationDetails`.
    // LangGraph's checkpointing automatically handles loading the paused state.
 
    // We modify the *input* for the next step of the agent.
    const feedbackState: Partial<AgentState> = {
        awaitingHumanApproval: false, // Reset approval flag
        humanFeedback: feedback,
        modificationDetails: modification,
        // The agent's next 'think' cycle will process this feedback
        input: `Human feedback received: ${feedback}. ${modification ? `Modification requested: ${modification}` : ''}`,
    };
 
    // This is where you would call LangGraph's `invoke` with the `threadId`
    // and the `feedbackState` as the updated state. LangGraph will then
    // load the checkpointed state for `threadId`, merge `feedbackState`,
    // and continue execution.
    // (Actual call depends on how LangGraph's checkpointing is configured and exposed)
    console.log(`[HITL] Human feedback for thread ${threadId}: ${feedback}. Modification: ${modification || 'none'}`);
    console.log("This feedback will now be fed back into the agent's workflow.");
 
    // In a production setup, you'd use a LangGraph `checkpoint_id` and resume
    // from there. For example:
    // const stream = app.stream(feedbackState, {
    //   configurable: { thread_id: threadId }
    // });
    // This would pick up from the last checkpoint and continue processing.
}
 
// --- Main execution flow (example) ---
async function runAgentConversation(threadId: string, prompt: string) {
    let currentState: AgentState = initialAgentState();
    currentState.input = prompt;
 
    console.log(`[USER] ${prompt}`);
 
    // LangGraph with configurable thread_id will manage checkpoints
    const stream = await app.stream(currentState, {
        configurable: { thread_id: threadId },
    });
 
    for await (const s of stream) {
        if (s.__end__) {
            console.log("[AGENT] Workflow finished.");
            currentState = s.__end__ as AgentState;
            break;
        }
 
        const newNode = Object.keys(s).filter(key => key !== '__end__')[0];
        console.log(`[GRAPH] Current node: ${newNode}`);
        currentState = s[newNode] as AgentState;
 
        if (currentState.awaitingHumanApproval) {
            console.warn(`[HITL REQUIRED] Agent proposed action: ${JSON.stringify(currentState.proposedAction)}. Waiting for human approval for thread ${threadId}...`);
            // Here, the system would pause.
            // A separate UI/API would pick up this state, show it to a human.
            // The human would then call `provideHumanFeedback(threadId, 'approve' | 'reject' | 'modify', modificationDetails)`.
            // The `stream` would then continue from the checkpoint.
            return currentState; // Exit here for simulation of waiting
        }
    }
 
    if (currentState.final_answer) {
        console.log(`[AGENT FINAL ANSWER] ${currentState.final_answer}`);
    } else {
        console.log("[AGENT] No final answer reached, current scratchpad:", currentState.scratchpad);
    }
    return currentState;
}
 
// Example Usage (requires setting OPENAI_API_KEY and QDRANT_URL in .env or environment)
// import { config } from 'dotenv';
// config();
 
// (async () => {
//     const thread1 = "user_123";
//     let state;
 
//     // First interaction: non-critical tool
//     state = await runAgentConversation(thread1, "What is the capital of France? Also, read some database entries about 'users'.");
//     // Agent should run read_database and return answer
 
//     // Second interaction: critical tool
//     // state = await runAgentConversation(thread1, "Deploy project 'frontend-app' version '1.2.0' to production.");
//     // Agent should pause here, setting `awaitingHumanApproval: true`
 
//     // To simulate approval (after the above call returns with awaitingHumanApproval)
//     // if (state && state.awaitingHumanApproval) {
//     //     console.log("Simulating human approval after 5 seconds...");
//     //     await new Promise(resolve => setTimeout(resolve, 5000));
//     //     await provideHumanFeedback(thread1, 'approve');
//     //     console.log("Resuming agent after human approval...");
//     //     await runAgentConversation(thread1, "Continue with deployment."); // Agent will pick up from checkpoint
//     // }
// })();
 

This is a simplified example. In a real production system:

  • LangGraph's checkpointing would be configured with a persistent store (e.g., Redis, Postgres, or SQLite) passed to workflow.compile().
  • The app.stream call would run inside an asynchronous worker (e.g., a message queue consumer).
  • When awaitingHumanApproval is true, the worker would push the agent's state to a dedicated queue or API, notifying a human approval system.
  • The human approval system would then trigger provideHumanFeedback, which would put a message back into the worker's queue, including the thread_id and the humanFeedback, prompting the paused thread to resume from its last checkpoint (see the sketch below).
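
As a rough sketch of that compile-and-resume path, sitting alongside the graph definition (it reuses the workflow object from agentGraph.ts) and assuming @langchain/langgraph's MemorySaver checkpointer and the compiled graph's updateState/stream behavior; treat the exact API calls as assumptions to verify against your installed version, and swap MemorySaver for a persistent checkpointer in production:

// Rough sketch of checkpointed compile + resume; API names assumed from @langchain/langgraph.
import { MemorySaver } from '@langchain/langgraph';

// Compile with a checkpointer so every step of a thread is persisted.
const checkpointer = new MemorySaver(); // swap for a Redis/Postgres-backed saver in production
const checkpointedApp = workflow.compile({ checkpointer });

export async function resumeAfterFeedback(
    threadId: string,
    feedback: 'approve' | 'reject' | 'modify',
    modification?: string
) {
    const config = { configurable: { thread_id: threadId } };

    // Merge the human decision into the checkpointed state of the paused thread...
    await checkpointedApp.updateState(config, {
        awaitingHumanApproval: false,
        humanFeedback: feedback,
        modificationDetails: modification,
    });

    // ...then resume from the last checkpoint (a null input means "continue where you left off").
    for await (const step of await checkpointedApp.stream(null, config)) {
        console.log('[RESUME]', Object.keys(step));
    }
}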

What I Learned: Control, Complexity, and Cognitive Architectures

Building production-grade agents forces a shift in perspective. It's no longer just about chaining LLM calls; it's about robust state management, efficient data flow, and graceful failure modes.

  1. The Framework Tax vs. Raw Power: While LangGraph provides a structural advantage for complex state machines, its verbosity and overhead can be frustrating. For core logic like embedding and vector search, direct API calls are indispensable for performance and granular control. This confirms my bias: abstract where it benefits, but get down to the metal where performance or unique behavior is critical.
  2. State is Everything: Managing mutable state across asynchronous steps, especially with external human interaction, is inherently complex. A clear AgentState definition and reliable checkpointing are non-negotiable.
  3. The Necessity of Specialization: The concept of MoE isn't just for LLMs. It's a powerful architectural pattern for agents. A dedicated MemoryExpert and SafetyExpert (which triggers HITL) encapsulate distinct functionalities, making the system more modular, testable, and robust.
  4. The Human Factor: Implementing HITL isn't just about code; it requires designing effective user interfaces and clear communication protocols for human operators. The agent needs to present its reasoning and proposed action concisely for humans to make informed decisions quickly.

This journey, constantly comparing artificial systems to the elegant complexity of the human brain, reinforces my belief that we are only scratching the surface. Long-term memory and executive control are fundamental for any truly intelligent system. By building these foundational elements with performance and precision, we move one step closer to agents that don't just mimic intelligence, but embody it. The next frontiers involve even more dynamic memory consolidation, learning from human feedback loops, and truly adaptive MoE routing.