Wicked Smart Data
LearnArticlesAbout
Sign InSign Up
LearnArticlesAboutContact
Sign InSign Up
Wicked Smart Data

The go-to platform for professionals who want to master data, automation, and AI — from Excel fundamentals to cutting-edge machine learning.

Platform

  • Learning Paths
  • Articles
  • About
  • Contact

Connect

  • Contact Us
  • RSS Feed

© 2026 Wicked Smart Data. All rights reserved.

Privacy PolicyTerms of Service
All Articles
Building Multi-Step AI Agents with Planning and Memory

Building Multi-Step AI Agents with Planning and Memory

AI & Machine Learning⚡ Practitioner23 min readMay 9, 2026Updated May 9, 2026
Table of Contents
  • Prerequisites
  • Agent Architecture Fundamentals
  • Implementing Dynamic Planning Systems
  • Building Persistent Memory Systems
  • Integrating Tools and External Systems
  • Hands-On Exercise: Building a Market Research Agent
  • Common Mistakes & Troubleshooting
  • Summary & Next Steps

Building Multi-Step AI Agents with Planning and Memory

Picture this: You're building an AI system to analyze quarterly financial reports across multiple companies. The system needs to download reports, extract key metrics, compare them against historical data, identify trends, and generate insights—all while remembering what it learned from previous analyses to avoid redundant work. This isn't a job for a simple chatbot or single-query AI system. You need an agent that can plan, execute complex workflows, and maintain memory across interactions.

Multi-step AI agents represent a significant evolution from basic prompt-response systems. They combine the reasoning capabilities of large language models with structured planning algorithms and persistent memory systems. Unlike traditional AI applications that handle one task at a time, these agents can break down complex objectives into manageable steps, execute them sequentially or in parallel, and learn from their experiences.

By the end of this lesson, you'll have built a sophisticated AI agent capable of handling multi-faceted business analysis tasks that would typically require human intervention at each step.

What you'll learn:

  • How to design agent architectures that separate planning, execution, and memory components
  • Implementing dynamic planning systems that adapt based on intermediate results
  • Building persistent memory systems that enhance agent performance over time
  • Creating robust error handling and recovery mechanisms for long-running agent workflows
  • Integrating multiple tools and data sources within a cohesive agent framework

Prerequisites

You should be comfortable with Python programming and have basic experience with:

  • Large language model APIs (OpenAI, Anthropic, or similar)
  • JSON data manipulation and API interactions
  • Basic understanding of retrieval-augmented generation (RAG) concepts
  • Familiarity with async programming patterns in Python

Agent Architecture Fundamentals

Multi-step AI agents require a fundamentally different architecture than single-purpose AI tools. Instead of a simple input-output model, we need systems that can maintain state, plan ahead, and coordinate multiple operations.

The core components of our agent architecture include:

Planning Engine: Responsible for breaking down complex goals into executable steps and adapting plans based on results. This isn't just task decomposition—it's dynamic strategy adjustment.

Memory System: Maintains both short-term context (current workflow state) and long-term knowledge (learnings from previous executions). Think of it as the agent's experience database.

Tool Registry: A flexible system for integrating external capabilities like web scraping, data analysis, file manipulation, or API calls.

Execution Coordinator: Manages step execution, handles dependencies, and coordinates parallel operations when possible.

Let's start by building the foundation. Here's our base agent class that establishes the architecture:

import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, asdict
from enum import Enum

class StepStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"

@dataclass
class AgentStep:
    id: str
    description: str
    tool: str
    parameters: Dict[str, Any]
    status: StepStatus = StepStatus.PENDING
    result: Optional[Any] = None
    error: Optional[str] = None
    dependencies: List[str] = None
    created_at: datetime = None
    completed_at: Optional[datetime] = None
    
    def __post_init__(self):
        if self.dependencies is None:
            self.dependencies = []
        if self.created_at is None:
            self.created_at = datetime.now()

class MultiStepAgent:
    def __init__(self, name: str, llm_client, memory_store=None):
        self.name = name
        self.llm_client = llm_client
        self.memory_store = memory_store or InMemoryStore()
        self.tools = {}
        self.current_plan = []
        self.execution_history = []
        self.logger = logging.getLogger(f"agent.{name}")
        
    def register_tool(self, name: str, func: Callable, description: str):
        """Register a tool that the agent can use during execution."""
        self.tools[name] = {
            'function': func,
            'description': description
        }
        self.logger.info(f"Registered tool: {name}")
    
    async def execute_goal(self, goal: str, context: Dict[str, Any] = None) -> Dict[str, Any]:
        """Main entry point for agent execution."""
        self.logger.info(f"Starting execution for goal: {goal}")
        
        # Generate initial plan
        plan = await self._generate_plan(goal, context or {})
        self.current_plan = plan
        
        # Execute plan with dynamic adjustments
        results = await self._execute_plan_with_adaptation()
        
        # Store results in memory for future use
        await self._store_execution_memory(goal, results)
        
        return {
            'goal': goal,
            'status': 'completed' if all(step.status == StepStatus.COMPLETED for step in self.current_plan) else 'partial',
            'results': results,
            'steps_completed': len([s for s in self.current_plan if s.status == StepStatus.COMPLETED]),
            'total_steps': len(self.current_plan)
        }

