Part 4: Architecting Agent Teams - Hierarchical Workflows and Graph Composition
Building truly sophisticated AI systems isn't just about crafting a single brilliant agent; it's about enabling teams of specialized agents to collaborate, delegate, and synthesize information effectively. This is where the analogy of the human brain, with its vast network of specialized regions coordinating to achieve complex goals, becomes incredibly compelling. My long-term goal to replicate such a system through a Mixture of Experts (MoE) architecture demands a robust approach to agent orchestration.
Why: Beyond Flat Chains – The Need for Hierarchical Intelligence
Simple agent "chains" or linear workflows quickly hit their ceiling when tackling complex, real-world problems. Imagine trying to write a research paper with a single person handling everything from literature review to data analysis, coding, writing, and editing. It's inefficient, prone to errors, and rarely produces optimal results. Humans, by contrast, excel at breaking down problems, delegating sub-tasks to specialists, and then integrating their findings.
This hierarchical, collaborative model is precisely what we need for advanced AI agents. We require a "manager" agent capable of:
- Decomposing a complex task into manageable sub-tasks.
- Routing these sub-tasks to the most appropriate "worker" agents or specialized workflows.
- Orchestrating their execution, managing state, and handling communication.
- Synthesizing the outputs from workers into a coherent final result.
Many existing "agent frameworks" attempt this, but often fall short. They frequently introduce layers of abstraction that obscure the underlying logic, leading to performance bottlenecks and opaque state management. As a competitive programmer, I value performance and clarity above all else. Bloated frameworks that dictate too much of how I should build (looking at you, LangChain and its ilk) are often counterproductive. I prefer raw API access and direct control, especially when dealing with critical state transitions. We need to design our own composable, performant graph execution system.
This hierarchical approach isn't just a design pattern; it's a step towards emulating the brain's modularity. Different cortical regions specialize in vision, language, motor control, or abstract reasoning, yet they dynamically activate and communicate through sophisticated neural pathways to achieve overarching goals. Our agent teams should reflect this dynamic, specialized, yet integrated workflow.
Architecture: Deconstructing Hierarchy with Custom Graphs
The core of our hierarchical agent system lies in dynamically composing and invoking specialized workflow graphs. Instead of relying on opinionated, heavy framework abstractions, we'll outline a direct, efficient approach.
The Communication Bus: WorkflowState
At the heart of any multi-agent system is shared state. This WorkflowState acts as our universal communication bus, carrying the entire context, inputs, and outputs across different levels of our hierarchy. It must be robust, extensible, and precisely typed.
// src/core/workflowState.ts
export type AgentStatus = 'PENDING' | 'RUNNING' | 'COMPLETED' | 'FAILED';
export interface WorkflowState {
// Global context for the entire workflow
overallTask: string;
overallResult: string | null;
overallStatus: AgentStatus;
// Current sub-task being processed by the Manager or a Worker
currentSubTask: {
id: string; // Unique ID for the sub-task
description: string;
assignedWorkerGraphId: string | null; // Which worker graph is responsible
input: Record<string, any> | null; // Input payload for the worker
output: Record<string, any> | null; // Output payload from the worker
status: AgentStatus;
errorMessage: string | null;
startTime: number;
endTime: number | null;
} | null;
// History of completed sub-tasks for traceability and error recovery
subTaskHistory: Array<Omit<NonNullable<WorkflowState['currentSubTask']>, 'input'>>; // Exclude large inputs from history
// Any global metadata or scratchpad space
metadata: Record<string, any>;
// User-specific information or persistent data
contextData: Record<string, any>;
}
// Initializer for a new workflow state
export function createInitialWorkflowState(overallTask: string, contextData: Record<string, any> = {}): WorkflowState {
return {
overallTask,
overallResult: null,
overallStatus: 'PENDING',
currentSubTask: null,
subTaskHistory: [],
metadata: {},
contextData,
};
}

This WorkflowState is critical. It's the explicit contract between agents, ensuring that data is passed consistently and predictably. The currentSubTask field is particularly important, acting as a temporary "scratchpad" where the manager prepares inputs for a worker and the worker writes back its results.
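As a quick, standalone illustration of that contract, here's a usage sketch with the interface trimmed to a few fields and re-declared inline so the snippet runs on its own (the Mini* names are illustrative stand-ins, not part of the real module):

```typescript
// Trimmed-down stand-ins for WorkflowState and createInitialWorkflowState above.
type Status = 'PENDING' | 'RUNNING' | 'COMPLETED' | 'FAILED';
interface MiniSubTask { id: string; description: string; status: Status }
interface MiniState { overallTask: string; overallStatus: Status; currentSubTask: MiniSubTask | null }

function createMiniState(overallTask: string): MiniState {
  return { overallTask, overallStatus: 'PENDING', currentSubTask: null };
}

// The manager stages a sub-task on the shared state; a worker would later fill in its output.
const demoState = createMiniState('Summarize Q3 sales');
demoState.currentSubTask = { id: 'sub_1', description: 'Load sales data', status: 'PENDING' };
console.log(demoState.overallStatus, demoState.currentSubTask.id); // -> PENDING sub_1
```

The same pattern scales up: every field a node reads or writes is declared on the state, so there are no hidden side channels between agents.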
The GraphEngine: Our Custom Orchestrator
Rather than fighting with frameworks, we define a simple, performant GraphEngine that can execute a directed acyclic graph (DAG) of nodes. Each node represents an atomic operation (LLM call, tool execution, decision, another graph invocation).
// src/core/graphEngine.ts
import { WorkflowState } from './workflowState';
// A NodeFunction takes the current state and returns a Promise resolving to the updated state.
export type NodeFunction = (state: WorkflowState) => Promise<WorkflowState>;
// A WorkflowGraph defines the nodes and their transitions.
export interface WorkflowGraph {
id: string; // Unique ID for the graph
nodes: Map<string, NodeFunction>; // Map of nodeName -> function
edges: Map<string, string | string[]>; // Map of nodeName -> nextNodeName(s) or decision logic
// Add more complex edge logic if needed (e.g., conditional transitions)
startNode: string;
}
// Global registry for all defined workflow graphs
export const graphRegistry = new Map<string, WorkflowGraph>();
// The core engine to execute a workflow graph
export async function runWorkflowGraph(graphId: string, initialState: WorkflowState): Promise<WorkflowState> {
const graph = graphRegistry.get(graphId);
if (!graph) {
throw new Error(`WorkflowGraph with ID '${graphId}' not found.`);
}
let currentState = { ...initialState }; // Shallow copy only; nodes can still mutate nested fields, so a production engine should deep-clone or return fresh state objects
let currentNodeName = graph.startNode;
try {
while (currentNodeName) {
const nodeFn = graph.nodes.get(currentNodeName);
if (!nodeFn) {
throw new Error(`Node '${currentNodeName}' not found in graph '${graphId}'.`);
}
// Mark sub-task as running if applicable
if (currentState.currentSubTask && currentState.currentSubTask.assignedWorkerGraphId === graphId && currentState.currentSubTask.status === 'PENDING') {
currentState.currentSubTask.status = 'RUNNING';
currentState.currentSubTask.startTime = Date.now();
}
console.log(`[${graphId}] Executing node: ${currentNodeName}`);
currentState = await nodeFn(currentState); // Execute node, update state
const nextEdge = graph.edges.get(currentNodeName);
if (!nextEdge) {
// End of graph
currentNodeName = '';
} else if (typeof nextEdge === 'string') {
currentNodeName = nextEdge;
} else if (Array.isArray(nextEdge)) {
// Example: simple sequential multiple next nodes, or complex conditional routing
// For now, let's assume a simple sequential for simplicity or decision-based.
// A real implementation would have more sophisticated decision nodes.
throw new Error(`Complex edge logic for node '${currentNodeName}' not yet implemented.`);
}
// A more advanced engine would use a router node to pick from `string[]` or based on `currentState`
}
// If this graph ran as the worker for the current sub-task, mark it complete
if (graphId === currentState.currentSubTask?.assignedWorkerGraphId) {
currentState.currentSubTask.status = 'COMPLETED';
currentState.currentSubTask.endTime = Date.now();
// Drop the (potentially large) input payload to match the Omit<'input'> history type
const { input, ...historyEntry } = currentState.currentSubTask;
currentState.subTaskHistory.push(historyEntry);
currentState.currentSubTask = null; // Clear current sub-task after completion
}
return currentState;
} catch (error: any) {
console.error(`[${graphId}] Error during graph execution at node ${currentNodeName}:`, error);
if (currentState.currentSubTask && currentState.currentSubTask.assignedWorkerGraphId === graphId) {
currentState.currentSubTask.status = 'FAILED';
currentState.currentSubTask.errorMessage = error.message;
currentState.currentSubTask.endTime = Date.now();
const { input, ...failedEntry } = currentState.currentSubTask;
currentState.subTaskHistory.push(failedEntry); // History omits the input payload, matching the Omit<'input'> type
currentState.currentSubTask = null;
}
currentState.overallStatus = 'FAILED';
currentState.overallResult = `Workflow failed: ${error.message}`;
return currentState;
}
}

This GraphEngine is minimalistic but powerful. It directly executes nodes, passing the WorkflowState from one to the next. The key to hierarchy comes next: one node can invoke another WorkflowGraph.
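To see the traversal loop in isolation, here is a stripped-down, self-contained sketch of the same pattern — a simplified State type stands in for WorkflowState, and the `greet`/`sign` nodes are illustrative, not part of the real system:

```typescript
// Minimal, self-contained version of the engine loop above.
type State = { log: string[] };
type NodeFn = (s: State) => Promise<State>;

const nodes = new Map<string, NodeFn>([
  ['greet', async (s) => ({ log: [...s.log, 'hello'] })],
  ['sign', async (s) => ({ log: [...s.log, 'bye'] })],
]);
const edges = new Map<string, string>([['greet', 'sign']]);

async function run(startNode: string, state: State): Promise<State> {
  let node = startNode;
  while (node) {
    const fn = nodes.get(node);
    if (!fn) throw new Error(`Node '${node}' not found`);
    state = await fn(state);      // each node returns the updated state
    node = edges.get(node) ?? ''; // no outgoing edge -> graph ends
  }
  return state;
}

run('greet', { log: [] }).then((s) => console.log(s.log.join(' '))); // -> hello bye
```

The real engine layers error capture and sub-task status tracking on top of exactly this loop.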
The Manager Graph (MainOrchestratorGraph)
The manager's job is strategic:
- Decompose: Break down overallTask into smaller, independent sub-tasks.
- Route: Select the appropriate WorkerGraph for each sub-task.
- Execute: Invoke the selected WorkerGraph via our GraphEngine.
- Synthesize: Combine results from workers.
// src/graphs/mainOrchestratorGraph.ts
import { WorkflowState, AgentStatus } from '../core/workflowState';
import { WorkflowGraph, NodeFunction, runWorkflowGraph, graphRegistry } from '../core/graphEngine';
import { generateUniqueId } from '../utils/idGenerator'; // Simple ID generator utility
import { llmCall } from '../utils/llmApi'; // Mock LLM API
// --- Manager Nodes ---
const decomposeTaskNode: NodeFunction = async (state) => {
// Use LLM to break down the overall task
const prompt = `Given the overall task: "${state.overallTask}", identify the essential sub-tasks required to complete it.
Output a JSON array of objects, where each object has 'id', 'description', and 'requiredWorkerGraph' (e.g., 'CodeGenerator', 'DataAnalyzer', 'ReportWriter').
Example:
[
{ "id": "task_1", "description": "Research market trends", "requiredWorkerGraph": "DataAnalyzer" },
{ "id": "task_2", "description": "Generate Python script for analysis", "requiredWorkerGraph": "CodeGenerator" }
]`;
const response = await llmCall(prompt, state.contextData);
const subTasks = JSON.parse(response); // Assume LLM provides valid JSON
// Store decomposed tasks in metadata for sequential processing
state.metadata.pendingSubTasks = subTasks;
state.overallStatus = 'RUNNING';
return state;
};
const expertRouterNode: NodeFunction = async (state) => {
if (!state.metadata.pendingSubTasks || state.metadata.pendingSubTasks.length === 0) {
// All sub-tasks processed, move to synthesis
return { ...state, metadata: { ...state.metadata, nextManagerAction: 'synthesize' } };
}
const nextTask = state.metadata.pendingSubTasks.shift(); // Get next task
if (!nextTask) throw new Error("No pending sub-tasks, but router node was called.");
// Prepare current sub-task for worker invocation
state.currentSubTask = {
id: nextTask.id,
description: nextTask.description,
assignedWorkerGraphId: nextTask.requiredWorkerGraph,
input: { taskDescription: nextTask.description, context: state.contextData }, // Input for worker
output: null,
status: 'PENDING',
errorMessage: null,
startTime: 0, // Will be set by worker graph
endTime: null,
};
return { ...state, metadata: { ...state.metadata, nextManagerAction: 'invokeWorker' } };
};
const invokeWorkerGraphNode: NodeFunction = async (state) => {
if (!state.currentSubTask || !state.currentSubTask.assignedWorkerGraphId) {
throw new Error("Attempted to invoke worker without a defined currentSubTask or assignedWorkerGraphId.");
}
const workerGraphId = state.currentSubTask.assignedWorkerGraphId;
console.log(`Manager invoking worker graph: ${workerGraphId} for sub-task: ${state.currentSubTask.id}`);
// Create a new state instance for the worker to operate on, containing only relevant input
let workerState: WorkflowState = {
overallTask: state.currentSubTask.description, // Worker sees its specific task as 'overall'
overallResult: null,
overallStatus: 'PENDING',
currentSubTask: { // This *is* the sub-task for the manager, but the worker treats it as its root
id: generateUniqueId(), // Worker gets its own root sub-task ID
description: state.currentSubTask.description,
assignedWorkerGraphId: workerGraphId,
input: state.currentSubTask.input,
output: null,
status: 'PENDING',
errorMessage: null,
startTime: Date.now(),
endTime: null,
},
subTaskHistory: [], // Worker starts with fresh history
metadata: {},
contextData: state.contextData, // Pass down relevant context
};
// --- CRITICAL: Execute the worker graph as a sub-process ---
workerState = await runWorkflowGraph(workerGraphId, workerState);
// After the worker completes, update the manager's state. Note: the engine
// clears workerState.currentSubTask on completion and records it in
// subTaskHistory, so fall back to the last history entry for the result.
if (!state.currentSubTask) throw new Error("Current sub-task disappeared after worker invocation.");
const workerRoot = workerState.currentSubTask ?? workerState.subTaskHistory[workerState.subTaskHistory.length - 1];
state.currentSubTask.output = workerRoot?.output ?? null;
state.currentSubTask.status = workerRoot?.status ?? 'FAILED';
state.currentSubTask.errorMessage = workerRoot?.errorMessage ?? null;
state.currentSubTask.endTime = workerRoot?.endTime ?? Date.now();
// Add the completed sub-task to the manager's history (input payload dropped
// to match the Omit<'input'> history type)
const { input, ...managerHistoryEntry } = state.currentSubTask;
state.subTaskHistory.push(managerHistoryEntry);
// Clear currentSubTask for the manager to pick the next one
state.currentSubTask = null;
return state;
};
const synthesizeResultNode: NodeFunction = async (state) => {
// Collect all outputs from sub-task history
const workerOutputs = state.subTaskHistory.map(task => ({
id: task.id,
description: task.description,
output: task.output,
status: task.status
}));
const prompt = `Synthesize the following sub-task results into a comprehensive final answer for the overall task: "${state.overallTask}"
Sub-task results: ${JSON.stringify(workerOutputs, null, 2)}
Provide a concise, professional summary.`;
state.overallResult = await llmCall(prompt, state.contextData);
state.overallStatus = 'COMPLETED';
return state;
};
// Define the MainOrchestratorGraph
export const MainOrchestratorGraph: WorkflowGraph = {
id: 'MainOrchestrator',
startNode: 'decomposeTask',
nodes: new Map([
['decomposeTask', decomposeTaskNode],
['expertRouter', expertRouterNode],
['invokeWorkerGraph', invokeWorkerGraphNode],
['synthesizeResult', synthesizeResultNode],
]),
edges: new Map([
['decomposeTask', 'expertRouter'],
// Dynamic routing based on 'nextManagerAction' in metadata
['expertRouter', (state: WorkflowState) => state.metadata.nextManagerAction === 'invokeWorker' ? 'invokeWorkerGraph' : 'synthesizeResult'],
['invokeWorkerGraph', 'expertRouter'], // After a worker finishes, go back to router for next task
['synthesizeResult', null], // End of graph
]) as Map<string, string | ((state: WorkflowState) => string | null)>, // Type assertion for dynamic edges
};
graphRegistry.set(MainOrchestratorGraph.id, MainOrchestratorGraph);

Note on Dynamic Edges: My GraphEngine edges definition (Map<string, string | string[]>) is basic. For the manager, I've shown a more advanced edges map with a function to simulate conditional routing. A robust GraphEngine would inherently support this via a dedicated RouterNode or ConditionalEdge type.
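For reference, here is one hedged sketch of how the engine's edge lookup could be generalized to accept either a static string or a routing function — the Edge union and resolveEdge helper are my assumptions, not part of the engine shown earlier:

```typescript
// Sketch: an edge may be a static node name, null (terminal), or a function of state.
// `resolveEdge` is a hypothetical helper the engine loop would call each step.
type Edge<S> = string | null | ((state: S) => string | null);

function resolveEdge<S>(edge: Edge<S> | undefined, state: S): string {
  if (edge === undefined || edge === null) return ''; // terminal node: stop the loop
  if (typeof edge === 'string') return edge;          // static transition
  return edge(state) ?? '';                           // conditional router
}

// The manager's routing decision expressed as a conditional edge:
type MgrState = { metadata: { nextManagerAction?: string } };
const routerEdge: Edge<MgrState> = (s) =>
  s.metadata.nextManagerAction === 'invokeWorker' ? 'invokeWorkerGraph' : 'synthesizeResult';

console.log(resolveEdge(routerEdge, { metadata: { nextManagerAction: 'invokeWorker' } }));
// -> invokeWorkerGraph
```

With this in place, the engine's `while` loop only ever deals with a plain string: the next node name, or an empty string meaning "done".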
Worker Graphs (CodeGenerationGraph, DataAnalysisGraph, etc.)
Worker graphs are specialized, focused WorkflowGraph instances. They receive their specific task and input via WorkflowState.currentSubTask.input (when invoked by the manager) and write their output to WorkflowState.currentSubTask.output.
// src/graphs/codeGenerationGraph.ts
import { WorkflowState } from '../core/workflowState';
import { WorkflowGraph, NodeFunction, graphRegistry } from '../core/graphEngine';
import { llmCall } from '../utils/llmApi'; // Mock LLM API
import { runPythonCode } from '../utils/codeExecutor'; // Mock code executor
// --- Code Generation Worker Nodes ---
const planCodeNode: NodeFunction = async (state) => {
if (!state.currentSubTask || !state.currentSubTask.input) {
throw new Error("Code generation worker requires sub-task input.");
}
const taskDescription = state.currentSubTask.input.taskDescription;
const prompt = `Given the task: "${taskDescription}", outline a Python script plan (steps, libraries, expected output).`;
const plan = await llmCall(prompt, state.contextData);
state.metadata.codePlan = plan;
return state;
};
const generateCodeNode: NodeFunction = async (state) => {
if (!state.currentSubTask || !state.metadata.codePlan) {
throw new Error("Code generation worker needs a plan.");
}
const taskDescription = state.currentSubTask.input.taskDescription;
const plan = state.metadata.codePlan;
const prompt = `Based on this plan: "${plan}" and the task: "${taskDescription}", generate the full Python code.`;
const code = await llmCall(prompt, state.contextData);
state.metadata.generatedCode = code;
return state;
};
const executeCodeNode: NodeFunction = async (state) => {
if (!state.currentSubTask || !state.metadata.generatedCode) {
throw new Error("Code execution worker needs generated code.");
}
const code = state.metadata.generatedCode;
try {
const executionResult = await runPythonCode(code, state.currentSubTask.input.context);
state.currentSubTask.output = {
code: code,
result: executionResult,
success: true,
};
} catch (error: any) {
state.currentSubTask.output = {
code: code,
result: null,
success: false,
error: error.message,
};
throw error; // Propagate error for manager to handle
}
return state;
};
export const CodeGenerationGraph: WorkflowGraph = {
id: 'CodeGenerator',
startNode: 'planCode',
nodes: new Map([
['planCode', planCodeNode],
['generateCode', generateCodeNode],
['executeCode', executeCodeNode],
]),
edges: new Map([
['planCode', 'generateCode'],
['generateCode', 'executeCode'],
['executeCode', null], // End of graph
]),
};
graphRegistry.set(CodeGenerationGraph.id, CodeGenerationGraph);
// --- Dummy Data Analysis Worker (for illustration) ---
const analyzeDataNode: NodeFunction = async (state) => {
if (!state.currentSubTask || !state.currentSubTask.input) {
throw new Error("Data analysis worker requires sub-task input.");
}
const taskDescription = state.currentSubTask.input.taskDescription;
console.log(`Simulating data analysis for: ${taskDescription}`);
// Simulate some work
await new Promise(resolve => setTimeout(resolve, 1500));
state.currentSubTask.output = {
summary: `Analysis complete for "${taskDescription}". Key finding: Data shows a 15% increase in XYZ.`,
rawOutput: { /* large data payload */ },
};
return state;
};
export const DataAnalysisGraph: WorkflowGraph = {
id: 'DataAnalyzer',
startNode: 'analyzeData',
nodes: new Map([
['analyzeData', analyzeDataNode],
]),
edges: new Map([
['analyzeData', null],
]),
};
graphRegistry.set(DataAnalysisGraph.id, DataAnalysisGraph);
Putting It All Together: The Main Execution Flow
// src/main.ts
import { createInitialWorkflowState } from './core/workflowState';
import { runWorkflowGraph, graphRegistry } from './core/graphEngine';
import { MainOrchestratorGraph } from './graphs/mainOrchestratorGraph'; // This registers itself
import { CodeGenerationGraph } from './graphs/codeGenerationGraph'; // This registers itself
import { DataAnalysisGraph } from './graphs/codeGenerationGraph'; // Defined alongside CodeGenerationGraph above; registers itself
async function main() {
console.log("Starting hierarchical agent workflow...");
const initialTask = "Analyze sales data for Q3 2023, generate a report, and identify top-performing products. Also, write a Python script to automate weekly sales report generation.";
const userContext = { user: "Alice", preferences: { format: "markdown" } };
let state = createInitialWorkflowState(initialTask, userContext);
try {
state = await runWorkflowGraph(MainOrchestratorGraph.id, state);
console.log("\n--- Workflow Completed ---");
console.log("Overall Status:", state.overallStatus);
console.log("Overall Result:", state.overallResult);
console.log("Sub-task History:");
state.subTaskHistory.forEach(task => {
console.log(`- [${task.status}] ${task.description} (${task.assignedWorkerGraphId})`);
console.log(` Output: ${JSON.stringify(task.output, null, 2).slice(0, 200)}...`); // Truncate for display
});
} catch (error) {
console.error("\n--- Workflow Failed ---");
console.error("Final State:", state);
console.error("Error:", error);
}
}
main();
// Mock LLM and Code Executor (for demonstration)
// In a real system, these would be proper API calls.
export const llmCall = async (prompt: string, context: Record<string, any>): Promise<string> => {
console.log(`\n--- LLM Call ---`);
console.log(`Prompt: ${prompt.slice(0, 300)}...`);
// Simulate LLM processing time
await new Promise(resolve => setTimeout(resolve, 500));
if (prompt.includes("identify the essential sub-tasks")) {
return JSON.stringify([
{ "id": "sub_1", "description": "Analyze Q3 2023 sales data to identify top products", "requiredWorkerGraph": "DataAnalyzer" },
{ "id": "sub_2", "description": "Generate Python script for weekly sales report automation", "requiredWorkerGraph": "CodeGenerator" },
{ "id": "sub_3", "description": "Write a summary report based on analysis and script", "requiredWorkerGraph": "ReportWriter" } // Assuming another worker
]);
} else if (prompt.includes("outline a Python script plan")) {
return "Plan: 1. Load sales data. 2. Aggregate by product. 3. Identify top N. 4. Format report. Libraries: pandas, openpyxl.";
} else if (prompt.includes("generate the full Python code")) {
return "import pandas as pd\ndef generate_sales_report(data_path):\n    df = pd.read_excel(data_path)\n    # ... (rest of the code)\n    return 'Report Generated'";
} else if (prompt.includes("Synthesize the following sub-task results")) {
return `Comprehensive report for Q3 2023:\n1. Top-performing products identified through data analysis.\n2. Automated script for future reports successfully generated and ready for deployment.`;
}
return "LLM generated a generic response.";
};
export const runPythonCode = async (code: string, context: Record<string, any>): Promise<string> => {
console.log(`\n--- Code Execution ---`);
console.log(`Executing code: ${code.slice(0, 100)}...`);
await new Promise(resolve => setTimeout(resolve, 1000));
return "Code executed successfully. Report data generated.";
};
export const generateUniqueId = (): string => `id_${Date.now()}_${Math.random().toFixed(5).replace('0.', '')}`;

(Note: The ReportWriter graph is referenced but not implemented, to keep the example concise; as written, routing a sub-task to it would make runWorkflowGraph throw "WorkflowGraph not found", which the manager's error handling would surface. The graphRegistry is how all workers are made known to the engine.)
What I Learned: Modularity, Performance, and the Brain's Whisper
This journey into hierarchical agent architectures has reinforced several critical principles and deepened my understanding of intelligent systems:
- Modularity is Power: Breaking down complex problems into smaller, specialized sub-graphs dramatically improves maintainability, debugging, and scalability. Each worker graph is a self-contained expert. This directly mirrors the brain's specialized cortical regions, each processing specific types of information (visual, auditory, motor) but working in concert.
- State as the Ultimate Contract: A meticulously designed WorkflowState is not merely a data structure; it's the API contract between every node and every graph. It dictates how information flows, ensuring consistency and preventing silent failures. The explicit typing in TypeScript is a lifesaver here, preventing many runtime errors.
- Performance Through Direct Control: By building our own GraphEngine rather than relying on heavy, opinionated frameworks, we gain complete control over execution flow and state management. This translates directly to reduced overhead, faster execution cycles, and a deeper understanding of the system's inner workings – crucial for AI research where iterative experimentation is key. Every millisecond counts.
- The MoE Connection Deepens: This architecture isn't just about agent teams; it's a concrete blueprint for implementing a Mixture of Experts model. The MainOrchestratorGraph acts as a sophisticated router or gating network, deciding which "expert" (worker graph) is best suited for a given sub-task. The invokeWorkerGraphNode is the mechanism for dynamically activating these experts. This is precisely how I envision constructing brain-like systems: a high-level manager orchestrating a vast array of specialized, highly efficient expert modules.
Challenges Encountered:
- Designing the WorkflowState: Initial iterations were too simplistic, leading to data loss or ambiguity. Realizing the need for a comprehensive, yet clean, state that accommodates both global context and granular sub-task details was crucial.
- Dynamic Graph Routing: Implementing the conditional logic within the expertRouterNode and the edges definition for the MainOrchestratorGraph required careful thought to maintain flexibility without sacrificing performance or clarity.
- Error Propagation: Ensuring that errors within a deep sub-graph are correctly caught, logged, and propagated back up to the manager (and ultimately reflected in the overallStatus) is vital for robust systems.
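The error-propagation pattern described in that last point — record the failure on the sub-task, then re-throw so the parent graph can mark the overall workflow FAILED — distills to a few lines. This is a sketch with a simplified Sub type; invokeWithCapture is an illustrative name, not a function from the code above:

```typescript
// Sketch of capture-and-rethrow error propagation (types simplified).
type Sub = { status: 'PENDING' | 'RUNNING' | 'COMPLETED' | 'FAILED'; errorMessage: string | null };

async function invokeWithCapture(sub: Sub, work: () => Promise<void>): Promise<void> {
  try {
    await work();
    sub.status = 'COMPLETED';
  } catch (err) {
    sub.status = 'FAILED'; // record the failure on the sub-task itself
    sub.errorMessage = err instanceof Error ? err.message : String(err);
    throw err; // re-throw so the parent graph's catch block can fail the overall workflow
  }
}
```

The double bookkeeping matters: the sub-task record gives you a durable audit trail, while the re-thrown exception drives the live control flow.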
Future Work:
- Adaptive Routing: Moving beyond simple if/else logic in expertRouterNode to an LLM-powered decision-making process that dynamically selects worker graphs based on the nuances of the sub-task and historical performance. This aligns perfectly with learning-based routing in MoE.
- Parallel Execution: Currently, sub-tasks are processed sequentially. Implementing mechanisms for parallel execution of independent sub-graphs would significantly boost performance, mirroring the brain's massive parallelism.
- Self-Healing Workflows: Introducing nodes that can detect and attempt to recover from worker failures (e.g., retrying a sub-task, escalating to a human, or generating alternative plans).
- Tool Integration: Enhancing worker graphs with robust, raw tool integration for external APIs, databases, and computation environments, ensuring seamless interaction with the real world.
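As a sketch of the parallel-execution idea above, independent sub-tasks could be fanned out with Promise.allSettled so one failing worker doesn't sink its siblings. Here runWorker is a placeholder for runWorkflowGraph, and all names are hypothetical:

```typescript
// Hypothetical parallel dispatch of independent sub-tasks.
type SubTask = { id: string; worker: string };
type SubResult = { id: string; ok: boolean; output?: string; error?: string };

async function runWorker(task: SubTask): Promise<string> {
  // Stand-in for invoking a registered worker graph.
  return `result of ${task.id} via ${task.worker}`;
}

async function runAllParallel(tasks: SubTask[]): Promise<SubResult[]> {
  // allSettled keeps sibling results even when one worker throws.
  const settled = await Promise.allSettled(tasks.map(runWorker));
  return settled.map((r, i) =>
    r.status === 'fulfilled'
      ? { id: tasks[i].id, ok: true, output: r.value }
      : { id: tasks[i].id, ok: false, error: String(r.reason) },
  );
}
```

A real version would still need a dependency analysis pass first, so that only genuinely independent sub-tasks are dispatched in the same batch.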
This hierarchical approach, built on a foundation of explicit state and direct graph orchestration, is a powerful paradigm for engineering complex, intelligent agent systems. It’s a tangible step towards my vision of building AI that mirrors the elegant, modular, and efficient architecture of the human brain.