
Your legal team just dumped 500 regulatory compliance documents on your desk. Your boss needs answers to complex questions spanning multiple documents by tomorrow morning. Traditional search falls short—you need something that understands context, can synthesize information across documents, and handles nuanced queries like "What are the reporting requirements for financial institutions with assets over $10 billion that operate in multiple jurisdictions?"
This is where document Q&A systems powered by embeddings transform impossible tasks into routine queries. By the end of this lesson, you'll build a production-ready system that can ingest any document corpus, understand semantic relationships between concepts, and provide accurate answers with proper source attribution.
What you'll learn: how to build a document extraction and chunking pipeline, generate and index embeddings, retrieve and rerank relevant chunks, synthesize grounded answers with source attribution, and harden the system for production with caching, rate limiting, and evaluation.
Prerequisites: You should have experience with Python, basic familiarity with transformer models and embeddings, and have worked with APIs. Knowledge of vector databases is helpful but not required—we'll cover the fundamentals as we build.
Document Q&A systems follow a two-phase pattern: an offline indexing phase and an online retrieval phase. Understanding this separation is crucial for building systems that scale.
During indexing, we transform unstructured documents into a queryable knowledge base. Documents get chunked into semantically coherent pieces, each chunk gets converted into a dense vector embedding, and these embeddings get stored in a vector database with metadata linking back to source documents.
During retrieval, user questions follow the same embedding process. We use vector similarity search to find the most relevant chunks, then feed both the question and retrieved context to a language model that synthesizes a final answer.
This architecture solves three critical problems: it makes semantic search possible (finding relevant information even when exact keywords don't match), it provides controllable context (you decide what information the LLM sees), and it enables source attribution (answers can cite specific documents and page numbers).
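The two phases can be sketched end to end in a few lines. This toy example uses a bag-of-words "embedding" purely to make the shape of the pipeline concrete—a real system swaps in model embeddings and a vector database, as we build below:

```python
import numpy as np

def embed(text, vocab):
    """Toy bag-of-words 'embedding' standing in for a real embedding model."""
    vec = np.zeros(len(vocab))
    for word in text.lower().split():
        if word in vocab:
            vec[vocab[word]] += 1.0
    norm = np.linalg.norm(vec)
    return vec / norm if norm else vec

# Offline indexing phase: chunk documents, embed each chunk, store the vectors
chunks = ["penalties for late filing", "quarterly reporting requirements"]
vocab = {w: i for i, w in enumerate(sorted({w for c in chunks for w in c.split()}))}
index = np.stack([embed(c, vocab) for c in chunks])

# Online retrieval phase: embed the query, find the nearest chunk by similarity
query_vec = embed("reporting requirements", vocab)
best = int(np.argmax(index @ query_vec))
print(chunks[best])  # → quarterly reporting requirements
```

Everything that follows replaces the toy pieces with production-grade ones while keeping this same two-phase shape.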
Let's build this step by step, starting with the document processing pipeline.
Real-world documents come in messy formats—PDFs with inconsistent text extraction, Word documents with embedded images, HTML with navigation cruft. Your preprocessing pipeline needs to handle this chaos while preserving semantic structure.
import os
from pathlib import Path
from typing import List, Dict, Any
import PyPDF2
import docx
from bs4 import BeautifulSoup
import tiktoken
from dataclasses import dataclass
@dataclass
class DocumentChunk:
content: str
metadata: Dict[str, Any]
chunk_id: str
class DocumentProcessor:
def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.tokenizer = tiktoken.encoding_for_model("text-embedding-ada-002")
def extract_text(self, file_path: Path) -> Dict[str, Any]:
"""Extract text and metadata from various file formats."""
suffix = file_path.suffix.lower()
if suffix == '.pdf':
return self._extract_from_pdf(file_path)
elif suffix == '.docx':
return self._extract_from_docx(file_path)
elif suffix == '.html':
return self._extract_from_html(file_path)
elif suffix == '.txt':
return self._extract_from_text(file_path)
else:
raise ValueError(f"Unsupported file type: {suffix}")
def _extract_from_pdf(self, file_path: Path) -> Dict[str, Any]:
"""Extract text from PDF, preserving page structure."""
text_by_page = []
metadata = {"source": str(file_path), "type": "pdf", "pages": []}
with open(file_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
for page_num, page in enumerate(pdf_reader.pages):
page_text = page.extract_text().strip()
if page_text: # Skip empty pages
text_by_page.append(page_text)
metadata["pages"].append({
"page_num": page_num + 1,
"start_char": sum(len(p) for p in text_by_page[:-1]),
"end_char": sum(len(p) for p in text_by_page)
})
return {
"content": "\n\n".join(text_by_page),
"metadata": metadata
}
def _extract_from_docx(self, file_path: Path) -> Dict[str, Any]:
"""Extract text from Word documents."""
doc = docx.Document(file_path)
paragraphs = []
for paragraph in doc.paragraphs:
if paragraph.text.strip():
paragraphs.append(paragraph.text)
content = "\n\n".join(paragraphs)
metadata = {
"source": str(file_path),
"type": "docx",
"paragraph_count": len(paragraphs)
}
return {"content": content, "metadata": metadata}
Notice how we preserve structural information during extraction. PDF pages, Word paragraphs, and HTML sections all become part of the metadata. This proves crucial for source attribution—users want to know not just which document contained an answer, but where exactly within that document.
The real challenge lies in chunking. Naive approaches split on sentence boundaries or fixed character counts, but this destroys semantic coherence. A paragraph about "quarterly reporting requirements" might get split, with half the context landing in one chunk and half in another.
def chunk_document(self, document: Dict[str, Any]) -> List[DocumentChunk]:
"""Create semantically coherent chunks with proper overlap."""
content = document["content"]
base_metadata = document["metadata"]
# First, split into natural sections (paragraphs, etc.)
sections = self._split_into_sections(content)
chunks = []
current_chunk = ""
current_tokens = 0
chunk_counter = 0
for section in sections:
section_tokens = len(self.tokenizer.encode(section))
# If this section alone exceeds chunk size, split it further
if section_tokens > self.chunk_size:
# Save current chunk if it exists
if current_chunk:
chunks.append(self._create_chunk(
current_chunk, base_metadata, chunk_counter
))
chunk_counter += 1
# Split the oversized section
subsection_chunks = self._split_oversized_section(
section, base_metadata, chunk_counter
)
chunks.extend(subsection_chunks)
chunk_counter += len(subsection_chunks)
current_chunk = ""
current_tokens = 0
# If adding this section would exceed chunk size
elif current_tokens + section_tokens > self.chunk_size:
# Save current chunk
if current_chunk:
chunks.append(self._create_chunk(
current_chunk, base_metadata, chunk_counter
))
chunk_counter += 1
# Start new chunk with overlap from previous
overlap_text = self._get_overlap_text(current_chunk)
current_chunk = overlap_text + "\n\n" + section if overlap_text else section
current_tokens = len(self.tokenizer.encode(current_chunk))
else:
# Add section to current chunk
if current_chunk:
current_chunk += "\n\n" + section
else:
current_chunk = section
current_tokens += section_tokens
# Handle final chunk
if current_chunk:
chunks.append(self._create_chunk(
current_chunk, base_metadata, chunk_counter
))
return chunks
def _split_into_sections(self, text: str) -> List[str]:
"""Split text into natural sections, preserving semantic units."""
# Look for double line breaks (paragraph separators)
sections = []
# Split on double newlines first
paragraphs = text.split('\n\n')
for para in paragraphs:
para = para.strip()
if not para:
continue
# Check if paragraph is too long
if len(self.tokenizer.encode(para)) > self.chunk_size // 2:
# Split long paragraphs on sentence boundaries
sentences = self._split_sentences(para)
sections.extend(sentences)
else:
sections.append(para)
return sections
def _get_overlap_text(self, text: str) -> str:
"""Extract overlap text from end of current chunk."""
tokens = self.tokenizer.encode(text)
if len(tokens) <= self.chunk_overlap:
return text
overlap_tokens = tokens[-self.chunk_overlap:]
return self.tokenizer.decode(overlap_tokens)
This chunking strategy balances several competing concerns. We want chunks large enough to provide meaningful context but small enough that embeddings remain focused. We preserve semantic boundaries while ensuring reasonable overlap so related information doesn't get isolated.
With clean, chunked documents, we need to convert them into searchable vector representations. The choice of embedding model dramatically affects retrieval quality—different models excel at different types of content and query patterns.
For general document Q&A, OpenAI's text-embedding-ada-002 offers excellent performance across domains. For specialized fields like legal or medical documents, fine-tuned models often perform better, but require significantly more infrastructure.
import openai
import numpy as np
from typing import List, Tuple
import asyncio
import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential
class EmbeddingService:
def __init__(self, model: str = "text-embedding-ada-002", batch_size: int = 100):
self.model = model
self.batch_size = batch_size
self.client = openai.OpenAI()
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
def get_embedding(self, text: str) -> List[float]:
"""Get embedding for a single text."""
response = self.client.embeddings.create(
input=text,
model=self.model
)
return response.data[0].embedding
async def get_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
"""Get embeddings for multiple texts efficiently."""
# Process in batches to respect rate limits
all_embeddings = []
for i in range(0, len(texts), self.batch_size):
batch = texts[i:i + self.batch_size]
try:
response = await self._async_embed_batch(batch)
embeddings = [item.embedding for item in response.data]
all_embeddings.extend(embeddings)
except Exception as e:
print(f"Batch {i//self.batch_size} failed: {e}")
# Fallback to individual embeddings
for text in batch:
try:
embedding = self.get_embedding(text)
all_embeddings.append(embedding)
except Exception:
# Use a zero vector as a last-resort fallback
all_embeddings.append([0.0] * 1536)  # ada-002 embedding dimension
return all_embeddings
async def _async_embed_batch(self, texts: List[str]):
"""Async wrapper for batch embedding requests."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.client.embeddings.create(input=texts, model=self.model)
)
The retry logic handles transient API failures, while batching maximizes throughput within rate limits. For production systems, you'll want more sophisticated error handling and potentially caching frequently requested embeddings.
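One lightweight form of that caching is an in-memory layer keyed by a hash of the text, wrapped around whatever embedding call you use. A sketch—the injected `embed_fn` is an assumption; in practice it would be `EmbeddingService.get_embedding`:

```python
import hashlib
from typing import Callable, Dict, List

class CachedEmbedder:
    """In-memory cache around any embedding function, keyed by a hash of the text."""

    def __init__(self, embed_fn: Callable[[str], List[float]]):
        self.embed_fn = embed_fn  # e.g. EmbeddingService.get_embedding (assumption)
        self.cache: Dict[str, List[float]] = {}
        self.hits = 0
        self.misses = 0

    def get_embedding(self, text: str) -> List[float]:
        key = hashlib.sha256(text.encode("utf-8")).hexdigest()
        if key in self.cache:
            self.hits += 1
            return self.cache[key]
        self.misses += 1
        embedding = self.embed_fn(text)
        self.cache[key] = embedding
        return embedding
```

For a multi-process deployment you'd back this with Redis and a TTL instead of a dict, but the interface stays the same.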
Vector similarity search finds chunks semantically related to a query, but the choice of similarity metric and search algorithm affects both accuracy and performance.
from typing import Optional, Tuple
import faiss
import pickle
class VectorIndex:
def __init__(self, dimension: int = 1536, index_type: str = "flat"):
self.dimension = dimension
self.index_type = index_type
self.index = None
self.chunk_metadata = []
def build_index(self, embeddings: np.ndarray, metadata: List[Dict]):
"""Build FAISS index from embeddings."""
embeddings = np.array(embeddings, dtype=np.float32)
if self.index_type == "flat":
# Exact search, slower but most accurate
self.index = faiss.IndexFlatIP(self.dimension) # Inner product
elif self.index_type == "ivf":
# Approximate search with inverted file index
nlist = max(1, min(100, len(embeddings) // 10))  # Number of clusters; at least 1 for tiny corpora
quantizer = faiss.IndexFlatIP(self.dimension)
self.index = faiss.IndexIVFFlat(quantizer, self.dimension, nlist)
# Train the index
self.index.train(embeddings)
elif self.index_type == "hnsw":
# Hierarchical navigable small world graph
m = 16 # Number of bi-directional links for each node
self.index = faiss.IndexHNSWFlat(self.dimension, m)
self.index.hnsw.efConstruction = 200 # Controls index time/accuracy
# Add vectors to index
self.index.add(embeddings)
self.chunk_metadata = metadata
print(f"Built {self.index_type} index with {len(embeddings)} vectors")
def search(self, query_embedding: List[float], k: int = 5,
score_threshold: float = 0.0) -> List[Tuple[Dict, float]]:
"""Search for similar chunks."""
if self.index is None:
raise ValueError("Index not built yet")
query_vector = np.array([query_embedding], dtype=np.float32)
if self.index_type == "ivf":
# Set search parameters for IVF index
self.index.nprobe = min(10, self.index.nlist) # Number of clusters to search
elif self.index_type == "hnsw":
# Set search parameters for HNSW index
self.index.hnsw.efSearch = max(k * 2, 50) # Controls search accuracy
scores, indices = self.index.search(query_vector, k)
results = []
for idx, score in zip(indices[0], scores[0]):
if idx >= 0 and score >= score_threshold: # Valid result above threshold
results.append((self.chunk_metadata[idx], float(score)))
return results
def save(self, path: str):
"""Save index and metadata to disk."""
faiss.write_index(self.index, f"{path}.index")
with open(f"{path}.metadata", 'wb') as f:
pickle.dump(self.chunk_metadata, f)
def load(self, path: str):
"""Load index and metadata from disk."""
self.index = faiss.read_index(f"{path}.index")
with open(f"{path}.metadata", 'rb') as f:
self.chunk_metadata = pickle.load(f)
Index choice involves performance trade-offs. Flat indexes provide exact results but scale linearly with corpus size. IVF indexes offer sub-linear search time through clustering but may miss relevant results. HNSW indexes provide excellent recall with fast search times but require more memory.
Performance Tip: For document corpora under 100K chunks, start with a flat index. Beyond that, HNSW typically offers the best balance of accuracy and speed for Q&A systems.
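One subtlety with the inner-product indexes above: inner product equals cosine similarity only for unit-length vectors. OpenAI's ada-002 embeddings come pre-normalized, but if you swap in another model, normalize before indexing (`faiss.normalize_L2` does this in place). A NumPy sketch of the same operation:

```python
import numpy as np

def normalize_rows(embeddings: np.ndarray) -> np.ndarray:
    """L2-normalize each row so inner-product search behaves like cosine similarity."""
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    norms[norms == 0.0] = 1.0  # leave all-zero rows untouched instead of dividing by zero
    return embeddings / norms
```

Remember to apply the same normalization to query vectors at search time, or scores from the two sides won't be comparable.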
User queries rarely match document language exactly. Someone might ask "What's the penalty for late submission?" while the document says "Entities failing to file by the prescribed deadline incur monetary sanctions." Effective query processing bridges this semantic gap.
class QueryProcessor:
def __init__(self, embedding_service: EmbeddingService, vector_index: VectorIndex):
self.embedding_service = embedding_service
self.vector_index = vector_index
self.query_cache = {} # Simple in-memory cache
def process_query(self, query: str, k: int = 5,
rerank: bool = True) -> List[Tuple[DocumentChunk, float]]:
"""Process a user query and return relevant chunks."""
# Normalize and expand query
processed_query = self._preprocess_query(query)
# Check cache first
cache_key = f"{processed_query}:{k}:{rerank}"
if cache_key in self.query_cache:
return self.query_cache[cache_key]
# Get query embedding
query_embedding = self.embedding_service.get_embedding(processed_query)
# Initial retrieval - get more candidates than needed
initial_k = k * 3 if rerank else k
candidates = self.vector_index.search(
query_embedding,
k=initial_k,
score_threshold=0.1
)
if not candidates:
return []
# Convert to DocumentChunk objects
chunks_with_scores = [
(self._metadata_to_chunk(metadata), score)
for metadata, score in candidates
]
# Optional reranking for better relevance
if rerank and len(chunks_with_scores) > k:
chunks_with_scores = self._rerank_results(query, chunks_with_scores, k)
# Cache results
final_results = chunks_with_scores[:k]
self.query_cache[cache_key] = final_results
return final_results
def _preprocess_query(self, query: str) -> str:
"""Enhance query for better retrieval."""
# Remove common question words that don't add semantic value
stop_words = {'what', 'when', 'where', 'who', 'why', 'how', 'is', 'are', 'the'}
# Simple query expansion - in production, use more sophisticated methods
expansions = {
'penalty': 'penalty fine sanction',
'requirement': 'requirement obligation must',
'deadline': 'deadline due date time limit',
'company': 'company organization entity firm business'
}
words = query.lower().split()
expanded_words = []
for word in words:
if word not in stop_words:
expanded_words.append(word)
if word in expansions:
expanded_words.extend(expansions[word].split())
return ' '.join(expanded_words)
def _rerank_results(self, query: str, candidates: List[Tuple[DocumentChunk, float]],
k: int) -> List[Tuple[DocumentChunk, float]]:
"""Rerank candidates using cross-encoder or other sophisticated scoring."""
# For this example, we'll use a simple keyword-based reranking
# In production, consider using cross-encoders like ms-marco-MiniLM
query_terms = set(query.lower().split())
def compute_rerank_score(chunk: DocumentChunk, similarity_score: float) -> float:
content_lower = chunk.content.lower()
# Keyword overlap score
content_terms = set(content_lower.split())
overlap = len(query_terms.intersection(content_terms))
keyword_score = overlap / len(query_terms) if query_terms else 0
# Position boost for title/header content
position_boost = 1.2 if any(term in content_lower[:100]
for term in query_terms) else 1.0
# Document type boost (some document types might be more authoritative)
type_boost = 1.1 if chunk.metadata.get('type') == 'pdf' else 1.0
return similarity_score * (1 + keyword_score * 0.3) * position_boost * type_boost
# Recompute scores
reranked = [
(chunk, self.compute_rerank_score(chunk, score))
for chunk, score in candidates
]
# Sort by new scores
reranked.sort(key=lambda x: x[1], reverse=True)
return reranked[:k]
This multi-stage approach first casts a wide net with vector similarity, then applies more sophisticated scoring to the top candidates. The reranking step can incorporate domain-specific signals—regulatory documents might boost content from official sources, while technical documentation might prioritize recent updates.
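The cross-encoder variant slots into the same shape: any function that rescores (query, candidate) pairs can replace the keyword heuristic. A minimal, model-agnostic sketch—`score_fn` is whatever scorer you plug in, for example a sentence-transformers `CrossEncoder.predict` wrapped to take one pair at a time:

```python
from typing import Callable, List, Tuple

def rerank_candidates(
    query: str,
    candidates: List[Tuple[str, float]],    # (chunk_text, first-stage score)
    score_fn: Callable[[str, str], float],  # e.g. a cross-encoder wrapped as f(query, text)
    k: int,
) -> List[Tuple[str, float]]:
    """Rescore first-stage candidates with a stronger scorer and keep the top k."""
    rescored = [(text, score_fn(query, text)) for text, _ in candidates]
    rescored.sort(key=lambda pair: pair[1], reverse=True)
    return rescored[:k]
```

In the full system you'd pass `DocumentChunk` objects rather than raw strings; strings keep the sketch self-contained.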
With relevant context retrieved, we need to synthesize coherent answers. The prompt design determines whether your system produces helpful responses or confident nonsense.
from typing import Optional
import openai
from dataclasses import dataclass
@dataclass
class QAResponse:
answer: str
sources: List[Dict[str, Any]]
confidence: float
query: str
class DocumentQASystem:
def __init__(self, query_processor: QueryProcessor, model: str = "gpt-4"):
self.query_processor = query_processor
self.model = model
self.client = openai.OpenAI()
def answer_question(self, question: str, max_context_length: int = 4000) -> QAResponse:
"""Generate an answer to a question using retrieved context."""
# Retrieve relevant chunks
relevant_chunks = self.query_processor.process_query(question, k=8)
if not relevant_chunks:
return QAResponse(
answer="I couldn't find relevant information to answer this question.",
sources=[],
confidence=0.0,
query=question
)
# Build context and sources
context_parts = []
sources = []
total_length = 0
for i, (chunk, score) in enumerate(relevant_chunks):
source_info = {
"source": chunk.metadata.get('source', 'Unknown'),
"page": chunk.metadata.get('page_num'),
"chunk_id": chunk.chunk_id,
"relevance_score": score
}
chunk_text = f"[Source {i+1}] {chunk.content}"
# Check if adding this chunk would exceed context limit
if total_length + len(chunk_text) > max_context_length:
break
context_parts.append(chunk_text)
sources.append(source_info)
total_length += len(chunk_text)
context = "\n\n".join(context_parts)
# Generate answer using carefully crafted prompt
prompt = self._build_qa_prompt(question, context)
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self._get_system_prompt()},
{"role": "user", "content": prompt}
],
temperature=0.1, # Low temperature for factual responses
max_tokens=500
)
answer = response.choices[0].message.content
confidence = self._estimate_confidence(answer, relevant_chunks)
return QAResponse(
answer=answer,
sources=sources,
confidence=confidence,
query=question
)
except Exception as e:
print(f"Error generating answer: {e}")
return QAResponse(
answer="I encountered an error while processing your question.",
sources=sources,
confidence=0.0,
query=question
)
def _get_system_prompt(self) -> str:
"""System prompt that defines the assistant's behavior."""
return """You are a helpful document analysis assistant. Your job is to answer questions based solely on the provided document excerpts.
CRITICAL INSTRUCTIONS:
- Only use information explicitly stated in the provided sources
- If the sources don't contain enough information to answer completely, say so
- When citing information, reference the source number (e.g., "According to Source 2...")
- Be precise and factual - avoid speculation or inference beyond what's directly stated
- If sources contradict each other, acknowledge the contradiction
- Structure your answers clearly with relevant details"""
def _build_qa_prompt(self, question: str, context: str) -> str:
"""Build the main QA prompt with question and context."""
return f"""Based on the following document excerpts, please answer this question: "{question}"
DOCUMENT EXCERPTS:
{context}
QUESTION: {question}
Please provide a comprehensive answer based on the information in the excerpts above. If the excerpts don't contain sufficient information to fully answer the question, please indicate what aspects cannot be answered based on the provided sources."""
def _estimate_confidence(self, answer: str, chunks: List[Tuple[DocumentChunk, float]]) -> float:
"""Estimate confidence in the answer based on various signals."""
# This is a simplified confidence estimation
# Production systems might use more sophisticated methods
base_confidence = 0.5
# Boost confidence if multiple sources agree
if len(chunks) >= 3:
base_confidence += 0.2
# Boost confidence for high similarity scores
avg_score = sum(score for _, score in chunks) / len(chunks)
if avg_score > 0.8:
base_confidence += 0.2
# Reduce confidence for hedged language
hedge_words = ['might', 'could', 'possibly', 'unclear', 'insufficient']
if any(word in answer.lower() for word in hedge_words):
base_confidence -= 0.2
# Reduce confidence for very short answers (might indicate missing info)
if len(answer.split()) < 20:
base_confidence -= 0.1
return min(max(base_confidence, 0.0), 1.0)
The system prompt is crucial—it establishes ground rules that prevent hallucination and ensure source attribution. Notice how we explicitly instruct the model to acknowledge when information is insufficient rather than fabricating answers.
Prompt Engineering Tip: Test your prompts with edge cases—ambiguous questions, contradictory sources, and completely unrelated queries. The prompt should handle these gracefully rather than producing confident wrong answers.
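A tiny harness makes that edge-case testing repeatable: feed questions through any answer function and flag responses missing the behavior you expect (here, an explicit refusal phrase). The `answer_fn` parameter is a stand-in for a call into `DocumentQASystem.answer_question`:

```python
from typing import Callable, List, Tuple

def run_edge_case_suite(
    answer_fn: Callable[[str], str],
    cases: List[Tuple[str, str]],  # (question, phrase the answer must contain)
) -> List[Tuple[str, str]]:
    """Return the (question, answer) pairs that failed their expectation."""
    failures = []
    for question, must_contain in cases:
        answer = answer_fn(question)
        if must_contain.lower() not in answer.lower():
            failures.append((question, answer))
    return failures
```

Run it in CI whenever you change the system prompt, so a wording tweak that reintroduces confident nonsense fails fast.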
Moving from prototype to production requires addressing scalability, reliability, and cost concerns. Document Q&A systems can become expensive quickly without proper optimization.
import redis
import hashlib
import json
from typing import Optional
import time
from contextlib import contextmanager
class ProductionQASystem:
def __init__(self, base_qa_system: DocumentQASystem, redis_url: Optional[str] = None):
self.qa_system = base_qa_system
self.redis_client = redis.from_url(redis_url) if redis_url else None
self.cache_ttl = 3600 # 1 hour cache
# Rate limiting
self.rate_limit_window = 60 # 1 minute
self.rate_limit_max = 10 # 10 requests per minute per user
# Performance monitoring
self.metrics = {
'total_queries': 0,
'cache_hits': 0,
'avg_response_time': 0.0,
'error_count': 0
}
@contextmanager
def _performance_monitor(self):
"""Context manager for tracking performance metrics."""
start_time = time.time()
try:
yield
finally:
duration = time.time() - start_time
self.metrics['total_queries'] += 1
# Update rolling average
current_avg = self.metrics['avg_response_time']
total_queries = self.metrics['total_queries']
self.metrics['avg_response_time'] = (
(current_avg * (total_queries - 1) + duration) / total_queries
)
def answer_question_cached(self, question: str, user_id: Optional[str] = None) -> Optional[QAResponse]:
"""Answer question with caching and rate limiting."""
with self._performance_monitor():
# Check rate limits
if user_id and not self._check_rate_limit(user_id):
raise Exception("Rate limit exceeded")
# Generate cache key
cache_key = self._generate_cache_key(question)
# Try cache first
if self.redis_client:
cached_result = self._get_from_cache(cache_key)
if cached_result:
self.metrics['cache_hits'] += 1
return cached_result
try:
# Generate answer
response = self.qa_system.answer_question(question)
# Cache the result
if self.redis_client and response.confidence > 0.7:
self._store_in_cache(cache_key, response)
return response
except Exception as e:
self.metrics['error_count'] += 1
print(f"Error in answer generation: {e}")
return None
def _check_rate_limit(self, user_id: str) -> bool:
"""Check if user has exceeded rate limits."""
if not self.redis_client:
return True
key = f"rate_limit:{user_id}"
current_count = self.redis_client.get(key)
if current_count is None:
# First request in window
self.redis_client.setex(key, self.rate_limit_window, 1)
return True
current_count = int(current_count)
if current_count >= self.rate_limit_max:
return False
# Increment counter
self.redis_client.incr(key)
return True
def _generate_cache_key(self, question: str) -> str:
"""Generate a consistent cache key for a question."""
# Normalize question for better cache hits
normalized = question.lower().strip().replace(' ', '_')
return hashlib.md5(normalized.encode()).hexdigest()
def _get_from_cache(self, cache_key: str) -> Optional[QAResponse]:
"""Retrieve cached response."""
try:
cached_data = self.redis_client.get(f"qa:{cache_key}")
if cached_data:
data = json.loads(cached_data)
return QAResponse(**data)
except Exception as e:
print(f"Cache retrieval error: {e}")
return None
def _store_in_cache(self, cache_key: str, response: QAResponse):
"""Store response in cache."""
try:
# Convert to dict for JSON serialization
cache_data = {
'answer': response.answer,
'sources': response.sources,
'confidence': response.confidence,
'query': response.query
}
self.redis_client.setex(
f"qa:{cache_key}",
self.cache_ttl,
json.dumps(cache_data)
)
except Exception as e:
print(f"Cache storage error: {e}")
def get_metrics(self) -> Dict[str, Any]:
"""Return current performance metrics."""
cache_hit_rate = (
self.metrics['cache_hits'] / max(self.metrics['total_queries'], 1)
)
return {
**self.metrics,
'cache_hit_rate': cache_hit_rate
}
Caching is critical for both performance and cost control. High-confidence answers to common questions can be cached for hours, while lower-confidence responses might have shorter TTLs or skip caching entirely.
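That confidence-dependent TTL policy can live in one small function instead of scattered conditionals. A sketch—the `base_ttl` and `min_confidence` thresholds are assumptions you'd tune for your workload:

```python
def cache_ttl_for(confidence: float, base_ttl: int = 3600, min_confidence: float = 0.7) -> int:
    """Return a cache TTL in seconds for a response; 0 means 'do not cache'."""
    if confidence < min_confidence:
        return 0  # low-confidence answers are never cached
    return int(base_ttl * confidence)  # scale retention with confidence
```

Wired into `_store_in_cache`, you'd pass the returned value to `setex` and skip storage entirely when it's 0.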
For large document corpora, consider these additional optimizations:
class ScalableVectorIndex:
"""Production-ready vector index with advanced features."""
def __init__(self, dimension: int = 1536):
self.dimension = dimension
self.shards = {} # Document type or date-based sharding
self.global_index = None
def build_sharded_index(self, documents_by_shard: Dict[str, List[Dict]]):
"""Build separate indexes for different document types/dates."""
for shard_name, documents in documents_by_shard.items():
embeddings = np.array([doc['embedding'] for doc in documents])
metadata = [doc['metadata'] for doc in documents]
# Use IVF-PQ for large shards (product quantization saves memory)
if len(embeddings) > 50000:
index = self._create_ivf_pq_index(embeddings)
else:
index = self._create_hnsw_index(embeddings)
self.shards[shard_name] = {
'index': index,
'metadata': metadata
}
def search_with_filters(self, query_embedding: List[float],
document_types: Optional[List[str]] = None,
date_range: Optional[Tuple[str, str]] = None,
k: int = 5) -> List[Tuple[Dict, float]]:
"""Search with filters to reduce search space."""
# Determine which shards to search
relevant_shards = self._filter_shards(document_types, date_range)
all_results = []
for shard_name in relevant_shards:
shard_data = self.shards[shard_name]
# Search within shard
scores, indices = shard_data['index'].search(
np.array([query_embedding], dtype=np.float32),
k * 2 # Get extra candidates from each shard
)
# Add shard results
for idx, score in zip(indices[0], scores[0]):
if idx >= 0:
metadata = shard_data['metadata'][idx].copy()
metadata['shard'] = shard_name
all_results.append((metadata, float(score)))
# Sort globally and return top k
all_results.sort(key=lambda x: x[1], reverse=True)
return all_results[:k]
def _create_ivf_pq_index(self, embeddings: np.ndarray):
"""Create memory-efficient IVF-PQ index for large datasets."""
nlist = min(4096, len(embeddings) // 39) # Heuristic for cluster count
m = 8 # Number of subquantizers
bits = 8 # Bits per subquantizer
quantizer = faiss.IndexFlatL2(self.dimension)
index = faiss.IndexIVFPQ(quantizer, self.dimension, nlist, m, bits)
# Train the quantizer
index.train(embeddings)
index.add(embeddings)
return index
Production Q&A systems require rigorous evaluation beyond simple accuracy metrics. You need to detect various failure modes and continuously monitor performance in the wild.
import pandas as pd
from typing import List, Dict
from dataclasses import dataclass
import re
@dataclass
class EvaluationResult:
question: str
expected_answer: str
actual_answer: str
sources_used: List[str]
relevance_score: float
faithfulness_score: float
completeness_score: float
overall_score: float
class QAEvaluator:
def __init__(self, qa_system: DocumentQASystem):
self.qa_system = qa_system
def evaluate_dataset(self, test_cases: List[Dict]) -> List[EvaluationResult]:
"""Evaluate Q&A system on a test dataset."""
results = []
for case in test_cases:
question = case['question']
expected_answer = case['expected_answer']
expected_sources = case.get('expected_sources', [])
# Generate actual answer
response = self.qa_system.answer_question(question)
# Evaluate different aspects
relevance = self._score_relevance(question, response.answer)
faithfulness = self._score_faithfulness(response.answer, response.sources)
completeness = self._score_completeness(expected_answer, response.answer)
overall = (relevance + faithfulness + completeness) / 3
results.append(EvaluationResult(
question=question,
expected_answer=expected_answer,
actual_answer=response.answer,
sources_used=[s['source'] for s in response.sources],
relevance_score=relevance,
faithfulness_score=faithfulness,
completeness_score=completeness,
overall_score=overall
))
return results
def _score_relevance(self, question: str, answer: str) -> float:
"""Score how well the answer addresses the question."""
# Extract key entities and concepts from question
question_keywords = self._extract_keywords(question)
answer_keywords = self._extract_keywords(answer)
# Simple overlap-based relevance
if not question_keywords:
            return 0.5  # Neutral if no keywords extracted
        overlap = len(question_keywords.intersection(answer_keywords))
        relevance = overlap / len(question_keywords)

        # Boost for direct question patterns
        if self._contains_direct_answer_pattern(question, answer):
            relevance += 0.2
        return min(relevance, 1.0)

    def _score_faithfulness(self, answer: str, sources: List[Dict]) -> float:
        """Score how well the answer is supported by the sources."""
        if not sources:
            return 0.0

        # Extract claims from the answer
        answer_claims = self._extract_claims(answer)
        if not answer_claims:
            return 0.5  # Neutral for very short answers

        # Check whether each claim is supported by the retrieved sources
        supported_claims = 0
        for claim in answer_claims:
            if self._claim_supported_by_sources(claim, sources):
                supported_claims += 1
        return supported_claims / len(answer_claims)

    def _score_completeness(self, expected: str, actual: str) -> float:
        """Score how completely the answer addresses expected content."""
        expected_concepts = self._extract_keywords(expected)
        actual_concepts = self._extract_keywords(actual)
        if not expected_concepts:
            return 1.0  # No specific expectations
        coverage = len(expected_concepts.intersection(actual_concepts))
        return coverage / len(expected_concepts)

    def _extract_keywords(self, text: str) -> set:
        """Extract meaningful keywords from text."""
        # Drop common stopwords and very short tokens
        stopwords = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for'}
        words = re.findall(r'\b\w+\b', text.lower())
        return {word for word in words if word not in stopwords and len(word) > 2}

    def generate_evaluation_report(self, results: List[EvaluationResult]) -> Dict:
        """Generate a comprehensive evaluation report."""
        df = pd.DataFrame([
            {
                'question': r.question,
                'relevance': r.relevance_score,
                'faithfulness': r.faithfulness_score,
                'completeness': r.completeness_score,
                'overall': r.overall_score
            }
            for r in results
        ])

        # Aggregate statistics
        report = {
            'total_questions': len(results),
            'avg_relevance': df['relevance'].mean(),
            'avg_faithfulness': df['faithfulness'].mean(),
            'avg_completeness': df['completeness'].mean(),
            'avg_overall': df['overall'].mean(),
            'score_distribution': df['overall'].describe().to_dict()
        }

        # Identify failure modes
        low_relevance = df[df['relevance'] < 0.5]
        low_faithfulness = df[df['faithfulness'] < 0.5]
        report['failure_analysis'] = {
            'low_relevance_count': len(low_relevance),
            'low_faithfulness_count': len(low_faithfulness),
            'sample_low_relevance': low_relevance['question'].head().tolist(),
            'sample_low_faithfulness': low_faithfulness['question'].head().tolist()
        }
        return report
This evaluation framework catches multiple failure modes: answers that don't address the question (low relevance), answers that aren't supported by retrieved documents (low faithfulness), and answers that miss important information (low completeness).
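The keyword-overlap idea at the heart of the relevance and completeness scores can be exercised in isolation. A minimal standalone sketch (the function names here are illustrative, not the class methods above):

```python
import re

STOPWORDS = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for'}

def extract_keywords(text: str) -> set:
    """Lowercase, tokenize, and drop stopwords and very short tokens."""
    words = re.findall(r'\b\w+\b', text.lower())
    return {w for w in words if w not in STOPWORDS and len(w) > 2}

def overlap_score(reference: str, candidate: str) -> float:
    """Fraction of reference keywords that also appear in the candidate."""
    ref = extract_keywords(reference)
    if not ref:
        return 1.0  # No specific expectations
    return len(ref & extract_keywords(candidate)) / len(ref)

question = "What penalties apply for late regulatory filings?"
answer = "Late regulatory filings incur monetary penalties and possible license review."
print(f"{overlap_score(question, answer):.2f}")
```

Scores like this are crude but cheap, which makes them useful as a first-pass filter before more expensive LLM-based grading.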
Let's build a complete document Q&A system for a regulatory compliance use case. You'll process a corpus of financial regulations and create a system that can answer complex compliance questions.
# Complete implementation bringing everything together
import asyncio
from pathlib import Path

async def build_regulatory_qa_system():
    """Build and test a complete document Q&A system."""
    # Initialize components
    doc_processor = DocumentProcessor(chunk_size=800, chunk_overlap=150)
    embedding_service = EmbeddingService(batch_size=50)
    vector_index = VectorIndex(dimension=1536, index_type="hnsw")

    # Process documents
    print("Processing documents...")
    document_dir = Path("regulatory_docs")  # Your document directory
    all_chunks = []
    for file_path in document_dir.glob("*.pdf"):
        try:
            # Extract and chunk the document
            document = doc_processor.extract_text(file_path)
            chunks = doc_processor.chunk_document(document)
            all_chunks.extend(chunks)
            print(f"Processed {file_path.name}: {len(chunks)} chunks")
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
    print(f"Total chunks: {len(all_chunks)}")

    # Generate embeddings
    print("Generating embeddings...")
    chunk_texts = [chunk.content for chunk in all_chunks]
    embeddings = await embedding_service.get_embeddings_batch(chunk_texts)

    # Build the index
    print("Building vector index...")
    chunk_metadata = [chunk.metadata for chunk in all_chunks]
    vector_index.build_index(embeddings, chunk_metadata)

    # Save the index for later use
    vector_index.save("regulatory_qa_index")

    # Initialize the Q&A system
    query_processor = QueryProcessor(embedding_service, vector_index)
    qa_system = DocumentQASystem(query_processor, model="gpt-4")

    # Test with sample questions
    test_questions = [
        "What are the reporting requirements for banks with assets over $50 billion?",
        "What penalties apply for late regulatory filings?",
        "How often must stress tests be conducted?",
        "What documentation is required for risk management frameworks?"
    ]

    print("\nTesting Q&A system:")
    for question in test_questions:
        print(f"\nQ: {question}")
        response = qa_system.answer_question(question)
        print(f"A: {response.answer}")
        print(f"Confidence: {response.confidence:.2f}")
        print(f"Sources: {len(response.sources)}")
        # Show the top source
        if response.sources:
            top_source = response.sources[0]
            print(f"Top source: {top_source['source']} (score: {top_source['relevance_score']:.3f})")

    return qa_system

# Run the exercise
if __name__ == "__main__":
    qa_system = asyncio.run(build_regulatory_qa_system())
This exercise demonstrates the complete pipeline from document ingestion through answer generation. Try it with your own document corpus and adjust the parameters based on your specific use case.
Poor chunk boundaries: The most common error is chunking documents without considering semantic structure. Symptoms include incomplete answers that reference "the aforementioned requirements" without context. Fix this by preserving paragraph structure and using overlap between chunks.
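A minimal sketch of paragraph-aware chunking with overlap, assuming documents have already been split into paragraph strings (real documents also need heading and table handling):

```python
def chunk_with_overlap(paragraphs: list[str], max_chars: int = 800,
                       overlap: int = 1) -> list[str]:
    """Greedily pack whole paragraphs into chunks, carrying the last
    `overlap` paragraphs into the next chunk so cross-references like
    'the aforementioned requirements' keep their antecedent."""
    chunks, current, size = [], [], 0
    for para in paragraphs:
        if current and size + len(para) > max_chars:
            chunks.append("\n\n".join(current))
            current = current[-overlap:]  # carry trailing paragraphs forward
            size = sum(len(p) for p in current)
        current.append(para)
        size += len(para)
    if current:
        chunks.append("\n\n".join(current))
    return chunks
```

Because boundaries always fall between paragraphs, no sentence is ever split mid-thought, at the cost of some chunks running slightly over the size target.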
Inadequate prompt engineering: Generic prompts produce generic answers. If your system gives vague responses like "The document mentions several requirements," your prompt isn't specific enough. Include explicit instructions about answer structure, source citation, and handling uncertainty.
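One way to make those instructions explicit in the prompt (a sketch; the template text is illustrative and should be adapted to your domain):

```python
QA_PROMPT = """You are a compliance analyst. Answer the question using ONLY the
numbered context passages below.

Rules:
- Cite passages inline as [1], [2], etc. after each claim.
- If the context does not contain the answer, say "The provided documents do
  not answer this question" rather than guessing.
- Quote exact figures, thresholds, and deadlines verbatim.

Context:
{context}

Question: {question}

Answer:"""

def build_prompt(question: str, chunks: list[str]) -> str:
    """Number each retrieved chunk so the model can cite it by index."""
    context = "\n\n".join(f"[{i + 1}] {c}" for i, c in enumerate(chunks))
    return QA_PROMPT.format(context=context, question=question)
```

Numbering the chunks is what makes source attribution mechanical: a cited `[2]` in the answer maps directly back to a specific chunk and its document metadata.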
Ignoring retrieval quality: High-level metrics might look good while individual queries fail badly. Monitor retrieval performance separately—are the right chunks being found? Use query logging and regular spot-checks to catch retrieval failures.
Over-reliance on similarity scores: Vector similarity doesn't always correlate with relevance for your specific domain. Consider hybrid approaches that combine semantic similarity with keyword matching, especially for technical domains with specific terminology.
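A minimal reranking sketch of the hybrid idea (the `alpha` weight and the whitespace tokenizer are illustrative placeholders; a production system would typically use BM25 or similar for the keyword side):

```python
def hybrid_score(semantic: float, keyword: float, alpha: float = 0.7) -> float:
    """Blend a vector-similarity score with a keyword-overlap score.
    alpha weights the semantic component; tune it per domain."""
    return alpha * semantic + (1 - alpha) * keyword

def rerank(candidates: list[tuple[str, float]], query_terms: set,
           alpha: float = 0.7) -> list[str]:
    """candidates: (chunk_text, semantic_score) pairs from the vector index."""
    scored = []
    for text, sem in candidates:
        terms = set(text.lower().split())
        kw = len(query_terms & terms) / len(query_terms) if query_terms else 0.0
        scored.append((hybrid_score(sem, kw, alpha), text))
    return [text for _, text in sorted(scored, reverse=True)]
```

For technical domains, exact-term matches on identifiers like "Form FR Y-9C" often matter more than embedding proximity, which is exactly what the keyword component rescues.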
Memory and performance issues: Vector indexes can consume enormous amounts of memory. If you're hitting memory limits, try reducing embedding dimensionality, quantizing vectors (for example with product quantization), memory-mapping the index from disk, or sharding the index across machines.
Caching inappropriate content: Don't cache answers with low confidence scores or time-sensitive information. Set different TTLs based on content type—regulatory guidance might cache for hours, while news content should have much shorter lifespans.
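A sketch of confidence-gated caching with per-entry TTLs (the `AnswerCache` class and its thresholds are hypothetical, not part of the system built above):

```python
import time

class AnswerCache:
    """Cache answers with per-entry TTLs; refuse to cache low-confidence ones."""

    def __init__(self, min_confidence: float = 0.7):
        self.min_confidence = min_confidence
        self._store = {}  # question -> (answer, expires_at)

    def put(self, question: str, answer: str, confidence: float,
            ttl_seconds: float) -> None:
        if confidence < self.min_confidence:
            return  # never cache shaky answers
        self._store[question] = (answer, time.monotonic() + ttl_seconds)

    def get(self, question: str):
        entry = self._store.get(question)
        if entry is None:
            return None
        answer, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._store[question]  # expired; force a fresh answer
            return None
        return answer
```

The TTL would be chosen per content type at `put` time, e.g. hours for regulatory guidance versus minutes for anything time-sensitive.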
Security and data leakage: Document Q&A systems can inadvertently expose sensitive information. Implement document-level access controls enforced at retrieval time, redaction of sensitive fields before indexing, and audit logging of queries and the chunks they retrieve.
Debugging Tip: When answers seem wrong, trace through the entire pipeline. Check what chunks were retrieved, examine their relevance scores, and review the exact prompt sent to the language model. Often the issue is in retrieval, not generation.
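A small trace helper makes this routine. This sketch takes the retrieval and answering stages as plain callables so it stays decoupled from any particular class interface (the dict keys `score`, `source`, and `content` are assumptions about your chunk format):

```python
def trace_query(question: str, retrieve, answer, top_k: int = 5) -> str:
    """Print every pipeline stage so retrieval failures are visible
    before blaming the language model.

    retrieve(question, top_k) -> list of dicts with 'score', 'source', 'content'
    answer(question) -> str
    """
    lines = [f"Q: {question}"]
    for i, chunk in enumerate(retrieve(question, top_k), 1):
        lines.append(f"[{i}] score={chunk['score']:.3f} source={chunk['source']}")
        lines.append(f"    {chunk['content'][:120]}")
    lines.append(f"A: {answer(question)}")
    report = "\n".join(lines)
    print(report)
    return report
```

If the retrieved chunks already look wrong at this stage, no amount of prompt tuning will fix the answer.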
You've built a production-ready document Q&A system that can handle real-world document corpora at scale. The system combines semantic search through vector embeddings with careful prompt engineering to produce accurate, well-sourced answers.
Key achievements from this lesson: a document processing pipeline that chunks text while preserving semantic structure, batched embedding generation, a persistent vector index with metadata linking back to source documents, grounded answer generation with confidence scores and source attribution, and an automated evaluation framework covering relevance, faithfulness, and completeness.
The next frontier involves several advanced techniques:
Multi-modal document understanding: Real documents contain images, tables, and complex layouts. Explore vision-language models that can process these elements alongside text.
Conversational Q&A: Extend your system to handle follow-up questions and maintain conversation context. This requires conversation memory and query disambiguation.
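The standard approach to disambiguation is query condensation: ask an LLM to rewrite the follow-up as a standalone question before embedding it. A sketch of building that rewrite prompt (assembling the prompt only; the LLM call itself is omitted):

```python
def condense_question(history: list[tuple[str, str]], follow_up: str) -> str:
    """Build a prompt asking an LLM to rewrite the follow-up as a
    standalone question, resolving pronouns from the conversation."""
    turns = "\n".join(f"User: {q}\nAssistant: {a}" for q, a in history)
    return (
        "Rewrite the final user question as a standalone question, "
        "resolving pronouns and references using the conversation.\n\n"
        f"{turns}\nUser: {follow_up}\nStandalone question:"
    )
```

The condensed question ("How often must stress tests be conducted?" rather than "How often?") is then embedded and retrieved against exactly as in the single-turn system.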
Active learning: Implement feedback loops where user ratings improve retrieval and generation quality over time.
Cross-lingual capabilities: For global organizations, consider embedding models that work across languages and translation strategies for multilingual document corpora.
Domain adaptation: Fine-tune embedding models on your specific document types for improved retrieval quality, especially in specialized domains like legal, medical, or technical documentation.
The architecture you've built provides a solid foundation for these extensions. Start with the use case that provides the most immediate value to your organization, and iterate based on user feedback and performance metrics.