
Picture this: You're building an AI system to analyze quarterly financial reports across multiple companies. The system needs to download reports, extract key metrics, compare them against historical data, identify trends, and generate insights—all while remembering what it learned from previous analyses to avoid redundant work. This isn't a job for a simple chatbot or single-query AI system. You need an agent that can plan, execute complex workflows, and maintain memory across interactions.
Multi-step AI agents represent a significant evolution from basic prompt-response systems. They combine the reasoning capabilities of large language models with structured planning algorithms and persistent memory systems. Unlike traditional AI applications that handle one task at a time, these agents can break down complex objectives into manageable steps, execute them sequentially or in parallel, and learn from their experiences.
By the end of this lesson, you'll have built a sophisticated AI agent capable of handling multi-faceted business analysis tasks that would typically require human intervention at each step.
Before diving in, you should be comfortable with Python programming; familiarity with async/await, dataclasses, and basic SQL will also help, since the examples in this lesson use all three.
Multi-step AI agents require a fundamentally different architecture than single-purpose AI tools. Instead of a simple input-output model, we need systems that can maintain state, plan ahead, and coordinate multiple operations.
The core components of our agent architecture include:
Planning Engine: Responsible for breaking down complex goals into executable steps and adapting plans based on results. This isn't just task decomposition—it's dynamic strategy adjustment.
Memory System: Maintains both short-term context (current workflow state) and long-term knowledge (learnings from previous executions). Think of it as the agent's experience database.
Tool Registry: A flexible system for integrating external capabilities like web scraping, data analysis, file manipulation, or API calls.
Execution Coordinator: Manages step execution, handles dependencies, and coordinates parallel operations when possible.
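The Execution Coordinator's core job—running a step only after its dependencies have completed—boils down to topological ordering. A minimal sketch (the function and step names here are illustrative, not part of the agent classes below):

```python
from typing import Dict, List

def resolve_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Return an execution order in which every step follows its dependencies.

    Raises ValueError if the dependency graph contains a cycle.
    """
    order: List[str] = []
    done = set()
    remaining = dict(dependencies)
    while remaining:
        # Steps whose dependencies are all satisfied are ready to run
        ready = [s for s, deps in remaining.items() if all(d in done for d in deps)]
        if not ready:
            raise ValueError(f"Circular dependency among: {sorted(remaining)}")
        for step in sorted(ready):  # sort for deterministic output
            order.append(step)
            done.add(step)
            del remaining[step]
    return order

# Steps 2 and 3 both need step 1; step 4 needs both analyses
plan = {
    "step_1": [],
    "step_2": ["step_1"],
    "step_3": ["step_1"],
    "step_4": ["step_2", "step_3"],
}
order = resolve_order(plan)
print(order)  # ['step_1', 'step_2', 'step_3', 'step_4']
```

Note that each pass through the `while` loop collects *all* currently-ready steps, which is exactly where a coordinator can choose to run them in parallel rather than sequentially.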
Let's start by building the foundation. Here's our base agent class that establishes the architecture:
import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, asdict
from enum import Enum
class StepStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class AgentStep:
id: str
description: str
tool: str
parameters: Dict[str, Any]
status: StepStatus = StepStatus.PENDING
result: Optional[Any] = None
error: Optional[str] = None
dependencies: Optional[List[str]] = None
created_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
def __post_init__(self):
if self.dependencies is None:
self.dependencies = []
if self.created_at is None:
self.created_at = datetime.now()
class MultiStepAgent:
def __init__(self, name: str, llm_client, memory_store=None):
self.name = name
self.llm_client = llm_client
self.memory_store = memory_store or PersistentMemoryStore()  # defined in the memory section below
self.tools = {}
self.current_plan = []
self.execution_history = []
self.logger = logging.getLogger(f"agent.{name}")
def register_tool(self, name: str, func: Callable, description: str):
"""Register a tool that the agent can use during execution."""
self.tools[name] = {
'function': func,
'description': description
}
self.logger.info(f"Registered tool: {name}")
async def execute_goal(self, goal: str, context: Dict[str, Any] = None) -> Dict[str, Any]:
"""Main entry point for agent execution."""
self.logger.info(f"Starting execution for goal: {goal}")
# Generate initial plan
plan = await self._generate_plan(goal, context or {})
self.current_plan = plan
# Execute plan with dynamic adjustments
results = await self._execute_plan_with_adaptation()
# Store results in memory for future use
await self._store_execution_memory(goal, results)
# Note: the three private methods above are hooks for subclasses to fill in;
# the MarketResearchAgent later in this lesson implements the equivalent flow
# using a PlanningEngine, a ToolRegistry, and the persistent memory store.
return {
'goal': goal,
'status': 'completed' if all(step.status == StepStatus.COMPLETED for step in self.current_plan) else 'partial',
'results': results,
'steps_completed': len([s for s in self.current_plan if s.status == StepStatus.COMPLETED]),
'total_steps': len(self.current_plan)
}
This architecture separates concerns cleanly. The planning happens independently of execution, and memory operations are abstracted away from the core logic. Notice how we're using dataclasses for structured step representation—this makes debugging and state inspection much easier in complex workflows.
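Because steps are dataclasses, a snapshot of workflow state is one `asdict` call away, which is what makes the debugging story so pleasant. A stripped-down illustration (this `MiniStep` is a toy, not the full `AgentStep` above):

```python
from dataclasses import dataclass, field, asdict
from typing import List, Optional

@dataclass
class MiniStep:
    id: str
    tool: str
    status: str = "pending"
    # field(default_factory=list) is an alternative to the __post_init__
    # pattern for mutable defaults
    dependencies: List[str] = field(default_factory=list)
    result: Optional[str] = None

step = MiniStep(id="step_1", tool="web_scraper")
step.status = "completed"
step.result = "ok"

snapshot = asdict(step)  # a plain dict, ready for json.dumps or a debugger
print(snapshot["status"])  # completed
```

Any point in a long-running workflow can be dumped, logged, or diffed against an earlier snapshot without custom serialization code.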
Static planning fails in real-world scenarios because intermediate results often reveal new information or complications. Our agent needs to adapt its strategy based on what it discovers during execution.
Here's how we implement a planning system that can revise itself:
class PlanningEngine:
def __init__(self, llm_client, memory_store):
self.llm_client = llm_client
self.memory_store = memory_store
async def generate_initial_plan(self, goal: str, context: Dict[str, Any],
available_tools: Dict[str, Any]) -> List[AgentStep]:
"""Generate the initial execution plan."""
# Retrieve relevant past experiences
past_experiences = await self.memory_store.retrieve_similar_goals(goal)
planning_prompt = self._build_planning_prompt(
goal, context, available_tools, past_experiences
)
response = await self.llm_client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": planning_prompt}],
temperature=0.1
)
plan_data = json.loads(response.choices[0].message.content)
steps = []
for i, step_data in enumerate(plan_data['steps']):
step = AgentStep(
id=f"step_{i+1}",
description=step_data['description'],
tool=step_data['tool'],
parameters=step_data['parameters'],
dependencies=step_data.get('dependencies', [])
)
steps.append(step)
return steps
async def replan_from_failure(self, current_plan: List[AgentStep],
failed_step: AgentStep,
error_context: str,
available_tools: Optional[Dict[str, Any]] = None) -> List[AgentStep]:
"""Generate an alternative plan when a step fails."""
completed_steps = [s for s in current_plan if s.status == StepStatus.COMPLETED]
remaining_steps = [s for s in current_plan if s.status == StepStatus.PENDING]
# The planning engine holds no tools of its own, so the caller passes them in
tool_names = list((available_tools or {}).keys())
replanning_prompt = f"""
REPLANNING REQUEST
Original plan had a failure at step: {failed_step.description}
Error: {failed_step.error}
Context: {error_context}
Successfully completed steps:
{[{'description': s.description, 'result': str(s.result)[:200]} for s in completed_steps]}
Remaining planned steps:
{[s.description for s in remaining_steps]}
Available tools: {tool_names}
Generate a revised plan that:
1. Works around the failed step
2. Leverages the results from completed steps
3. Achieves the same overall objective
4. Returns JSON in the same format as initial planning
Focus on practical alternatives and don't repeat the same failing approach.
"""
response = await self.llm_client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": replanning_prompt}],
temperature=0.2
)
revised_plan_data = json.loads(response.choices[0].message.content)
# Create new steps with updated IDs
new_steps = completed_steps.copy() # Keep completed steps
for i, step_data in enumerate(revised_plan_data['steps']):
if not any(s.description == step_data['description'] for s in completed_steps):
step = AgentStep(
id=f"revised_step_{len(new_steps)+1}",
description=step_data['description'],
tool=step_data['tool'],
parameters=step_data['parameters'],
dependencies=step_data.get('dependencies', [])
)
new_steps.append(step)
return new_steps
def _build_planning_prompt(self, goal: str, context: Dict[str, Any],
tools: Dict[str, Any], past_experiences: List[Dict]) -> str:
experience_summary = ""
if past_experiences:
experience_summary = f"""
RELEVANT PAST EXPERIENCES:
{json.dumps(past_experiences[:3], indent=2, default=str)}
"""
return f"""
You are a planning engine for an AI agent. Generate a detailed execution plan.
GOAL: {goal}
CONTEXT:
{json.dumps(context, indent=2)}
AVAILABLE TOOLS:
{json.dumps({name: info['description'] for name, info in tools.items()}, indent=2)}
{experience_summary}
Generate a JSON plan with this structure:
{{
"reasoning": "Explanation of your approach",
"steps": [
{{
"description": "Clear description of what this step accomplishes",
"tool": "tool_name_to_use",
"parameters": {{"param": "value"}},
"dependencies": ["step_1", "step_2"] // IDs of steps that must complete first
}}
]
}}
Guidelines:
- Break complex goals into logical, manageable steps
- Consider dependencies and optimal ordering
- Be specific about parameters
- Each step should have a clear success criterion
- Plan for error scenarios where reasonable
"""
The key insight here is that replanning isn't just about fixing failures—it's about optimization. When a step completes successfully but reveals new information (like finding additional data sources), the agent can revise its plan to take advantage of these discoveries.
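One practical caveat: the bare `json.loads` calls in the planning code assume the model returns pristine JSON, but models routinely wrap output in markdown fences or surround it with prose. A defensive extraction helper like the sketch below makes planning and replanning far less brittle (this helper is illustrative and not part of the agent code above):

```python
import json
import re

def parse_llm_json(text: str) -> dict:
    """Best-effort extraction of a JSON object from an LLM response."""
    # 1. The ideal case: the response is bare JSON
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # 2. JSON wrapped in a markdown code fence
    fenced = re.search(r"`{3}(?:json)?\s*(.*?)`{3}", text, re.DOTALL)
    if fenced:
        try:
            return json.loads(fenced.group(1))
        except json.JSONDecodeError:
            pass
    # 3. Last resort: the outermost brace pair
    start, end = text.find("{"), text.rfind("}")
    if start != -1 and end > start:
        return json.loads(text[start:end + 1])
    raise ValueError("No JSON object found in response")

# A typical fenced response, built piecewise to keep this example readable
reply = "Here is the plan:\n" + "`" * 3 + "json\n" + '{"steps": [{"tool": "web_scraper"}]}\n' + "`" * 3
plan = parse_llm_json(reply)
print(plan["steps"][0]["tool"])  # web_scraper
```

If your provider supports a structured-output or JSON mode, enabling it is the more robust fix; the helper then becomes a safety net rather than the primary path.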
Memory transforms agents from stateless tools into learning systems. But effective memory isn't just about storage—it's about retrieval, relevance ranking, and knowledge synthesis.
Our memory system needs to handle multiple types of information:
Episodic Memory: Specific execution instances with their contexts, decisions, and outcomes
Semantic Memory: General patterns and rules learned from multiple executions
Working Memory: Current context and intermediate results during active execution
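Before committing to a database schema, the three tiers can be sketched with plain Python containers to make the distinctions concrete (a toy model under simplified assumptions—confidence starts at 0.5 and grows by 0.1 per reinforcement, mirroring the persistent version's approach):

```python
from typing import Any, Dict, List

class ToyMemory:
    def __init__(self):
        self.episodic: List[Dict[str, Any]] = []  # specific past executions
        self.semantic: Dict[str, float] = {}      # learned pattern -> confidence
        self.working: Dict[str, Any] = {}         # current execution context

    def remember_episode(self, goal: str, success_rate: float) -> None:
        self.episodic.append({"goal": goal, "success_rate": success_rate})

    def reinforce_pattern(self, pattern: str, bump: float = 0.1) -> None:
        # Confidence grows with repetition but is capped at 1.0
        self.semantic[pattern] = min(1.0, self.semantic.get(pattern, 0.5) + bump)

memory = ToyMemory()
memory.remember_episode("analyze Q3 reports", success_rate=0.8)
memory.reinforce_pattern("scrape -> calculate -> report")
memory.reinforce_pattern("scrape -> calculate -> report")
print(memory.semantic)  # confidence climbs toward 1.0 with each reinforcement
```

The persistent implementation that follows adds exactly what this toy lacks: durability across process restarts, relevance-ranked retrieval, and automatic pattern extraction.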
import sqlite3
import pickle
import hashlib
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
class PersistentMemoryStore:
def __init__(self, db_path: str = "agent_memory.db"):
self.db_path = db_path
self._initialize_database()
def _initialize_database(self):
"""Create the database schema for agent memory."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Episodic memories - specific execution instances
cursor.execute('''
CREATE TABLE IF NOT EXISTS episodic_memories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
goal_hash TEXT,
goal TEXT,
context_data BLOB,
plan_data BLOB,
results_data BLOB,
success_rate REAL,
execution_time REAL,
created_at TIMESTAMP,
tags TEXT
)
''')
# Semantic memories - learned patterns and rules
cursor.execute('''
CREATE TABLE IF NOT EXISTS semantic_memories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pattern_type TEXT,
pattern_description TEXT,
confidence REAL,
usage_count INTEGER DEFAULT 1,
last_reinforced TIMESTAMP,
supporting_episodes TEXT,
rule_data BLOB
)
''')
# Working memory - current execution state
cursor.execute('''
CREATE TABLE IF NOT EXISTS working_memory (
session_id TEXT PRIMARY KEY,
data BLOB,
created_at TIMESTAMP,
last_accessed TIMESTAMP
)
''')
conn.commit()
conn.close()
async def store_execution_memory(self, goal: str, context: Dict[str, Any],
plan: List[AgentStep], results: Dict[str, Any],
execution_time: float) -> str:
"""Store a complete execution episode."""
goal_hash = hashlib.md5(goal.encode()).hexdigest()
success_rate = (len([s for s in plan if s.status == StepStatus.COMPLETED]) / len(plan)) if plan else 0.0
# Extract meaningful tags from the goal and results
tags = await self._extract_tags(goal, context, results)
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO episodic_memories
(goal_hash, goal, context_data, plan_data, results_data, success_rate, execution_time, created_at, tags)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
goal_hash,
goal,
pickle.dumps(context),
pickle.dumps([asdict(step) for step in plan]),
pickle.dumps(results),
success_rate,
execution_time,
datetime.now(),
','.join(tags)
))
episode_id = cursor.lastrowid
conn.commit()
conn.close()
# Extract semantic patterns from this execution
await self._extract_semantic_patterns(goal, context, plan, results, episode_id)
return f"episode_{episode_id}"
async def retrieve_similar_goals(self, goal: str, limit: int = 5) -> List[Dict[str, Any]]:
"""Retrieve episodic memories of similar goals."""
goal_tokens = set(goal.lower().split())
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Get all episodes from the last 90 days for relevance
cutoff_date = datetime.now() - timedelta(days=90)
cursor.execute('''
SELECT goal, context_data, results_data, success_rate, tags, created_at
FROM episodic_memories
WHERE created_at > ?
ORDER BY created_at DESC
''', (cutoff_date,))
episodes = cursor.fetchall()
conn.close()
# Calculate relevance scores based on goal similarity and success rate
scored_episodes = []
for goal_text, context_blob, results_blob, success_rate, tags, created_at in episodes:
episode_tokens = set(goal_text.lower().split())
tag_tokens = set(tags.lower().split(',')) if tags else set()
# Simple token overlap scoring with success rate weighting
token_overlap = len(goal_tokens.intersection(episode_tokens))
tag_overlap = len(goal_tokens.intersection(tag_tokens))
relevance_score = (token_overlap + tag_overlap) * success_rate
if relevance_score > 0:
scored_episodes.append({
'goal': goal_text,
'context': pickle.loads(context_blob),
'results': pickle.loads(results_blob),
'success_rate': success_rate,
'relevance_score': relevance_score,
'created_at': created_at
})
# Return top matches sorted by relevance
scored_episodes.sort(key=lambda x: x['relevance_score'], reverse=True)
return scored_episodes[:limit]
async def get_learned_patterns(self, pattern_type: str = None) -> List[Dict[str, Any]]:
"""Retrieve semantic patterns learned from past executions."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
if pattern_type:
cursor.execute('''
SELECT pattern_description, confidence, usage_count, rule_data
FROM semantic_memories
WHERE pattern_type = ? AND confidence > 0.6
ORDER BY confidence DESC, usage_count DESC
''', (pattern_type,))
else:
cursor.execute('''
SELECT pattern_type, pattern_description, confidence, usage_count, rule_data
FROM semantic_memories
WHERE confidence > 0.6
ORDER BY confidence DESC, usage_count DESC
''')
patterns = []
for row in cursor.fetchall():
if pattern_type:
pattern_desc, confidence, usage_count, rule_blob = row
pattern_data = {
'description': pattern_desc,
'confidence': confidence,
'usage_count': usage_count,
'rule': pickle.loads(rule_blob) if rule_blob else None
}
else:
p_type, pattern_desc, confidence, usage_count, rule_blob = row
pattern_data = {
'type': p_type,
'description': pattern_desc,
'confidence': confidence,
'usage_count': usage_count,
'rule': pickle.loads(rule_blob) if rule_blob else None
}
patterns.append(pattern_data)
conn.close()
return patterns
async def _extract_semantic_patterns(self, goal: str, context: Dict[str, Any],
plan: List[AgentStep], results: Dict[str, Any],
episode_id: int):
"""Extract reusable patterns from successful executions."""
successful_steps = [s for s in plan if s.status == StepStatus.COMPLETED]
if len(successful_steps) < 2:
return # Need multiple successful steps to identify patterns
# Identify tool usage patterns
tool_sequence = [step.tool for step in successful_steps]
if len(tool_sequence) >= 3:
await self._record_pattern(
"tool_sequence",
f"Successful sequence: {' -> '.join(tool_sequence[:3])}",
0.7, # Initial confidence
{'sequence': tool_sequence[:3], 'context_type': self._classify_context(context)},
episode_id
)
# Identify parameter optimization patterns
for step in successful_steps:
if step.parameters and 'timeout' in step.parameters:
await self._record_pattern(
"parameter_optimization",
f"Tool {step.tool} works well with timeout={step.parameters['timeout']}",
0.6,
{'tool': step.tool, 'parameter': 'timeout', 'value': step.parameters['timeout']},
episode_id
)
def _classify_context(self, context: Dict[str, Any]) -> str:
"""Simple context classification for pattern matching."""
if 'financial' in str(context).lower():
return 'financial_analysis'
elif 'data' in str(context).lower() and 'analysis' in str(context).lower():
return 'data_analysis'
else:
return 'general'
async def _record_pattern(self, pattern_type: str, description: str,
confidence: float, rule_data: Dict[str, Any],
episode_id: int):
"""Record a learned pattern in semantic memory."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Check if similar pattern already exists
cursor.execute('''
SELECT id, confidence, usage_count, supporting_episodes
FROM semantic_memories
WHERE pattern_type = ? AND pattern_description = ?
''', (pattern_type, description))
existing = cursor.fetchone()
if existing:
# Reinforce existing pattern
pattern_id, old_confidence, usage_count, supporting_episodes = existing
new_confidence = min(1.0, old_confidence + 0.1) # Gradual confidence building
new_usage_count = usage_count + 1
episodes_list = supporting_episodes.split(',') if supporting_episodes else []
episodes_list.append(str(episode_id))
cursor.execute('''
UPDATE semantic_memories
SET confidence = ?, usage_count = ?, last_reinforced = ?, supporting_episodes = ?
WHERE id = ?
''', (new_confidence, new_usage_count, datetime.now(), ','.join(episodes_list), pattern_id))
else:
# Create new pattern
cursor.execute('''
INSERT INTO semantic_memories
(pattern_type, pattern_description, confidence, last_reinforced, supporting_episodes, rule_data)
VALUES (?, ?, ?, ?, ?, ?)
''', (pattern_type, description, confidence, datetime.now(), str(episode_id), pickle.dumps(rule_data)))
conn.commit()
conn.close()
async def _extract_tags(self, goal: str, context: Dict[str, Any],
results: Dict[str, Any]) -> List[str]:
"""Extract meaningful tags for episode categorization."""
tags = []
# Goal-based tags
goal_words = goal.lower().split()
important_words = [w for w in goal_words if len(w) > 4 and w not in ['with', 'from', 'that', 'this', 'will']]
tags.extend(important_words[:3])
# Context-based tags
if context:
context_str = str(context).lower()
if 'financial' in context_str or 'revenue' in context_str or 'profit' in context_str:
tags.append('financial')
if 'data' in context_str and ('analysis' in context_str or 'report' in context_str):
tags.append('analytics')
return list(set(tags)) # Remove duplicates
This memory system creates a feedback loop: each execution provides data that improves future planning. The semantic pattern extraction is particularly powerful—it automatically learns which tool combinations work well together and which parameters tend to succeed in specific contexts.
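The retrieval logic reduces to one scoring rule: token overlap between goals (and tags), weighted by how well the past episode actually went. Isolated, it looks like this (the same idea as `retrieve_similar_goals`, reimplemented standalone with made-up example goals):

```python
def relevance(query: str, past_goal: str, tags: str, success_rate: float) -> float:
    """Token-overlap relevance, weighted by the episode's success rate."""
    query_tokens = set(query.lower().split())
    goal_overlap = len(query_tokens & set(past_goal.lower().split()))
    tag_overlap = len(query_tokens & set(tags.lower().split(","))) if tags else 0
    return (goal_overlap + tag_overlap) * success_rate

# A near-identical but failed episode can rank below a looser but successful one
score_a = relevance("analyze quarterly revenue",
                    "analyze quarterly revenue trends", "financial", 0.2)
score_b = relevance("analyze quarterly revenue",
                    "analyze revenue for Apple", "financial,revenue", 0.9)
print(score_a, score_b)  # the second episode wins despite weaker wording overlap
```

This is why the success-rate weighting matters: the agent prefers strategies that worked, not merely goals that sound alike. In production you would likely replace token overlap with embedding similarity, but the weighting principle carries over unchanged.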
Real-world agents need to interact with multiple systems. Our tool registry system provides a clean abstraction for integrating diverse capabilities while maintaining consistency in how the agent interacts with them.
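The decorator-based registration pattern at the heart of the registry can be boiled down to a few lines. A minimal sketch (the `adder` tool is a made-up example, not one of the lesson's financial tools):

```python
from typing import Any, Callable, Dict

class MiniRegistry:
    """Stripped-down decorator-based tool registry."""

    def __init__(self):
        self.tools: Dict[str, Dict[str, Any]] = {}

    def register(self, name: str, description: str) -> Callable:
        def decorator(func: Callable) -> Callable:
            self.tools[name] = {"function": func, "description": description}
            return func  # return the function unchanged so it stays callable
        return decorator

registry = MiniRegistry()

@registry.register("adder", "Add two numbers")
def add(a: int, b: int) -> int:
    return a + b

print(registry.tools["adder"]["function"](2, 3))  # 5
print(add(2, 3))  # the decorated function is still directly callable: 5
```

Because the decorator returns the original function untouched, registration is a pure side effect: the agent can look tools up by name while the rest of the codebase keeps calling them normally.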
import aiohttp
import pandas as pd
from pathlib import Path
import asyncio
class ToolRegistry:
def __init__(self, memory_store):
self.tools = {}
self.memory_store = memory_store
def register(self, name: str, description: str):
"""Decorator for registering tools with the agent."""
def decorator(func):
self.tools[name] = {
'function': func,
'description': description,
'usage_count': 0,
'success_count': 0,
'avg_execution_time': 0.0
}
return func
return decorator
async def execute_tool(self, name: str, parameters: Dict[str, Any],
timeout: int = 300) -> Dict[str, Any]:
"""Execute a tool with performance tracking."""
if name not in self.tools:
raise ValueError(f"Tool '{name}' not found in registry")
tool_info = self.tools[name]
start_time = datetime.now()
try:
# Apply learned parameter optimizations
optimized_params = await self._apply_parameter_optimizations(name, parameters)
# Execute with timeout
result = await asyncio.wait_for(
tool_info['function'](**optimized_params),
timeout=timeout
)
# Update performance metrics
execution_time = (datetime.now() - start_time).total_seconds()
tool_info['usage_count'] += 1
tool_info['success_count'] += 1
tool_info['avg_execution_time'] = (
(tool_info['avg_execution_time'] * (tool_info['success_count'] - 1) + execution_time)
/ tool_info['success_count']
)
return {
'success': True,
'result': result,
'execution_time': execution_time,
'tool': name
}
except asyncio.TimeoutError:
tool_info['usage_count'] += 1
return {
'success': False,
'error': f"Tool '{name}' timed out after {timeout} seconds",
'tool': name
}
except Exception as e:
tool_info['usage_count'] += 1
return {
'success': False,
'error': f"Tool '{name}' failed: {str(e)}",
'tool': name
}
async def _apply_parameter_optimizations(self, tool_name: str,
parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Apply learned parameter optimizations."""
patterns = await self.memory_store.get_learned_patterns("parameter_optimization")
optimized_params = parameters.copy()
for pattern in patterns:
if (pattern.get('rule', {}).get('tool') == tool_name and
pattern['confidence'] > 0.7):
param_name = pattern['rule']['parameter']
param_value = pattern['rule']['value']
# Apply optimization if parameter isn't explicitly set
if param_name not in optimized_params:
optimized_params[param_name] = param_value
return optimized_params
# Now let's register some realistic tools
def create_financial_analysis_tools(registry: ToolRegistry):
@registry.register("web_scraper", "Scrape data from web pages with robust error handling")
async def scrape_web_data(url: str, css_selector: str = None, timeout: int = 30) -> Dict[str, Any]:
"""Scrape structured data from web pages."""
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session:
try:
async with session.get(url) as response:
if response.status != 200:
raise Exception(f"HTTP {response.status}: {await response.text()}")
content = await response.text()
# Simple extraction - in production, use BeautifulSoup or similar
if css_selector:
# Placeholder for CSS selector logic
extracted_data = {"raw_content": content[:1000]}
else:
extracted_data = {"raw_content": content}
return {
"url": url,
"status_code": response.status,
"content_length": len(content),
"extracted_data": extracted_data
}
except Exception as e:
raise Exception(f"Web scraping failed for {url}: {str(e)}")
@registry.register("financial_calculator", "Calculate financial metrics and ratios")
async def calculate_financial_metrics(data: Dict[str, float],
metrics: List[str]) -> Dict[str, float]:
"""Calculate standard financial metrics."""
results = {}
# Revenue growth rate
if "revenue_growth" in metrics and "current_revenue" in data and "previous_revenue" in data:
if data["previous_revenue"] != 0:
results["revenue_growth"] = (
(data["current_revenue"] - data["previous_revenue"]) / data["previous_revenue"]
) * 100
# Profit margin
if "profit_margin" in metrics and "net_income" in data and "revenue" in data:
if data["revenue"] != 0:
results["profit_margin"] = (data["net_income"] / data["revenue"]) * 100
# ROE (Return on Equity)
if "roe" in metrics and "net_income" in data and "shareholders_equity" in data:
if data["shareholders_equity"] != 0:
results["roe"] = (data["net_income"] / data["shareholders_equity"]) * 100
# Current ratio
if "current_ratio" in metrics and "current_assets" in data and "current_liabilities" in data:
if data["current_liabilities"] != 0:
results["current_ratio"] = data["current_assets"] / data["current_liabilities"]
return results
@registry.register("data_analyzer", "Perform statistical analysis on datasets")
async def analyze_dataset(data: List[Dict[str, Any]],
analysis_type: str = "summary") -> Dict[str, Any]:
"""Analyze datasets with various statistical methods."""
if not data:
return {"error": "Empty dataset provided"}
df = pd.DataFrame(data)
if analysis_type == "summary":
numeric_columns = df.select_dtypes(include=['number']).columns
summary = {}
for col in numeric_columns:
summary[col] = {
"mean": float(df[col].mean()),
"median": float(df[col].median()),
"std": float(df[col].std()),
"min": float(df[col].min()),
"max": float(df[col].max()),
"count": int(df[col].count())
}
return {
"summary_statistics": summary,
"total_records": len(df),
"columns": list(df.columns)
}
elif analysis_type == "trend":
# Simple trend analysis
trends = {}
numeric_columns = df.select_dtypes(include=['number']).columns
for col in numeric_columns:
if len(df) > 1:
# Calculate simple linear trend
x = list(range(len(df)))
y = df[col].values
# Simple correlation with time
correlation = pd.Series(x).corr(pd.Series(y))
trends[col] = {
"trend_direction": "increasing" if correlation > 0.1 else "decreasing" if correlation < -0.1 else "stable",
"correlation_with_time": float(correlation)
}
return {"trends": trends}
return {"error": f"Unknown analysis type: {analysis_type}"}
@registry.register("report_generator", "Generate formatted reports from analysis results")
async def generate_report(data: Dict[str, Any],
template: str = "financial_summary",
output_format: str = "markdown") -> str:
"""Generate formatted reports from analysis data."""
if template == "financial_summary" and output_format == "markdown":
report_content = f"""# Financial Analysis Report
Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
## Executive Summary
"""
# Add metrics if available
if "financial_metrics" in data:
report_content += "\n### Key Financial Metrics\n"
for metric, value in data["financial_metrics"].items():
if isinstance(value, float):
report_content += f"- **{metric.replace('_', ' ').title()}**: {value:.2f}%\n"
else:
report_content += f"- **{metric.replace('_', ' ').title()}**: {value}\n"
# Add analysis results
if "analysis_results" in data:
report_content += "\n### Analysis Results\n"
results = data["analysis_results"]
if "summary_statistics" in results:
report_content += "\n#### Summary Statistics\n"
for column, stats in results["summary_statistics"].items():
report_content += f"\n**{column}:**\n"
report_content += f"- Mean: {stats['mean']:.2f}\n"
report_content += f"- Median: {stats['median']:.2f}\n"
report_content += f"- Standard Deviation: {stats['std']:.2f}\n"
report_content += f"- Range: {stats['min']:.2f} to {stats['max']:.2f}\n"
return report_content
return f"Template '{template}' with format '{output_format}' not implemented"
# Initialize the tool registry
def setup_agent_tools(memory_store) -> ToolRegistry:
registry = ToolRegistry(memory_store)
create_financial_analysis_tools(registry)
return registry
The tool registry automatically tracks performance metrics and applies learned optimizations. This means that over time, the agent gets better at using its tools—adjusting timeouts, parameter values, and retry strategies based on historical success rates.
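The `avg_execution_time` update in `execute_tool` is an incremental (running) mean, which avoids storing every past timing. Isolated, the recurrence looks like this:

```python
def update_running_mean(old_avg: float, new_count: int, new_value: float) -> float:
    """Incremental mean: equivalent to averaging all samples without storing them."""
    return (old_avg * (new_count - 1) + new_value) / new_count

timings = [2.0, 4.0, 6.0]
avg = 0.0
for n, t in enumerate(timings, start=1):
    avg = update_running_mean(avg, n, t)
print(avg)  # 4.0, identical to sum(timings) / len(timings)
```

One caveat worth knowing: the registry divides by `success_count`, so the average reflects only successful runs—timeouts and failures never skew the timing statistics, which is usually what you want when tuning timeouts.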
Let's put it all together by building a complete market research agent that can analyze multiple companies, compare their financial performance, and generate insights.
import asyncio
import json
from datetime import datetime
class MarketResearchAgent(MultiStepAgent):
def __init__(self, llm_client):
memory_store = PersistentMemoryStore("market_research_memory.db")
super().__init__("market_researcher", llm_client, memory_store)
# Set up specialized tools
self.tool_registry = setup_agent_tools(memory_store)
self.planning_engine = PlanningEngine(llm_client, memory_store)
# Register tools with the agent
for name, info in self.tool_registry.tools.items():
self.register_tool(name, info['function'], info['description'])
async def research_companies(self, companies: List[str],
research_focus: str = "financial_performance") -> Dict[str, Any]:
"""Conduct comprehensive research on a list of companies."""
goal = f"Research and analyze {len(companies)} companies focusing on {research_focus}"
context = {
"companies": companies,
"focus": research_focus,
"timestamp": datetime.now().isoformat()
}
# Generate execution plan
plan = await self.planning_engine.generate_initial_plan(
goal, context, self.tools
)
self.current_plan = plan
results = {}
try:
# Execute plan with monitoring and adaptation
# Note: this simple loop skips steps whose dependencies aren't met yet;
# a production coordinator would queue them and retry once dependencies complete.
for step in self.current_plan:
if not self._dependencies_met(step):
continue
step.status = StepStatus.IN_PROGRESS
# Execute the step
tool_result = await self.tool_registry.execute_tool(
step.tool,
step.parameters
)
if tool_result['success']:
step.status = StepStatus.COMPLETED
step.result = tool_result['result']
step.completed_at = datetime.now()
# Store intermediate results
results[step.id] = tool_result['result']
# Check if plan needs adaptation based on results
if await self._should_adapt_plan(step, tool_result):
adapted_plan = await self._adapt_plan(step, tool_result)
self.current_plan.extend(adapted_plan)
else:
step.status = StepStatus.FAILED
step.error = tool_result['error']
# Attempt recovery
recovery_plan = await self.planning_engine.replan_from_failure(
self.current_plan, step, tool_result['error']
)
if recovery_plan:
# Reassigning current_plan does not affect the for-loop already iterating
# over the old list; a production version would restart execution here.
self.current_plan = recovery_plan
self.logger.info(f"Replanned after failure in {step.description}")
# Generate final synthesis
synthesis = await self._synthesize_results(goal, results)
# Store execution memory
execution_time = sum(
(step.completed_at - step.created_at).total_seconds()
for step in self.current_plan
if step.completed_at
)
await self.memory_store.store_execution_memory(
goal, context, self.current_plan, results, execution_time
)
return {
"goal": goal,
"companies_analyzed": companies,
"results": results,
"synthesis": synthesis,
"execution_summary": {
"total_steps": len(self.current_plan),
"successful_steps": len([s for s in self.current_plan if s.status == StepStatus.COMPLETED]),
"failed_steps": len([s for s in self.current_plan if s.status == StepStatus.FAILED]),
"execution_time": execution_time
}
}
except Exception as e:
self.logger.error(f"Agent execution failed: {str(e)}")
return {"error": str(e), "partial_results": results}
def _dependencies_met(self, step: AgentStep) -> bool:
"""Check if all dependencies for a step are satisfied."""
if not step.dependencies:
return True
completed_ids = {s.id for s in self.current_plan if s.status == StepStatus.COMPLETED}
return all(dep_id in completed_ids for dep_id in step.dependencies)
async def _should_adapt_plan(self, completed_step: AgentStep,
result: Dict[str, Any]) -> bool:
"""Determine if the plan should be adapted based on step results."""
# Example: If we discover more companies during research
if (completed_step.tool == "web_scraper" and
"companies" in str(result.get('result', {})).lower()):
return True
# Example: If analysis reveals need for additional data
if (completed_step.tool == "data_analyzer" and
"insufficient_data" in str(result.get('result', {})).lower()):
return True
return False
async def _adapt_plan(self, trigger_step: AgentStep,
result: Dict[str, Any]) -> List[AgentStep]:
"""Generate additional steps based on new information."""
adaptation_context = {
"trigger_step": trigger_step.description,
"result_summary": str(result.get('result', {}))[:500],
"current_plan_status": len([s for s in self.current_plan if s.status == StepStatus.COMPLETED])
}
adaptation_prompt = f"""
PLAN ADAPTATION REQUEST
Current step revealed new information that suggests additional analysis would be valuable.
Trigger step: {trigger_step.description}
Result summary: {adaptation_context['result_summary']}
Available tools: {list(self.tools.keys())}
Generate 1-3 additional steps that would enhance the analysis based on this new information.
Return JSON with the same format as initial planning.
Focus on high-value additions that leverage the new information discovered.
"""
try:
response = await self.llm_client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": adaptation_prompt}],
temperature=0.2
)
adaptation_data = json.loads(response.choices[0].message.content)
additional_steps = []
for i, step_data in enumerate(adaptation_data.get('steps', [])):
step = AgentStep(
id=f"adapted_step_{len(self.current_plan) + i + 1}",
description=step_data['description'],
tool=step_data['tool'],
parameters=step_data['parameters'],
dependencies=step_data.get('dependencies', [])
)
additional_steps.append(step)
return additional_steps
except Exception as e:
self.logger.warning(f"Plan adaptation failed: {str(e)}")
return []
    async def _synthesize_results(self, goal: str,
                                  results: Dict[str, Any]) -> Dict[str, Any]:
        """Generate insights and synthesis from execution results."""
        synthesis_prompt = f"""
        RESULTS SYNTHESIS

        Goal: {goal}

        Execution results:
        {json.dumps(results, indent=2, default=str)}

        Generate a comprehensive synthesis that includes:
        1. Key insights discovered
        2. Comparative analysis (if multiple entities analyzed)
        3. Significant patterns or trends
        4. Recommendations or next steps
        5. Confidence levels for major conclusions

        Return structured JSON with clear sections for each type of insight.
        """

        try:
            response = await self.llm_client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": synthesis_prompt}],
                temperature=0.3
            )
            return json.loads(response.choices[0].message.content)

        except Exception as e:
            return {
                "synthesis_error": str(e),
                "raw_results": results
            }
# Example usage
async def demo_market_research():
    # Mock LLM client for demonstration
    class MockLLMClient:
        class Chat:
            class Completions:
                async def create(self, **kwargs):
                    # Simplified mock response - in practice, use the real OpenAI client
                    class MockResponse:
                        def __init__(self):
                            self.choices = [type('Choice', (), {
                                'message': type('Message', (), {
                                    'content': json.dumps({
                                        "reasoning": "Analyzing companies requires data collection, analysis, and reporting",
                                        "steps": [
                                            {
                                                "description": "Gather financial data for target companies",
                                                "tool": "web_scraper",
                                                "parameters": {
                                                    "url": "https://finance.yahoo.com",
                                                    "timeout": 60
                                                }
                                            },
                                            {
                                                "description": "Calculate key financial metrics",
                                                "tool": "financial_calculator",
                                                "parameters": {
                                                    "metrics": ["revenue_growth", "profit_margin", "roe"]
                                                },
                                                "dependencies": ["step_1"]
                                            },
                                            {
                                                "description": "Generate comparative analysis report",
                                                "tool": "report_generator",
                                                "parameters": {
                                                    "template": "financial_summary"
                                                },
                                                "dependencies": ["step_2"]
                                            }
                                        ]
                                    })
                                })()
                            })()]
                    return MockResponse()

            def __init__(self):
                self.completions = self.Completions()

        def __init__(self):
            self.chat = self.Chat()

    # Create and run the agent
    agent = MarketResearchAgent(MockLLMClient())
    results = await agent.research_companies(
        companies=["Apple", "Microsoft", "Google"],
        research_focus="financial_performance"
    )

    print(json.dumps(results, indent=2, default=str))

# Run the demo
if __name__ == "__main__":
    asyncio.run(demo_market_research())
This complete implementation demonstrates how all the components work together. The agent can adapt its plan mid-execution, learn from each run, and handle complex multi-step workflows that are impractical with traditional single-shot AI applications.
Memory Bloat and Performance Degradation
The most common issue is unbounded memory growth. Without proper cleanup and relevance filtering, your agent's memory can become a performance bottleneck. Implement regular cleanup routines:
async def cleanup_old_memories(self, days_threshold: int = 180):
    """Remove low-value memories older than threshold."""
    # Compare as ISO-formatted strings so no datetime adapter is needed
    cutoff_date = (datetime.now() - timedelta(days=days_threshold)).isoformat()

    conn = sqlite3.connect(self.db_path)
    cursor = conn.cursor()

    # Remove low-success episodes
    cursor.execute('''
        DELETE FROM episodic_memories
        WHERE created_at < ? AND success_rate < 0.3
    ''', (cutoff_date,))

    # Remove low-confidence semantic patterns
    cursor.execute('''
        DELETE FROM semantic_memories
        WHERE last_reinforced < ? AND confidence < 0.5
    ''', (cutoff_date,))

    conn.commit()
    conn.close()
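The same pruning pattern can be exercised end-to-end against an in-memory database. This standalone sketch assumes the `episodic_memories` schema used above; the table layout and thresholds are illustrative:

```python
import sqlite3
from datetime import datetime, timedelta

def prune_memories(conn: sqlite3.Connection, days_threshold: int = 180) -> int:
    """Delete low-success episodes older than the cutoff; return rows removed."""
    # ISO-formatted timestamps compare correctly as strings in SQLite
    cutoff = (datetime.now() - timedelta(days=days_threshold)).isoformat()
    cur = conn.execute(
        "DELETE FROM episodic_memories WHERE created_at < ? AND success_rate < 0.3",
        (cutoff,),
    )
    conn.commit()
    return cur.rowcount
```

In practice you would schedule this alongside the agent (for example, once a day) rather than running it on every request.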
Infinite Planning Loops
When replanning fails repeatedly, agents can get stuck in endless planning cycles. Always implement maximum retry limits and fallback strategies:
async def execute_with_retry_limit(self, max_replans: int = 3):
    replan_count = 0

    while replan_count < max_replans:
        try:
            return await self._execute_plan()
        except PlanFailure as e:
            replan_count += 1
            if replan_count >= max_replans:
                # Switch to simplified fallback plan
                return await self._execute_fallback_plan(e)

            self.current_plan = await self.planning_engine.replan_from_failure(
                self.current_plan, e.failed_step, e.context
            )
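The `PlanFailure` exception used above is not defined elsewhere in the listing; a minimal version that carries the failed step and its context might look like this (attribute names chosen to match the call site):

```python
class PlanFailure(Exception):
    """Raised when a step fails in a way that invalidates the current plan."""

    def __init__(self, failed_step, context=None):
        super().__init__(f"Plan failed at step: {failed_step}")
        self.failed_step = failed_step
        self.context = context or {}
```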
Tool Integration Fragility
External APIs and systems fail regularly. Build resilience into every tool:
async def resilient_tool_execution(self, tool_name: str, params: Dict[str, Any]):
    max_retries = 3
    backoff_seconds = [1, 5, 15]  # Increasing backoff delays

    for attempt in range(max_retries):
        try:
            return await self.tool_registry.execute_tool(tool_name, params)
        except Exception as e:
            if attempt == max_retries - 1:
                # Final attempt failed, try alternative approach
                alternative_result = await self._try_alternative_approach(tool_name, params, e)
                if alternative_result:
                    return alternative_result
                raise e

            # Wait before retry
            await asyncio.sleep(backoff_seconds[attempt])
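The same retry logic can be factored into a standalone helper that wraps any async operation, so every tool doesn't reimplement it. This is a sketch; the exponential delays and jitter range are just reasonable defaults:

```python
import asyncio
import random

async def with_retries(operation, max_retries: int = 3, base_delay: float = 1.0):
    """Retry an async operation with exponential backoff plus jitter."""
    for attempt in range(max_retries):
        try:
            return await operation()
        except Exception:
            if attempt == max_retries - 1:
                raise  # Out of attempts: surface the last error
            # Exponential backoff with a little random jitter
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            await asyncio.sleep(delay)
```

A call site passes a zero-argument coroutine function, e.g. `await with_retries(lambda: fetch_page(url))`, so a fresh coroutine is created on each attempt.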
Context Window Limitations
Large plans and extensive memory can exceed LLM context windows. Implement intelligent context compression:
async def compress_context_for_llm(self, goal: str, plan: List[AgentStep],
                                   memories: List[Dict]) -> str:
    """Compress context to fit within LLM limits while preserving key information."""
    # Prioritize relevant and successful memories
    relevant_memories = sorted(
        memories,
        key=lambda m: (m['relevance_score'], m['success_rate']),
        reverse=True
    )[:3]  # Keep only the top 3

    # Summarize completed steps instead of full details
    completed_summaries = []
    for step in plan:
        if step.status == StepStatus.COMPLETED:
            completed_summaries.append({
                'description': step.description,
                'tool': step.tool,
                'success': True
            })

    # Build compressed context
    compressed_context = {
        'goal': goal,
        'completed_steps': completed_summaries,
        'relevant_past_experiences': [
            {
                'goal': m['goal'],
                'success_rate': m['success_rate']
            } for m in relevant_memories
        ]
    }

    return json.dumps(compressed_context, indent=2)
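If even the compressed context can overflow, a rough character budget (about 4 characters per token is a common heuristic) can cap how many items survive. This helper is a sketch with an illustrative default budget; it assumes items arrive already sorted by priority:

```python
import json

def fit_to_budget(items, budget_chars: int = 8000):
    """Greedily keep items, in priority order, until the serialized size would exceed the budget."""
    kept, used = [], 2  # 2 accounts for the enclosing JSON brackets
    for item in items:
        size = len(json.dumps(item, default=str)) + 1  # +1 for the separator
        if used + size > budget_chars:
            break
        kept.append(item)
        used += size
    return kept
```

Applied to `relevant_memories` before building the compressed context, this keeps the prompt size bounded regardless of how large individual memories grow.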
Multi-step AI agents represent a significant evolution in AI system design, moving beyond simple question-answering to complex workflow automation and decision-making. You now have the foundational components: a planning engine that can adapt based on results, a memory system that learns from experience, and a tool integration framework that handles real-world complexity.
The key insights from this implementation are:
Architecture Matters: Separating planning, execution, and memory concerns makes agents much more maintainable and debuggable than monolithic approaches.
Memory Enables Learning: Agents that remember and learn from past executions become dramatically more effective over time, automatically optimizing parameters and strategies.
Resilience is Critical: Production agents must handle failures gracefully with retry logic, alternative approaches, and degraded operation modes.
Context Management is Essential: As agents become more sophisticated, managing information flow and context size becomes a significant engineering challenge.
For your next steps, consider these advanced topics:
Multi-Agent Coordination: Build systems where multiple specialized agents collaborate on complex tasks, sharing workload and expertise.
Real-Time Learning: Implement online learning systems that update agent behavior during execution, not just between sessions.
Human-in-the-Loop Integration: Design interfaces where humans can intervene, guide, or validate agent decisions at critical points.
Performance Optimization: Explore parallel execution strategies, caching mechanisms, and distributed agent architectures for handling enterprise-scale workloads.
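As a first step toward parallel execution, steps whose dependencies are already satisfied can be dispatched concurrently with asyncio.gather. This sketch uses plain dicts for brevity; the AgentStep objects from earlier would work the same way:

```python
import asyncio

async def run_ready_steps(steps, completed, execute):
    """Concurrently run every step whose dependencies are all completed."""
    ready = [
        s for s in steps
        if s["id"] not in completed
        and all(dep in completed for dep in s.get("dependencies", []))
    ]
    results = await asyncio.gather(*(execute(s) for s in ready))
    for step, result in zip(ready, results):
        completed[step["id"]] = result
    return completed
```

Calling this in a loop until every step is completed yields a simple wave-based scheduler: each pass runs one wave of mutually independent steps.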
The foundation you've built here scales to handle increasingly complex scenarios—from automated business intelligence systems to sophisticated data analysis pipelines that adapt and optimize themselves over time.