
When you're staring at a mountain of PDFs, documentation, and internal knowledge scattered across your organization, the promise of RAG (Retrieval-Augmented Generation) suddenly becomes very real. Instead of having your team dig through hundreds of documents to answer questions, you want to build a system that can intelligently retrieve relevant information and generate comprehensive answers.
But here's the reality: most RAG tutorials show you how to query a few Wikipedia articles or sample datasets. What you actually need is a production-ready pipeline that can handle your messy, real-world documents—the 50-page technical specifications, the poorly formatted meeting notes, the PowerPoint presentations converted to PDF. You need something that works with your actual data, not toy examples.
By the end of this lesson, you'll have built a complete RAG pipeline that ingests your organization's documents, creates searchable embeddings, and answers questions with cited sources. You'll understand not just the "what" but the "why" behind each component, and you'll know how to troubleshoot when things inevitably go wrong.
Prerequisites: You should be comfortable with Python programming and have basic familiarity with machine learning concepts like embeddings. Experience with APIs and databases will help, though we'll explain the specific tools as we go. You don't need deep NLP expertise; we'll build up the concepts as needed.
Before diving into code, let's map out what we're building. A RAG pipeline has three core phases: ingestion, retrieval, and generation. Think of it like a research assistant who first organizes all your documents (ingestion), then finds relevant passages when you ask a question (retrieval), and finally writes a comprehensive answer using those passages (generation).
The ingestion phase is where most production RAG systems succeed or fail. Your documents aren't clean markdown files—they're PDFs with embedded images, Word documents with complex formatting, and spreadsheets with data in unexpected places. We need a robust ingestion pipeline that can handle this reality.
Here's the architecture we'll build:
# Document Ingestion Pipeline
documents → parsing → chunking → embedding → vector_database
# Query Pipeline
question → embedding → similarity_search → context_retrieval → llm_generation → answer
The key insight is that we're not just storing documents—we're storing semantically meaningful chunks that can be retrieved independently. When someone asks "What's our data retention policy for customer emails?", we want to retrieve the specific paragraphs about email retention, not the entire 200-page privacy policy.
Let's start by building the ingestion system. We'll use a combination of libraries that handle different document types gracefully:
import os
from pathlib import Path
from typing import List, Dict, Any, Optional
import hashlib
from dataclasses import dataclass
from datetime import datetime
# Document processing libraries
import pymupdf # for PDFs
from docx import Document # for Word docs
import pandas as pd # for Excel/CSV
from bs4 import BeautifulSoup # for HTML
import tiktoken # for token counting
# Vector database and embeddings
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
@dataclass
class DocumentChunk:
"""Represents a chunk of a document with metadata."""
content: str
source_file: str
chunk_id: str
page_number: Optional[int] = None
section_title: Optional[str] = None
chunk_index: Optional[int] = None
token_count: Optional[int] = None
created_at: Optional[datetime] = None
class DocumentProcessor:
"""Handles parsing of different document types."""
def __init__(self):
self.tokenizer = tiktoken.get_encoding("cl100k_base")
def process_pdf(self, file_path: Path) -> List[Dict[str, Any]]:
"""Extract text from PDF with page information."""
chunks = []
try:
pdf_document = pymupdf.open(file_path)
for page_num, page in enumerate(pdf_document):
text = page.get_text()
# Skip empty pages
if not text.strip():
continue
# Clean up common PDF artifacts
text = self._clean_pdf_text(text)
chunks.append({
'content': text,
'page_number': page_num + 1,
'source_file': str(file_path),
'document_type': 'pdf'
})
pdf_document.close()
except Exception as e:
print(f"Error processing PDF {file_path}: {str(e)}")
return chunks
def process_docx(self, file_path: Path) -> List[Dict[str, Any]]:
"""Extract text from Word documents with structure."""
chunks = []
try:
doc = Document(file_path)
current_section = None
for para in doc.paragraphs:
text = para.text.strip()
if not text:
continue
# Detect if this might be a heading
is_heading = (
    para.style.name.startswith('Heading') or
    (len(text) < 100 and not text.endswith('.'))
)
if is_heading:
current_section = text
else:
chunks.append({
'content': text,
'section_title': current_section,
'source_file': str(file_path),
'document_type': 'docx'
})
except Exception as e:
print(f"Error processing DOCX {file_path}: {str(e)}")
return chunks
def _clean_pdf_text(self, text: str) -> str:
    """Clean common PDF extraction artifacts."""
    import re
    # Remove page headers/footers (simple heuristic) before newlines are collapsed
    lines = text.split('\n')
    if len(lines) > 3:
        # Drop the first and last line if they're very short (likely headers/footers)
        if len(lines[0]) < 50:
            lines = lines[1:]
        if lines and len(lines[-1]) < 50:
            lines = lines[:-1]
    text = ' '.join(lines)
    # Collapse excessive whitespace left over from extraction
    text = re.sub(r'\s+', ' ', text)
    return text.strip()
def count_tokens(self, text: str) -> int:
"""Count tokens in text using tiktoken."""
return len(self.tokenizer.encode(text))
This document processor handles the reality of messy documents. PDFs often have weird spacing and page artifacts, Word documents have complex formatting, and we need to preserve structural information like headings and page numbers for later citation.
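To sanity-check the parser before building anything else, you can run it on a single file and inspect what comes out. A minimal sketch; the file path is just a placeholder for one of your own documents:

```python
from pathlib import Path

processor = DocumentProcessor()

# Hypothetical sample file; point this at one of your own PDFs
sample_pdf = Path("./my_documents/privacy_policy.pdf")
raw_sections = processor.process_pdf(sample_pdf)

print(f"Extracted {len(raw_sections)} non-empty pages")
for section in raw_sections[:3]:
    # Preview the first few pages and their token counts
    preview = section['content'][:150]
    tokens = processor.count_tokens(section['content'])
    print(f"Page {section['page_number']} ({tokens} tokens): {preview}...")
```

If pages come out empty or garbled here, fix extraction first; no amount of chunking or embedding will recover text that was never parsed.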
Now comes the crucial part: breaking documents into chunks that preserve context while staying within token limits. Poor chunking is the #1 reason RAG systems give irrelevant answers. You want chunks that are semantically coherent—not arbitrary 500-character slices that cut sentences in half.
class IntelligentChunker:
"""Chunks documents while preserving semantic meaning."""
def __init__(self, max_tokens: int = 500, overlap_tokens: int = 50):
self.max_tokens = max_tokens
self.overlap_tokens = overlap_tokens
self.tokenizer = tiktoken.get_encoding("cl100k_base")
def chunk_documents(self, raw_chunks: List[Dict[str, Any]]) -> List[DocumentChunk]:
"""Convert raw document chunks into semantic chunks."""
semantic_chunks = []
for doc_chunk in raw_chunks:
# For each document section, create overlapping semantic chunks
section_chunks = self._create_semantic_chunks(
doc_chunk['content'],
doc_chunk
)
semantic_chunks.extend(section_chunks)
return semantic_chunks
def _create_semantic_chunks(self, text: str, metadata: Dict) -> List[DocumentChunk]:
"""Create overlapping chunks that respect sentence boundaries."""
sentences = self._split_into_sentences(text)
chunks = []
current_chunk = []
current_tokens = 0
chunk_index = 0
for sentence in sentences:
sentence_tokens = len(self.tokenizer.encode(sentence))
# If adding this sentence exceeds our limit, finalize current chunk
if current_tokens + sentence_tokens > self.max_tokens and current_chunk:
chunk_content = ' '.join(current_chunk)
chunks.append(DocumentChunk(
content=chunk_content,
source_file=metadata['source_file'],
chunk_id=self._generate_chunk_id(chunk_content, metadata['source_file']),
page_number=metadata.get('page_number'),
section_title=metadata.get('section_title'),
chunk_index=chunk_index,
token_count=current_tokens,
created_at=datetime.now()
))
# Start new chunk with overlap from previous chunk
overlap_sentences = self._get_overlap_sentences(current_chunk)
current_chunk = overlap_sentences
current_tokens = sum(len(self.tokenizer.encode(s)) for s in overlap_sentences)
chunk_index += 1
current_chunk.append(sentence)
current_tokens += sentence_tokens
# Don't forget the final chunk
if current_chunk:
chunk_content = ' '.join(current_chunk)
chunks.append(DocumentChunk(
content=chunk_content,
source_file=metadata['source_file'],
chunk_id=self._generate_chunk_id(chunk_content, metadata['source_file']),
page_number=metadata.get('page_number'),
section_title=metadata.get('section_title'),
chunk_index=chunk_index,
token_count=current_tokens,
created_at=datetime.now()
))
return chunks
def _split_into_sentences(self, text: str) -> List[str]:
"""Split text into sentences using multiple heuristics."""
import re
# First, handle common abbreviations that shouldn't trigger sentence breaks
abbreviations = r'\b(?:Dr|Mr|Mrs|Ms|Prof|Inc|Ltd|Corp|etc|vs|Ph\.D|M\.D|B\.A|M\.A)\.'
text = re.sub(abbreviations, lambda m: m.group().replace('.', '<!DOT!>'), text)
# Split on sentence endings
sentences = re.split(r'[.!?]+\s+', text)
# Restore abbreviation dots
sentences = [s.replace('<!DOT!>', '.') for s in sentences if s.strip()]
return sentences
def _get_overlap_sentences(self, sentences: List[str]) -> List[str]:
"""Get the last few sentences for overlap with next chunk."""
if not sentences:
return []
# Take last 1-2 sentences for overlap, respecting token limit
overlap = []
overlap_tokens = 0
for sentence in reversed(sentences):
sentence_tokens = len(self.tokenizer.encode(sentence))
if overlap_tokens + sentence_tokens <= self.overlap_tokens:
overlap.insert(0, sentence)
overlap_tokens += sentence_tokens
else:
break
return overlap
def _generate_chunk_id(self, content: str, source_file: str) -> str:
"""Generate a unique ID for this chunk."""
content_hash = hashlib.md5(content.encode()).hexdigest()[:8]
file_name = Path(source_file).stem
return f"{file_name}_{content_hash}"
The overlap strategy is critical here. When someone asks about "quarterly revenue projections," the answer might span multiple chunks. By overlapping chunks with 1-2 sentences, we ensure that context doesn't get lost at chunk boundaries.
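You can see the overlap in action by chunking a single long section and comparing the tail of one chunk with the head of the next. A quick sketch with fabricated text, assuming the raw-chunk dictionary format produced by DocumentProcessor:

```python
chunker = IntelligentChunker(max_tokens=100, overlap_tokens=30)

# Fabricated section purely for illustration
base = ("Quarterly revenue grew 12 percent. Growth was driven by enterprise deals. "
        "Projections for next quarter assume similar demand. Hiring will stay flat. ")
raw_section = {'content': base * 5, 'source_file': 'quarterly_report.txt'}

chunks = chunker.chunk_documents([raw_section])
for chunk in chunks:
    print(f"Chunk {chunk.chunk_index} ({chunk.token_count} tokens)")
    print(f"  starts: {chunk.content[:60]}...")
    print(f"  ends:   ...{chunk.content[-60:]}")
```

The last sentence or two of each chunk should reappear at the start of the next one, which is exactly what keeps answers intact across chunk boundaries.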
Now we need to store our chunks in a way that enables fast semantic search. We'll use ChromaDB, which handles the complexities of vector storage and similarity search:
class RAGVectorStore:
"""Manages document chunks in a vector database with semantic search."""
def __init__(self, persist_directory: str = "./chroma_db"):
# Initialize ChromaDB with persistence
self.client = chromadb.PersistentClient(path=persist_directory)
# Create or get collection
self.collection = self.client.get_or_create_collection(
name="document_chunks",
metadata={"hnsw:space": "cosine"} # Use cosine similarity
)
# Initialize embedding model
self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
print(f"Vector store initialized. Current chunk count: {self.collection.count()}")
def add_chunks(self, chunks: List[DocumentChunk]) -> None:
"""Add document chunks to the vector store."""
if not chunks:
return
# Prepare data for ChromaDB
documents = []
metadatas = []
ids = []
embeddings = []
print(f"Processing {len(chunks)} chunks for embedding...")
for chunk in chunks:
# Skip chunks that are too short to be meaningful
if len(chunk.content.strip()) < 50:
continue
# Generate embedding
embedding = self.embedding_model.encode(chunk.content).tolist()
documents.append(chunk.content)
embeddings.append(embedding)
ids.append(chunk.chunk_id)
# Store metadata
metadata = {
'source_file': chunk.source_file,
'page_number': chunk.page_number or 0,
'section_title': chunk.section_title or '',
'chunk_index': chunk.chunk_index or 0,
'token_count': chunk.token_count or 0,
'created_at': chunk.created_at.isoformat() if chunk.created_at else ''
}
metadatas.append(metadata)
# Add to ChromaDB
if documents:
self.collection.add(
documents=documents,
embeddings=embeddings,
metadatas=metadatas,
ids=ids
)
print(f"Added {len(documents)} chunks to vector store")
def search_similar(self, query: str, n_results: int = 5) -> List[Dict]:
"""Search for similar document chunks."""
# Generate query embedding
query_embedding = self.embedding_model.encode(query).tolist()
# Search ChromaDB
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=n_results,
include=['documents', 'metadatas', 'distances']
)
# Format results
formatted_results = []
for i in range(len(results['documents'][0])):
formatted_results.append({
'content': results['documents'][0][i],
'metadata': results['metadatas'][0][i],
'similarity_score': 1 - results['distances'][0][i], # Convert distance to similarity
'chunk_id': results['ids'][0][i] if 'ids' in results else None
})
return formatted_results
def get_collection_stats(self) -> Dict:
"""Get statistics about the document collection."""
count = self.collection.count()
# Get sample of documents to analyze
sample_size = min(100, count)
sample = self.collection.get(limit=sample_size, include=['metadatas'])
if not sample['metadatas']:
return {'total_chunks': count}
# Analyze source files
source_files = [meta['source_file'] for meta in sample['metadatas']]
unique_files = len(set(source_files))
return {
'total_chunks': count,
'unique_documents': unique_files,
'average_chunks_per_doc': count / unique_files if unique_files > 0 else 0
}
The embedding model choice matters here. all-MiniLM-L6-v2 is fast and good for general text, but if you're working in a specialized domain (legal, medical, technical), consider fine-tuning or using domain-specific models.
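If you want to experiment with a different embedding model, the only change is the name passed to SentenceTransformer. A small comparison sketch; the alternative model name below is just another general-purpose model from the sentence-transformers catalogue, not a recommendation for your domain, and the query/passage pair is made up:

```python
from sentence_transformers import SentenceTransformer, util

# Default model used above vs. a larger general-purpose alternative
default_model = SentenceTransformer('all-MiniLM-L6-v2')
alt_model = SentenceTransformer('all-mpnet-base-v2')

query = "data retention policy for customer emails"
passage = "Customer emails are retained for a fixed period before deletion."

for name, model in [('MiniLM', default_model), ('MPNet', alt_model)]:
    q_emb = model.encode(query, convert_to_tensor=True)
    p_emb = model.encode(passage, convert_to_tensor=True)
    score = util.cos_sim(q_emb, p_emb).item()
    print(f"{name}: cosine similarity {score:.3f}")
```

Running a handful of representative query/passage pairs like this is a cheap way to compare models before re-embedding your whole corpus.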
With our documents indexed, we need a query pipeline that retrieves relevant chunks and generates comprehensive answers:
from openai import OpenAI
from typing import Optional
import json
class RAGQueryEngine:
"""Handles queries against the RAG system."""
def __init__(self, vector_store: RAGVectorStore, openai_api_key: str):
self.vector_store = vector_store
self.client = OpenAI(api_key=openai_api_key)
def query(self, question: str, n_contexts: int = 5, model: str = "gpt-3.5-turbo") -> Dict:
"""Answer a question using retrieved context."""
# Step 1: Retrieve relevant contexts
print(f"Searching for contexts related to: {question}")
contexts = self.vector_store.search_similar(question, n_results=n_contexts)
if not contexts:
return {
'answer': "I couldn't find any relevant information in the documents to answer your question.",
'sources': [],
'confidence': 0.0
}
# Step 2: Filter contexts by relevance threshold
relevant_contexts = [ctx for ctx in contexts if ctx['similarity_score'] > 0.3]
if not relevant_contexts:
return {
'answer': "I couldn't find sufficiently relevant information to answer your question confidently.",
'sources': [],
'confidence': 0.0
}
# Step 3: Generate answer using retrieved contexts
answer_data = self._generate_answer(question, relevant_contexts, model)
# Step 4: Add source information
answer_data['sources'] = self._format_sources(relevant_contexts)
answer_data['confidence'] = self._calculate_confidence(relevant_contexts)
return answer_data
def _generate_answer(self, question: str, contexts: List[Dict], model: str) -> Dict:
"""Generate an answer using OpenAI with retrieved contexts."""
# Prepare context string
context_str = self._format_contexts_for_prompt(contexts)
# Create the prompt
system_prompt = """You are a helpful assistant that answers questions based on provided documents.
Rules:
1. Answer based ONLY on the information provided in the contexts
2. If the contexts don't contain enough information to answer fully, say so
3. Cite specific information by referencing the source (e.g., "According to the Privacy Policy document...")
4. Be concise but comprehensive
5. If you're uncertain about any part of the answer, express that uncertainty
Do not make up information that isn't in the provided contexts."""
user_prompt = f"""Based on the following document excerpts, please answer this question: {question}
Document excerpts:
{context_str}
Question: {question}
Answer:"""
try:
response = self.client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.1, # Low temperature for factual accuracy
max_tokens=500
)
answer = response.choices[0].message.content.strip()
return {
'answer': answer,
'model_used': model,
'prompt_tokens': response.usage.prompt_tokens,
'completion_tokens': response.usage.completion_tokens
}
except Exception as e:
print(f"Error generating answer: {str(e)}")
return {
'answer': "I encountered an error while generating an answer. Please try again.",
'error': str(e)
}
def _format_contexts_for_prompt(self, contexts: List[Dict]) -> str:
"""Format retrieved contexts for the LLM prompt."""
formatted_contexts = []
for i, ctx in enumerate(contexts):
source_info = self._get_source_description(ctx['metadata'])
context_block = f"""--- Context {i+1} ---
Source: {source_info}
Content: {ctx['content']}
Relevance Score: {ctx['similarity_score']:.2f}
"""
formatted_contexts.append(context_block)
return '\n'.join(formatted_contexts)
def _get_source_description(self, metadata: Dict) -> str:
"""Create a human-readable source description."""
source_file = Path(metadata['source_file']).name
parts = [source_file]
if metadata.get('page_number'):
parts.append(f"page {metadata['page_number']}")
if metadata.get('section_title'):
parts.append(f"section '{metadata['section_title']}'")
return ', '.join(parts)
def _format_sources(self, contexts: List[Dict]) -> List[Dict]:
"""Format source information for the response."""
sources = []
for ctx in contexts:
source = {
'file': Path(ctx['metadata']['source_file']).name,
'page': ctx['metadata'].get('page_number'),
'section': ctx['metadata'].get('section_title'),
'relevance_score': round(ctx['similarity_score'], 2),
'excerpt': ctx['content'][:200] + "..." if len(ctx['content']) > 200 else ctx['content']
}
sources.append(source)
return sources
def _calculate_confidence(self, contexts: List[Dict]) -> float:
"""Calculate confidence score based on context relevance."""
if not contexts:
return 0.0
# Average similarity score, weighted by position (earlier results are more important)
weighted_scores = []
for i, ctx in enumerate(contexts):
weight = 1.0 / (i + 1) # Decreasing weight for later results
weighted_scores.append(ctx['similarity_score'] * weight)
confidence = sum(weighted_scores) / sum(1.0 / (i + 1) for i in range(len(contexts)))
return round(confidence, 2)
This query engine does several important things: it filters out low-relevance results, provides source attribution, and calculates a confidence score. The confidence score helps users understand how reliable the answer might be.
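Here's roughly what consuming the result looks like, assuming the vector store already contains ingested chunks and you have an OpenAI API key (the key below is a placeholder):

```python
vector_store = RAGVectorStore("./chroma_db")
engine = RAGQueryEngine(vector_store, openai_api_key="your-openai-api-key-here")

result = engine.query("What's our data retention policy for customer emails?")

print(result['answer'])
print(f"Confidence: {result['confidence']}")
for source in result['sources']:
    print(f"- {source['file']} (page {source['page']}, relevance {source['relevance_score']})")
```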
Now let's put it all together into a complete system you can use with your own documents. Create this main pipeline class:
class DocumentRAGPipeline:
"""Complete RAG pipeline for your documents."""
def __init__(self, openai_api_key: str, persist_directory: str = "./rag_system"):
self.persist_directory = Path(persist_directory)
self.persist_directory.mkdir(exist_ok=True)
# Initialize components
self.processor = DocumentProcessor()
self.chunker = IntelligentChunker(max_tokens=400, overlap_tokens=50)
self.vector_store = RAGVectorStore(str(self.persist_directory / "vector_db"))
self.query_engine = RAGQueryEngine(self.vector_store, openai_api_key)
print("RAG Pipeline initialized successfully")
def ingest_documents(self, document_directory: str) -> None:
"""Ingest all supported documents from a directory."""
doc_path = Path(document_directory)
if not doc_path.exists():
print(f"Directory {document_directory} does not exist")
return
# Find supported files
supported_extensions = {'.pdf', '.docx', '.txt'}
files_to_process = []
for ext in supported_extensions:
files_to_process.extend(doc_path.glob(f"**/*{ext}"))
print(f"Found {len(files_to_process)} documents to process")
all_chunks = []
for file_path in files_to_process:
print(f"Processing: {file_path.name}")
try:
# Process based on file type
if file_path.suffix.lower() == '.pdf':
raw_chunks = self.processor.process_pdf(file_path)
elif file_path.suffix.lower() == '.docx':
raw_chunks = self.processor.process_docx(file_path)
elif file_path.suffix.lower() == '.txt':
# Simple text file processing
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
raw_chunks = [{
'content': content,
'source_file': str(file_path),
'document_type': 'txt'
}]
else:
continue
# Chunk the document
if raw_chunks:
chunks = self.chunker.chunk_documents(raw_chunks)
all_chunks.extend(chunks)
print(f" Created {len(chunks)} chunks")
except Exception as e:
print(f" Error processing {file_path}: {str(e)}")
# Add all chunks to vector store
if all_chunks:
self.vector_store.add_chunks(all_chunks)
print(f"\nIngestion complete! Total chunks: {len(all_chunks)}")
else:
print("No chunks were created. Check your documents and try again.")
def ask_question(self, question: str, verbose: bool = True) -> Dict:
"""Ask a question and get an answer with sources."""
if verbose:
print(f"\nQuestion: {question}")
print("Searching documents...")
result = self.query_engine.query(question)
if verbose:
print(f"\nAnswer: {result['answer']}")
print(f"Confidence: {result['confidence']}")
if result['sources']:
print(f"\nSources ({len(result['sources'])}):")
for i, source in enumerate(result['sources'], 1):
print(f"{i}. {source['file']}")
if source['page']:
print(f" Page {source['page']}")
if source['section']:
print(f" Section: {source['section']}")
print(f" Relevance: {source['relevance_score']}")
print()
return result
def get_system_stats(self) -> Dict:
"""Get statistics about the RAG system."""
stats = self.vector_store.get_collection_stats()
return stats
# Usage example
if __name__ == "__main__":
# Initialize the pipeline
rag = DocumentRAGPipeline(
openai_api_key="your-openai-api-key-here",
persist_directory="./my_rag_system"
)
# Ingest your documents
rag.ingest_documents("./my_documents")
# Ask questions
rag.ask_question("What is our company's data retention policy?")
rag.ask_question("How do we handle customer complaints?")
rag.ask_question("What are the technical requirements for our API?")
# Get system statistics
stats = rag.get_system_stats()
print(f"\nSystem Stats: {stats}")
To test this with your own documents:
1. Create a folder called my_documents and add some PDFs, Word docs, or text files
2. Replace the placeholder OpenAI API key in the usage example above
3. Run the script to ingest the documents and start asking questions
Tip: Start with a small set of documents (5-10 files) to test the system, then scale up. This makes debugging much easier.
Problem: Answers are irrelevant or off-topic
This usually means your chunking strategy isn't preserving enough context. Try increasing max_tokens and overlap_tokens on the IntelligentChunker, and retrieving more contexts per query (n_contexts).
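For example, you could rebuild the index with larger, more overlapping chunks and compare answers on a few known questions. A sketch that reuses the pipeline class defined above and assumes you can afford to re-ingest into a fresh directory:

```python
# Re-ingest with larger chunks and more overlap to preserve context
rag = DocumentRAGPipeline(openai_api_key="your-openai-api-key-here",
                          persist_directory="./rag_system_large_chunks")
rag.chunker = IntelligentChunker(max_tokens=800, overlap_tokens=100)
rag.ingest_documents("./my_documents")

# Retrieve more contexts per question as well
result = rag.query_engine.query("What is our data retention policy?", n_contexts=8)
print(result['answer'])
```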
Problem: The system can't find information you know is in the documents
Check your document processing pipeline:
# Debug by examining what chunks were created
chunks = rag.vector_store.collection.get(limit=10, include=['documents', 'metadatas'])
for i, chunk in enumerate(chunks['documents']):
print(f"Chunk {i}: {chunk[:200]}...")
print(f"Metadata: {chunks['metadatas'][i]}")
print()
Common issues:
- Text extraction failing or producing garbled output for some PDFs (try an alternative parser such as pdfplumber)
- Scanned or image-based PDFs with no extractable text, which need OCR (for example pytesseract)
Problem: Slow query performance
Vector search should be fast, but if you're seeing slowdowns, check that you aren't re-initializing the embedding model or the ChromaDB client on every query, and reduce n_results if you're retrieving more contexts than you actually use.
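To confirm whether retrieval or generation is the slow part, time each stage separately. A minimal sketch using the pipeline instance from above:

```python
import time

question = "How do we handle customer complaints?"

# Time the vector search on its own
start = time.time()
contexts = rag.vector_store.search_similar(question, n_results=5)
print(f"Vector search: {time.time() - start:.2f}s")

# Time the full query, including the LLM call
start = time.time()
answer = rag.query_engine.query(question)
print(f"Full query (search + LLM): {time.time() - start:.2f}s")
```

In most setups the LLM call dominates; if the search itself is slow, the problem is usually repeated model loading rather than ChromaDB.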
Problem: High costs from OpenAI API
Monitor your token usage and optimize:
# Track API costs
def calculate_cost(prompt_tokens, completion_tokens, model="gpt-3.5-turbo"):
if model == "gpt-3.5-turbo":
prompt_cost = prompt_tokens * 0.001 / 1000 # $0.001 per 1K tokens
completion_cost = completion_tokens * 0.002 / 1000 # $0.002 per 1K tokens
return prompt_cost + completion_cost
return 0
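Since _generate_answer already returns token counts, you can log an approximate cost per question (the prices in calculate_cost are illustrative and change over time):

```python
result = rag.query_engine.query("What is our data retention policy?")
if 'prompt_tokens' in result:
    # Rough per-query cost based on the token counts returned by the API
    cost = calculate_cost(result['prompt_tokens'], result['completion_tokens'])
    print(f"Approximate cost for this query: ${cost:.4f}")
```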
Optimization strategies:
gpt-3.5-turbo instead of gpt-4 for most queriesOnce your system is running, you need to evaluate how well it's performing. Here's a framework for systematic evaluation:
class RAGEvaluator:
"""Evaluates RAG system performance."""
def __init__(self, rag_pipeline: DocumentRAGPipeline):
self.rag = rag_pipeline
self.test_questions = []
self.results = []
def create_test_set(self, questions_and_expected: List[Dict]) -> None:
"""Create a test set with questions and expected answer elements."""
self.test_questions = questions_and_expected
def evaluate_retrieval(self, question: str, expected_sources: List[str]) -> Dict:
"""Evaluate how well the system retrieves relevant documents."""
contexts = self.rag.vector_store.search_similar(question, n_results=5)
retrieved_files = {Path(ctx['metadata']['source_file']).name
for ctx in contexts}
expected_files = set(expected_sources)
# Calculate retrieval metrics
relevant_retrieved = len(retrieved_files.intersection(expected_files))
precision = relevant_retrieved / len(retrieved_files) if retrieved_files else 0
recall = relevant_retrieved / len(expected_files) if expected_files else 0
return {
'precision': precision,
'recall': recall,
'f1': 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0,
'retrieved_sources': list(retrieved_files),
'expected_sources': list(expected_files)
}
def run_evaluation(self) -> Dict:
"""Run complete evaluation on test set."""
if not self.test_questions:
print("No test questions defined. Use create_test_set() first.")
return {}
results = {
'retrieval_metrics': [],
'answer_quality': [],
'response_times': []
}
for test_case in self.test_questions:
question = test_case['question']
expected_sources = test_case.get('expected_sources', [])
# Time the query
import time
start_time = time.time()
answer_data = self.rag.ask_question(question, verbose=False)
response_time = time.time() - start_time
# Evaluate retrieval
if expected_sources:
retrieval_metrics = self.evaluate_retrieval(question, expected_sources)
results['retrieval_metrics'].append(retrieval_metrics)
# Store results
results['response_times'].append(response_time)
results['answer_quality'].append({
'question': question,
'answer': answer_data['answer'],
'confidence': answer_data['confidence'],
'sources_count': len(answer_data['sources'])
})
# Calculate averages
if results['retrieval_metrics']:
avg_precision = sum(r['precision'] for r in results['retrieval_metrics']) / len(results['retrieval_metrics'])
avg_recall = sum(r['recall'] for r in results['retrieval_metrics']) / len(results['retrieval_metrics'])
avg_f1 = sum(r['f1'] for r in results['retrieval_metrics']) / len(results['retrieval_metrics'])
results['average_metrics'] = {
'precision': avg_precision,
'recall': avg_recall,
'f1': avg_f1,
'avg_response_time': sum(results['response_times']) / len(results['response_times'])
}
return results
# Example usage
evaluator = RAGEvaluator(rag)
# Define test cases
test_cases = [
{
'question': 'What is our data retention policy?',
'expected_sources': ['privacy_policy.pdf', 'data_governance.docx']
},
{
'question': 'How do we handle API rate limiting?',
'expected_sources': ['api_documentation.pdf']
}
]
evaluator.create_test_set(test_cases)
evaluation_results = evaluator.run_evaluation()
This evaluation framework helps you systematically improve your RAG system by identifying where retrieval fails and tracking performance over time.
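For example, after running the evaluation you might print a compact summary of the averaged metrics. A small sketch against the results structure returned by run_evaluation():

```python
summary = evaluation_results.get('average_metrics', {})
if summary:
    print(f"Precision: {summary['precision']:.2f}")
    print(f"Recall:    {summary['recall']:.2f}")
    print(f"F1:        {summary['f1']:.2f}")
    print(f"Avg response time: {summary['avg_response_time']:.1f}s")
else:
    print("No retrieval metrics available; add expected_sources to your test cases.")
```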
You've now built a production-ready RAG pipeline that can handle real-world documents and answer questions with proper source attribution. Your system includes document ingestion for multiple formats, intelligent chunking that preserves context, semantic search with vector databases, and a query pipeline that generates accurate answers.
The key insights from this implementation:
- Chunking quality drives answer quality, and overlapping chunks keep context from being lost at boundaries
- Filtering retrieved contexts by relevance and reporting a confidence score make answers more trustworthy
- Source attribution lets users verify every answer against the original documents
- Systematic evaluation is the only way to know whether retrieval is actually improving over time
Next steps to enhance your RAG system could include domain-specific embedding models, OCR support for scanned documents, and an evaluation test set that grows alongside your document collection.
Your RAG pipeline is now ready to handle your organization's documents and provide intelligent, source-backed answers to user questions. The foundation you've built can scale from hundreds to thousands of documents with the right infrastructure choices.