This architecture separates concerns cleanly. The planning happens independently of execution, and memory operations are abstracted away from the core logic. Notice how we're using dataclasses for structured step representation—this makes debugging and state inspection much easier in complex workflows.

Implementing Dynamic Planning Systems

Static planning fails in real-world scenarios because intermediate results often reveal new information or complications. Our agent needs to adapt its strategy based on what it discovers during execution.

Here's how we implement a planning system that can revise itself:

class PlanningEngine:
    def __init__(self, llm_client, memory_store):
        self.llm_client = llm_client
        self.memory_store = memory_store
        
    async def generate_initial_plan(self, goal: str, context: Dict[str, Any], 
                                   available_tools: Dict[str, Any]) -> List[AgentStep]:
        """Generate the initial execution plan."""
        
        # Retrieve relevant past experiences
        past_experiences = await self.memory_store.retrieve_similar_goals(goal)
        
        planning_prompt = self._build_planning_prompt(
            goal, context, available_tools, past_experiences
        )
        
        response = await self.llm_client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": planning_prompt}],
            temperature=0.1
        )
        
        plan_data = json.loads(response.choices[0].message.content)
        
        steps = []
        for i, step_data in enumerate(plan_data['steps']):
            step = AgentStep(
                id=f"step_{i+1}",
                description=step_data['description'],
                tool=step_data['tool'],
                parameters=step_data['parameters'],
                dependencies=step_data.get('dependencies', [])
            )
            steps.append(step)
            
        return steps
    
    async def replan_from_failure(self, current_plan: List[AgentStep], 
                                 failed_step: AgentStep, 
                                 error_context: str) -> List[AgentStep]:
        """Generate alternative plan when a step fails."""
        
        completed_steps = [s for s in current_plan if s.status == StepStatus.COMPLETED]
        remaining_steps = [s for s in current_plan if s.status == StepStatus.PENDING]
        
        replanning_prompt = f"""
        REPLANNING REQUEST
        
        Original plan had a failure at step: {failed_step.description}
        Error: {failed_step.error}
        Context: {error_context}
        
        Successfully completed steps:
        {[{'description': s.description, 'result': str(s.result)[:200]} for s in completed_steps]}
        
        Remaining planned steps:
        {[s.description for s in remaining_steps]}
        
        Available tools: {list(self.tools.keys())}
        
        Generate a revised plan that:
        1. Works around the failed step
        2. Leverages the results from completed steps
        3. Achieves the same overall objective
        4. Returns JSON in the same format as initial planning
        
        Focus on practical alternatives and don't repeat the same failing approach.
        """
        
        response = await self.llm_client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": replanning_prompt}],
            temperature=0.2
        )
        
        revised_plan_data = json.loads(response.choices[0].message.content)
        
        # Create new steps with updated IDs
        new_steps = completed_steps.copy()  # Keep completed steps
        
        for i, step_data in enumerate(revised_plan_data['steps']):
            if not any(s.description == step_data['description'] for s in completed_steps):
                step = AgentStep(
                    id=f"revised_step_{len(new_steps)+1}",
                    description=step_data['description'],
                    tool=step_data['tool'],
                    parameters=step_data['parameters'],
                    dependencies=step_data.get('dependencies', [])
                )
                new_steps.append(step)
                
        return new_steps
    
    def _build_planning_prompt(self, goal: str, context: Dict[str, Any], 
                              tools: Dict[str, Any], past_experiences: List[Dict]) -> str:
        experience_summary = ""
        if past_experiences:
            experience_summary = f"""
            RELEVANT PAST EXPERIENCES:
            {json.dumps([exp for exp in past_experiences[:3]], indent=2)}
            """
        
        return f"""
        You are a planning engine for an AI agent. Generate a detailed execution plan.
        
        GOAL: {goal}
        
        CONTEXT:
        {json.dumps(context, indent=2)}
        
        AVAILABLE TOOLS:
        {json.dumps({name: info['description'] for name, info in tools.items()}, indent=2)}
        
        {experience_summary}
        
        Generate a JSON plan with this structure:
        {{
            "reasoning": "Explanation of your approach",
            "steps": [
                {{
                    "description": "Clear description of what this step accomplishes",
                    "tool": "tool_name_to_use",
                    "parameters": {{"param": "value"}},
                    "dependencies": ["step_1", "step_2"] // IDs of steps that must complete first
                }}
            ]
        }}
        
        Guidelines:
        - Break complex goals into logical, manageable steps
        - Consider dependencies and optimal ordering
        - Be specific about parameters
        - Each step should have a clear success criteria
        - Plan for error scenarios where reasonable
        """

The key insight here is that replanning isn't just about fixing failures—it's about optimization. When a step completes successfully but reveals new information (like finding additional data sources), the agent can revise its plan to take advantage of these discoveries.

Building Persistent Memory Systems

Memory transforms agents from stateless tools into learning systems. But effective memory isn't just about storage—it's about retrieval, relevance ranking, and knowledge synthesis.

Our memory system needs to handle multiple types of information:

Episodic Memory: Specific execution instances with their contexts, decisions, and outcomes Semantic Memory: General patterns and rules learned from multiple executions
Working Memory: Current context and intermediate results during active execution

import sqlite3
import pickle
import hashlib
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta

class PersistentMemoryStore:
    def __init__(self, db_path: str = "agent_memory.db"):
        self.db_path = db_path
        self._initialize_database()
        
    def _initialize_database(self):
        """Create the database schema for agent memory."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Episodic memories - specific execution instances
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS episodic_memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                goal_hash TEXT,
                goal TEXT,
                context_data BLOB,
                plan_data BLOB,
                results_data BLOB,
                success_rate REAL,
                execution_time REAL,
                created_at TIMESTAMP,
                tags TEXT
            )
        ''')
        
        # Semantic memories - learned patterns and rules
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS semantic_memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                pattern_type TEXT,
                pattern_description TEXT,
                confidence REAL,
                usage_count INTEGER DEFAULT 1,
                last_reinforced TIMESTAMP,
                supporting_episodes TEXT,
                rule_data BLOB
            )
        ''')
        
        # Working memory - current execution state
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS working_memory (
                session_id TEXT PRIMARY KEY,
                data BLOB,
                created_at TIMESTAMP,
                last_accessed TIMESTAMP
            )
        ''')
        
        conn.commit()
        conn.close()
    
    async def store_execution_memory(self, goal: str, context: Dict[str, Any], 
                                   plan: List[AgentStep], results: Dict[str, Any],
                                   execution_time: float) -> str:
        """Store a complete execution episode."""
        
        goal_hash = hashlib.md5(goal.encode()).hexdigest()
        success_rate = len([s for s in plan if s.status == StepStatus.COMPLETED]) / len(plan)
        
        # Extract meaningful tags from the goal and results
        tags = await self._extract_tags(goal, context, results)
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT INTO episodic_memories 
            (goal_hash, goal, context_data, plan_data, results_data, success_rate, execution_time, created_at, tags)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            goal_hash,
            goal,
            pickle.dumps(context),
            pickle.dumps([asdict(step) for step in plan]),
            pickle.dumps(results),
            success_rate,
            execution_time,
            datetime.now(),
            ','.join(tags)
        ))
        
        episode_id = cursor.lastrowid
        conn.commit()
        conn.close()
        
        # Extract semantic patterns from this execution
        await self._extract_semantic_patterns(goal, context, plan, results, episode_id)
        
        return f"episode_{episode_id}"
    
    async def retrieve_similar_goals(self, goal: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Retrieve episodic memories of similar goals."""
        
        goal_tokens = set(goal.lower().split())
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Get all episodes from the last 90 days for relevance
        cutoff_date = datetime.now() - timedelta(days=90)
        cursor.execute('''
            SELECT goal, context_data, results_data, success_rate, tags, created_at
            FROM episodic_memories 
            WHERE created_at > ?
            ORDER BY created_at DESC
        ''', (cutoff_date,))
        
        episodes = cursor.fetchall()
        conn.close()
        
        # Calculate relevance scores based on goal similarity and success rate
        scored_episodes = []
        for goal_text, context_blob, results_blob, success_rate, tags, created_at in episodes:
            episode_tokens = set(goal_text.lower().split())
            tag_tokens = set(tags.lower().split(',')) if tags else set()
            
            # Simple token overlap scoring with success rate weighting
            token_overlap = len(goal_tokens.intersection(episode_tokens))
            tag_overlap = len(goal_tokens.intersection(tag_tokens))
            
            relevance_score = (token_overlap + tag_overlap) * success_rate
            
            if relevance_score > 0:
                scored_episodes.append({
                    'goal': goal_text,
                    'context': pickle.loads(context_blob),
                    'results': pickle.loads(results_blob),
                    'success_rate': success_rate,
                    'relevance_score': relevance_score,
                    'created_at': created_at
                })
        
        # Return top matches sorted by relevance
        scored_episodes.sort(key=lambda x: x['relevance_score'], reverse=True)
        return scored_episodes[:limit]
    
    async def get_learned_patterns(self, pattern_type: str = None) -> List[Dict[str, Any]]:
        """Retrieve semantic patterns learned from past executions."""
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        if pattern_type:
            cursor.execute('''
                SELECT pattern_description, confidence, usage_count, rule_data
                FROM semantic_memories 
                WHERE pattern_type = ? AND confidence > 0.6
                ORDER BY confidence DESC, usage_count DESC
            ''', (pattern_type,))
        else:
            cursor.execute('''
                SELECT pattern_type, pattern_description, confidence, usage_count, rule_data
                FROM semantic_memories 
                WHERE confidence > 0.6
                ORDER BY confidence DESC, usage_count DESC
            ''')
        
        patterns = []
        for row in cursor.fetchall():
            if pattern_type:
                pattern_desc, confidence, usage_count, rule_blob = row
                pattern_data = {
                    'description': pattern_desc,
                    'confidence': confidence,
                    'usage_count': usage_count,
                    'rule': pickle.loads(rule_blob) if rule_blob else None
                }
            else:
                p_type, pattern_desc, confidence, usage_count, rule_blob = row
                pattern_data = {
                    'type': p_type,
                    'description': pattern_desc,
                    'confidence': confidence,
                    'usage_count': usage_count,
                    'rule': pickle.loads(rule_blob) if rule_blob else None
                }
            patterns.append(pattern_data)
        
        conn.close()
        return patterns
    
    async def _extract_semantic_patterns(self, goal: str, context: Dict[str, Any], 
                                       plan: List[AgentStep], results: Dict[str, Any], 
                                       episode_id: int):
        """Extract reusable patterns from successful executions."""
        
        successful_steps = [s for s in plan if s.status == StepStatus.COMPLETED]
        if len(successful_steps) < 2:
            return  # Need multiple successful steps to identify patterns
        
        # Identify tool usage patterns
        tool_sequence = [step.tool for step in successful_steps]
        if len(tool_sequence) >= 3:
            await self._record_pattern(
                "tool_sequence",
                f"Successful sequence: {' -> '.join(tool_sequence[:3])}",
                0.7,  # Initial confidence
                {'sequence': tool_sequence[:3], 'context_type': self._classify_context(context)},
                episode_id
            )
        
        # Identify parameter optimization patterns
        for step in successful_steps:
            if step.parameters and 'timeout' in step.parameters:
                await self._record_pattern(
                    "parameter_optimization",
                    f"Tool {step.tool} works well with timeout={step.parameters['timeout']}",
                    0.6,
                    {'tool': step.tool, 'parameter': 'timeout', 'value': step.parameters['timeout']},
                    episode_id
                )
    
    def _classify_context(self, context: Dict[str, Any]) -> str:
        """Simple context classification for pattern matching."""
        if 'financial' in str(context).lower():
            return 'financial_analysis'
        elif 'data' in str(context).lower() and 'analysis' in str(context).lower():
            return 'data_analysis'
        else:
            return 'general'
    
    async def _record_pattern(self, pattern_type: str, description: str, 
                            confidence: float, rule_data: Dict[str, Any], 
                            episode_id: int):
        """Record a learned pattern in semantic memory."""
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Check if similar pattern already exists
        cursor.execute('''
            SELECT id, confidence, usage_count, supporting_episodes 
            FROM semantic_memories 
            WHERE pattern_type = ? AND pattern_description = ?
        ''', (pattern_type, description))
        
        existing = cursor.fetchone()
        
        if existing:
            # Reinforce existing pattern
            pattern_id, old_confidence, usage_count, supporting_episodes = existing
            new_confidence = min(1.0, old_confidence + 0.1)  # Gradual confidence building
            new_usage_count = usage_count + 1
            
            episodes_list = supporting_episodes.split(',') if supporting_episodes else []
            episodes_list.append(str(episode_id))
            
            cursor.execute('''
                UPDATE semantic_memories 
                SET confidence = ?, usage_count = ?, last_reinforced = ?, supporting_episodes = ?
                WHERE id = ?
            ''', (new_confidence, new_usage_count, datetime.now(), ','.join(episodes_list), pattern_id))
        else:
            # Create new pattern
            cursor.execute('''
                INSERT INTO semantic_memories 
                (pattern_type, pattern_description, confidence, last_reinforced, supporting_episodes, rule_data)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (pattern_type, description, confidence, datetime.now(), str(episode_id), pickle.dumps(rule_data)))
        
        conn.commit()
        conn.close()
    
    async def _extract_tags(self, goal: str, context: Dict[str, Any], 
                           results: Dict[str, Any]) -> List[str]:
        """Extract meaningful tags for episode categorization."""
        tags = []
        
        # Goal-based tags
        goal_words = goal.lower().split()
        important_words = [w for w in goal_words if len(w) > 4 and w not in ['with', 'from', 'that', 'this', 'will']]
        tags.extend(important_words[:3])
        
        # Context-based tags
        if context:
            context_str = str(context).lower()
            if 'financial' in context_str or 'revenue' in context_str or 'profit' in context_str:
                tags.append('financial')
            if 'data' in context_str and ('analysis' in context_str or 'report' in context_str):
                tags.append('analytics')
        
        return list(set(tags))  # Remove duplicates

This memory system creates a feedback loop: each execution provides data that improves future planning. The semantic pattern extraction is particularly powerful—it automatically learns which tool combinations work well together and which parameters tend to succeed in specific contexts.

Integrating Tools and External Systems

Real-world agents need to interact with multiple systems. Our tool registry system provides a clean abstraction for integrating diverse capabilities while maintaining consistency in how the agent interacts with them.

import aiohttp
import pandas as pd
from pathlib import Path
import asyncio

class ToolRegistry:
    def __init__(self, memory_store):
        self.tools = {}
        self.memory_store = memory_store
        
    def register(self, name: str, description: str):
        """Decorator for registering tools with the agent."""
        def decorator(func):
            self.tools[name] = {
                'function': func,
                'description': description,
                'usage_count': 0,
                'success_count': 0,
                'avg_execution_time': 0.0
            }
            return func
        return decorator
    
    async def execute_tool(self, name: str, parameters: Dict[str, Any], 
                          timeout: int = 300) -> Dict[str, Any]:
        """Execute a tool with performance tracking."""
        if name not in self.tools:
            raise ValueError(f"Tool '{name}' not found in registry")
        
        tool_info = self.tools[name]
        start_time = datetime.now()
        
        try:
            # Apply learned parameter optimizations
            optimized_params = await self._apply_parameter_optimizations(name, parameters)
            
            # Execute with timeout
            result = await asyncio.wait_for(
                tool_info['function'](**optimized_params),
                timeout=timeout
            )
            
            # Update performance metrics
            execution_time = (datetime.now() - start_time).total_seconds()
            tool_info['usage_count'] += 1
            tool_info['success_count'] += 1
            tool_info['avg_execution_time'] = (
                (tool_info['avg_execution_time'] * (tool_info['success_count'] - 1) + execution_time) 
                / tool_info['success_count']
            )
            
            return {
                'success': True,
                'result': result,
                'execution_time': execution_time,
                'tool': name
            }
            
        except asyncio.TimeoutError:
            tool_info['usage_count'] += 1
            return {
                'success': False,
                'error': f"Tool '{name}' timed out after {timeout} seconds",
                'tool': name
            }
        except Exception as e:
            tool_info['usage_count'] += 1
            return {
                'success': False,
                'error': f"Tool '{name}' failed: {str(e)}",
                'tool': name
            }
    
    async def _apply_parameter_optimizations(self, tool_name: str, 
                                           parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Apply learned parameter optimizations."""
        patterns = await self.memory_store.get_learned_patterns("parameter_optimization")
        
        optimized_params = parameters.copy()
        
        for pattern in patterns:
            if (pattern.get('rule', {}).get('tool') == tool_name and 
                pattern['confidence'] > 0.7):
                param_name = pattern['rule']['parameter']
                param_value = pattern['rule']['value']
                
                # Apply optimization if parameter isn't explicitly set
                if param_name not in optimized_params:
                    optimized_params[param_name] = param_value
        
        return optimized_params

# Now let's register some realistic tools
def create_financial_analysis_tools(registry: ToolRegistry):
    
    @registry.register("web_scraper", "Scrape data from web pages with robust error handling")
    async def scrape_web_data(url: str, css_selector: str = None, timeout: int = 30) -> Dict[str, Any]:
        """Scrape structured data from web pages."""
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session:
            try:
                async with session.get(url) as response:
                    if response.status != 200:
                        raise Exception(f"HTTP {response.status}: {await response.text()}")
                    
                    content = await response.text()
                    
                    # Simple extraction - in production, use BeautifulSoup or similar
                    if css_selector:
                        # Placeholder for CSS selector logic
                        extracted_data = {"raw_content": content[:1000]}
                    else:
                        extracted_data = {"raw_content": content}
                    
                    return {
                        "url": url,
                        "status_code": response.status,
                        "content_length": len(content),
                        "extracted_data": extracted_data
                    }
                    
            except Exception as e:
                raise Exception(f"Web scraping failed for {url}: {str(e)}")
    
    @registry.register("financial_calculator", "Calculate financial metrics and ratios")
    async def calculate_financial_metrics(data: Dict[str, float], 
                                        metrics: List[str]) -> Dict[str, float]:
        """Calculate standard financial metrics."""
        results = {}
        
        # Revenue growth rate
        if "revenue_growth" in metrics and "current_revenue" in data and "previous_revenue" in data:
            if data["previous_revenue"] != 0:
                results["revenue_growth"] = (
                    (data["current_revenue"] - data["previous_revenue"]) / data["previous_revenue"]
                ) * 100
        
        # Profit margin
        if "profit_margin" in metrics and "net_income" in data and "revenue" in data:
            if data["revenue"] != 0:
                results["profit_margin"] = (data["net_income"] / data["revenue"]) * 100
        
        # ROE (Return on Equity)
        if "roe" in metrics and "net_income" in data and "shareholders_equity" in data:
            if data["shareholders_equity"] != 0:
                results["roe"] = (data["net_income"] / data["shareholders_equity"]) * 100
        
        # Current ratio
        if "current_ratio" in metrics and "current_assets" in data and "current_liabilities" in data:
            if data["current_liabilities"] != 0:
                results["current_ratio"] = data["current_assets"] / data["current_liabilities"]
        
        return results
    
    @registry.register("data_analyzer", "Perform statistical analysis on datasets")
    async def analyze_dataset(data: List[Dict[str, Any]], 
                            analysis_type: str = "summary") -> Dict[str, Any]:
        """Analyze datasets with various statistical methods."""
        if not data:
            return {"error": "Empty dataset provided"}
        
        df = pd.DataFrame(data)
        
        if analysis_type == "summary":
            numeric_columns = df.select_dtypes(include=['number']).columns
            summary = {}
            
            for col in numeric_columns:
                summary[col] = {
                    "mean": float(df[col].mean()),
                    "median": float(df[col].median()),
                    "std": float(df[col].std()),
                    "min": float(df[col].min()),
                    "max": float(df[col].max()),
                    "count": int(df[col].count())
                }
            
            return {
                "summary_statistics": summary,
                "total_records": len(df),
                "columns": list(df.columns)
            }
        
        elif analysis_type == "trend":
            # Simple trend analysis
            trends = {}
            numeric_columns = df.select_dtypes(include=['number']).columns
            
            for col in numeric_columns:
                if len(df) > 1:
                    # Calculate simple linear trend
                    x = list(range(len(df)))
                    y = df[col].values
                    
                    # Simple correlation with time
                    correlation = pd.Series(x).corr(pd.Series(y))
                    trends[col] = {
                        "trend_direction": "increasing" if correlation > 0.1 else "decreasing" if correlation < -0.1 else "stable",
                        "correlation_with_time": float(correlation)
                    }
            
            return {"trends": trends}
        
        return {"error": f"Unknown analysis type: {analysis_type}"}
    
    @registry.register("report_generator", "Generate formatted reports from analysis results")
    async def generate_report(data: Dict[str, Any], 
                            template: str = "financial_summary",
                            output_format: str = "markdown") -> str:
        """Generate formatted reports from analysis data."""
        
        if template == "financial_summary" and output_format == "markdown":
            report_content = f"""# Financial Analysis Report
Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

## Executive Summary
"""
            
            # Add metrics if available
            if "financial_metrics" in data:
                report_content += "\n### Key Financial Metrics\n"
                for metric, value in data["financial_metrics"].items():
                    if isinstance(value, float):
                        report_content += f"- **{metric.replace('_', ' ').title()}**: {value:.2f}%\n"
                    else:
                        report_content += f"- **{metric.replace('_', ' ').title()}**: {value}\n"
            
            # Add analysis results
            if "analysis_results" in data:
                report_content += "\n### Analysis Results\n"
                results = data["analysis_results"]
                if "summary_statistics" in results:
                    report_content += "\n#### Summary Statistics\n"
                    for column, stats in results["summary_statistics"].items():
                        report_content += f"\n**{column}:**\n"
                        report_content += f"- Mean: {stats['mean']:.2f}\n"
                        report_content += f"- Median: {stats['median']:.2f}\n"
                        report_content += f"- Standard Deviation: {stats['std']:.2f}\n"
                        report_content += f"- Range: {stats['min']:.2f} to {stats['max']:.2f}\n"
            
            return report_content
        
        return f"Template '{template}' with format '{output_format}' not implemented"

# Initialize the tool registry
def setup_agent_tools(memory_store) -> ToolRegistry:
    registry = ToolRegistry(memory_store)
    create_financial_analysis_tools(registry)
    return registry

The tool registry automatically tracks performance metrics and applies learned optimizations. This means that over time, the agent gets better at using its tools—adjusting timeouts, parameter values, and retry strategies based on historical success rates.

Hands-On Exercise: Building a Market Research Agent

Let's put it all together by building a complete market research agent that can analyze multiple companies, compare their financial performance, and generate insights.

import asyncio
import json
from datetime import datetime

class MarketResearchAgent(MultiStepAgent):
    def __init__(self, llm_client):
        memory_store = PersistentMemoryStore("market_research_memory.db")
        super().__init__("market_researcher", llm_client, memory_store)
        
        # Set up specialized tools
        self.tool_registry = setup_agent_tools(memory_store)
        self.planning_engine = PlanningEngine(llm_client, memory_store)
        
        # Register tools with the agent
        for name, info in self.tool_registry.tools.items():
            self.register_tool(name, info['function'], info['description'])
    
    async def research_companies(self, companies: List[str], 
                               research_focus: str = "financial_performance") -> Dict[str, Any]:
        """Conduct comprehensive research on a list of companies."""
        
        goal = f"Research and analyze {len(companies)} companies focusing on {research_focus}"
        context = {
            "companies": companies,
            "focus": research_focus,
            "timestamp": datetime.now().isoformat()
        }
        
        # Generate execution plan
        plan = await self.planning_engine.generate_initial_plan(
            goal, context, self.tools
        )
        
        self.current_plan = plan
        results = {}
        
        try:
            # Execute plan with monitoring and adaptation
            for step in self.current_plan:
                if not self._dependencies_met(step):
                    continue
                
                step.status = StepStatus.IN_PROGRESS
                
                # Execute the step
                tool_result = await self.tool_registry.execute_tool(
                    step.tool, 
                    step.parameters
                )
                
                if tool_result['success']:
                    step.status = StepStatus.COMPLETED
                    step.result = tool_result['result']
                    step.completed_at = datetime.now()
                    
                    # Store intermediate results
                    results[step.id] = tool_result['result']
                    
                    # Check if plan needs adaptation based on results
                    if await self._should_adapt_plan(step, tool_result):
                        adapted_plan = await self._adapt_plan(step, tool_result)
                        self.current_plan.extend(adapted_plan)
                
                else:
                    step.status = StepStatus.FAILED
                    step.error = tool_result['error']
                    
                    # Attempt recovery
                    recovery_plan = await self.planning_engine.replan_from_failure(
                        self.current_plan, step, tool_result['error']
                    )
                    
                    if recovery_plan:
                        self.current_plan = recovery_plan
                        self.logger.info(f"Replanned after failure in {step.description}")
            
            # Generate final synthesis
            synthesis = await self._synthesize_results(goal, results)
            
            # Store execution memory
            execution_time = sum(
                (step.completed_at - step.created_at).total_seconds() 
                for step in self.current_plan 
                if step.completed_at
            )
            
            await self.memory_store.store_execution_memory(
                goal, context, self.current_plan, results, execution_time
            )
            
            return {
                "goal": goal,
                "companies_analyzed": companies,
                "results": results,
                "synthesis": synthesis,
                "execution_summary": {
                    "total_steps": len(self.current_plan),
                    "successful_steps": len([s for s in self.current_plan if s.status == StepStatus.COMPLETED]),
                    "failed_steps": len([s for s in self.current_plan if s.status == StepStatus.FAILED]),
                    "execution_time": execution_time
                }
            }
            
        except Exception as e:
            self.logger.error(f"Agent execution failed: {str(e)}")
            return {"error": str(e), "partial_results": results}
    
    def _dependencies_met(self, step: AgentStep) -> bool:
        """Check if all dependencies for a step are satisfied."""
        if not step.dependencies:
            return True
        
        completed_ids = {s.id for s in self.current_plan if s.status == StepStatus.COMPLETED}
        return all(dep_id in completed_ids for dep_id in step.dependencies)
    
    async def _should_adapt_plan(self, completed_step: AgentStep, 
                               result: Dict[str, Any]) -> bool:
        """Determine if the plan should be adapted based on step results."""
        
        # Example: If we discover more companies during research
        if (completed_step.tool == "web_scraper" and 
            "companies" in str(result.get('result', {})).lower()):
            return True
        
        # Example: If analysis reveals need for additional data
        if (completed_step.tool == "data_analyzer" and 
            "insufficient_data" in str(result.get('result', {})).lower()):
            return True
        
        return False
    
    async def _adapt_plan(self, trigger_step: AgentStep, 
                         result: Dict[str, Any]) -> List[AgentStep]:
        """Generate additional steps based on new information."""
        
        adaptation_context = {
            "trigger_step": trigger_step.description,
            "result_summary": str(result.get('result', {}))[:500],
            "current_plan_status": len([s for s in self.current_plan if s.status == StepStatus.COMPLETED])
        }
        
        adaptation_prompt = f"""
        PLAN ADAPTATION REQUEST
        
        Current step revealed new information that suggests additional analysis would be valuable.
        
        Trigger step: {trigger_step.description}
        Result summary: {adaptation_context['result_summary']}
        
        Available tools: {list(self.tools.keys())}
        
        Generate 1-3 additional steps that would enhance the analysis based on this new information.
        Return JSON with the same format as initial planning.
        
        Focus on high-value additions that leverage the new information discovered.
        """
        
        try:
            response = await self.llm_client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": adaptation_prompt}],
                temperature=0.2
            )
            
            adaptation_data = json.loads(response.choices[0].message.content)
            
            additional_steps = []
            for i, step_data in enumerate(adaptation_data.get('steps', [])):
                step = AgentStep(
                    id=f"adapted_step_{len(self.current_plan) + i + 1}",
                    description=step_data['description'],
                    tool=step_data['tool'],
                    parameters=step_data['parameters'],
                    dependencies=step_data.get('dependencies', [])
                )
                additional_steps.append(step)
            
            return additional_steps
            
        except Exception as e:
            self.logger.warning(f"Plan adaptation failed: {str(e)}")
            return []
    
    async def _synthesize_results(self, goal: str, 
                                results: Dict[str, Any]) -> Dict[str, Any]:
        """Generate insights and synthesis from execution results."""
        
        synthesis_prompt = f"""
        RESULTS SYNTHESIS
        
        Goal: {goal}
        
        Execution results:
        {json.dumps(results, indent=2, default=str)}
        
        Generate a comprehensive synthesis that includes:
        1. Key insights discovered
        2. Comparative analysis (if multiple entities analyzed)
        3. Significant patterns or trends
        4. Recommendations or next steps
        5. Confidence levels for major conclusions
        
        Return structured JSON with clear sections for each type of insight.
        """
        
        try:
            response = await self.llm_client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": synthesis_prompt}],
                temperature=0.3
            )
            
            return json.loads(response.choices[0].message.content)
            
        except Exception as e:
            return {
                "synthesis_error": str(e),
                "raw_results": results
            }

# Example usage
async def demo_market_research():
    # Mock LLM client for demonstration
    class MockLLMClient:
        class Chat:
            class Completions:
                async def create(self, **kwargs):
                    # Simplified mock response - in practice, use real OpenAI client
                    class MockResponse:
                        def __init__(self):
                            self.choices = [type('Choice', (), {
                                'message': type('Message', (), {
                                    'content': json.dumps({
                                        "reasoning": "Analyzing companies requires data collection, analysis, and reporting",
                                        "steps": [
                                            {
                                                "description": "Gather financial data for target companies",
                                                "tool": "web_scraper",
                                                "parameters": {
                                                    "url": "https://finance.yahoo.com",
                                                    "timeout": 60
                                                }
                                            },
                                            {
                                                "description": "Calculate key financial metrics",
                                                "tool": "financial_calculator",
                                                "parameters": {
                                                    "metrics": ["revenue_growth", "profit_margin", "roe"]
                                                },
                                                "dependencies": ["step_1"]
                                            },
                                            {
                                                "description": "Generate comparative analysis report",
                                                "tool": "report_generator",
                                                "parameters": {
                                                    "template": "financial_summary"
                                                },
                                                "dependencies": ["step_2"]
                                            }
                                        ]
                                    })
                                })()
                            })]
                    
                    return MockResponse()
            
            def __init__(self):
                self.completions = self.Completions()
        
        def __init__(self):
            self.chat = self.Chat()
    
    # Create and run the agent
    agent = MarketResearchAgent(MockLLMClient())
    
    results = await agent.research_companies(
        companies=["Apple", "Microsoft", "Google"],
        research_focus="financial_performance"
    )
    
    print(json.dumps(results, indent=2, default=str))

# Run the demo
if __name__ == "__main__":
    asyncio.run(demo_market_research())

This complete implementation demonstrates how all the components work together. The agent can adapt its plan mid-execution, learn from each run, and handle complex multi-step workflows that would be impossible with traditional AI applications.

Common Mistakes & Troubleshooting

Memory Bloat and Performance Degradation

The most common issue is unbounded memory growth. Without proper cleanup and relevance filtering, your agent's memory can become a performance bottleneck. Implement regular cleanup routines:

async def cleanup_old_memories(self, days_threshold: int = 180):
    """Remove low-value memories older than threshold."""
    cutoff_date = datetime.now() - timedelta(days=days_threshold)
    
    conn = sqlite3.connect(self.db_path)
    cursor = conn.cursor()
    
    # Remove low-success episodes
    cursor.execute('''
        DELETE FROM episodic_memories 
        WHERE created_at < ? AND success_rate < 0.3
    ''', (cutoff_date,))
    
    # Remove low-confidence semantic patterns
    cursor.execute('''
        DELETE FROM semantic_memories 
        WHERE last_reinforced < ? AND confidence < 0.5
    ''', (cutoff_date,))
    
    conn.commit()
    conn.close()

Infinite Planning Loops

When replanning fails repeatedly, agents can get stuck in endless planning cycles. Always implement maximum retry limits and fallback strategies:

async def execute_with_retry_limit(self, max_replans: int = 3):
    replan_count = 0
    
    while replan_count < max_replans:
        try:
            return await self._execute_plan()
        except PlanFailure as e:
            replan_count += 1
            if replan_count >= max_replans:
                # Switch to simplified fallback plan
                return await self._execute_fallback_plan(e)
            
            self.current_plan = await self.planning_engine.replan_from_failure(
                self.current_plan, e.failed_step, e.context
            )

Tool Integration Fragility

External APIs and systems fail regularly. Build resilience into every tool:

async def resilient_tool_execution(self, tool_name: str, params: Dict[str, Any]):
    max_retries = 3
    backoff_seconds = [1, 5, 15]  # Exponential backoff
    
    for attempt in range(max_retries):
        try:
            return await self.tool_registry.execute_tool(tool_name, params)
        except Exception as e:
            if attempt == max_retries - 1:
                # Final attempt failed, try alternative approach
                alternative_result = await self._try_alternative_approach(tool_name, params, e)
                if alternative_result:
                    return alternative_result
                raise e
            
            # Wait before retry
            await asyncio.sleep(backoff_seconds[attempt])

Context Window Limitations

Large plans and extensive memory can exceed LLM context windows. Implement intelligent context compression:

async def compress_context_for_llm(self, goal: str, plan: List[AgentStep], 
                                 memories: List[Dict]) -> str:
    """Compress context to fit within LLM limits while preserving key information."""
    
    # Prioritize recent and successful memories
    relevant_memories = sorted(
        memories, 
        key=lambda m: (m['relevance_score'], m['success_rate']), 
        reverse=True
    )[:3]  # Keep only top 3
    
    # Summarize completed steps instead of full details
    completed_summaries = []
    for step in plan:
        if step.status == StepStatus.COMPLETED:
            completed_summaries.append({
                'description': step.description,
                'tool': step.tool,
                'success': True
            })
    
    # Build compressed context
    compressed_context = {
        'goal': goal,
        'completed_steps': completed_summaries,
        'relevant_past_experiences': [
            {
                'goal': m['goal'], 
                'success_rate': m['success_rate']
            } for m in relevant_memories
        ]
    }
    
    return json.dumps(compressed_context, indent=2)

Summary & Next Steps

Multi-step AI agents represent a significant evolution in AI system design, moving beyond simple question-answering to complex workflow automation and decision-making. You now have the foundational components: a planning engine that can adapt based on results, a memory system that learns from experience, and a tool integration framework that handles real-world complexity.

The key insights from this implementation are:

Architecture Matters: Separating planning, execution, and memory concerns makes agents much more maintainable and debuggable than monolithic approaches.

Memory Enables Learning: Agents that remember and learn from past executions become dramatically more effective over time, automatically optimizing parameters and strategies.

Resilience is Critical: Production agents must handle failures gracefully with retry logic, alternative approaches, and degraded operation modes.

Context Management is Essential: As agents become more sophisticated, managing information flow and context size becomes a significant engineering challenge.

For your next steps, consider these advanced topics:

Multi-Agent Coordination: Build systems where multiple specialized agents collaborate on complex tasks, sharing workload and expertise.

Real-Time Learning: Implement online learning systems that update agent behavior during execution, not just between sessions.

Human-in-the-Loop Integration: Design interfaces where humans can intervene, guide, or validate agent decisions at critical points.

Performance Optimization: Explore parallel execution strategies, caching mechanisms, and distributed agent architectures for handling enterprise-scale workloads.

The foundation you've built here scales to handle increasingly complex scenarios—from automated business intelligence systems to sophisticated data analysis pipelines that adapt and optimize themselves over time.

Learning Path: RAG & AI Agents

Previous

Chunking Strategies: How to Split Documents for Better Retrieval

Next

Evaluating RAG Systems: Precision, Recall, and Faithfulness

Related Articles

AI & Machine Learning🔥 Expert

Enterprise RAG: Security, Permissions, and Multi-Tenant Architecture

27 min
AI & Machine Learning⚡ Practitioner

Production RAG: Caching, Monitoring, and Continuous Improvement

21 min
AI & Machine Learning🌱 Foundation

Hybrid Search: Combining Keyword and Semantic Search for Better Results

14 min

On this page

  • Prerequisites
  • Agent Architecture Fundamentals
  • Implementing Dynamic Planning Systems
  • Building Persistent Memory Systems
  • Integrating Tools and External Systems
  • Hands-On Exercise: Building a Market Research Agent
  • Common Mistakes & Troubleshooting
  • Summary & Next Steps