
You've built a RAG pipeline. Your vector store is populated, your embeddings are solid, and your retriever pulls back the top-k chunks. But something is still off — the answers your LLM generates are occasionally wrong, sometimes confidently so, and when you trace the problem back, you find that the most relevant chunk wasn't in the first position. It was fourth. Or seventh. Or it wasn't in the top-k at all because the semantic similarity score wasn't high enough, even though the chunk clearly contained the answer.
This is the retrieval quality problem, and it's one of the most common reasons RAG systems underperform in production. Bi-encoder models — the embedding models you use to generate vectors — are designed for speed at scale. They compress documents into fixed-size vectors independently of the query, which means they can't capture fine-grained, query-specific relevance signals. They're excellent at narrowing a million documents down to a hundred candidates. But they're not great at telling you which of those hundred candidates is actually the best answer to this specific question.
That's where cross-encoders and reranking come in. By the end of this lesson, you'll have a complete, production-ready reranking layer that plugs into your existing RAG pipeline. You'll understand exactly why it improves accuracy, when to use it, what it costs you, and how to avoid the failure modes that trip up practitioners implementing this for the first time.
What you'll learn:
initial_k) and reranking cutoff (final_k) for accuracy/latency tradeoffs
This lesson assumes you're comfortable with:
pip packages
You don't need a deep background in transformer architecture, though understanding the bi-encoder vs. cross-encoder distinction at a high level will help; we'll cover that clearly below.
Let's be precise about what bi-encoders optimize for. When you embed a query and a document with a model like text-embedding-3-small or sentence-transformers/all-MiniLM-L6-v2, the model produces a vector for each independently, then cosine similarity does the ranking work. This is the bi-encoder architecture: two separate encoding passes, no interaction between query and document.
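To make the ranking step concrete, here is a minimal sketch of what happens after the two independent encoding passes. The three-dimensional vectors are made up for illustration and do not come from any real embedding model; only the cosine-similarity arithmetic is the point.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Hypothetical, hand-written "embeddings" (real models use hundreds of dimensions).
doc_vectors = {
    "doc_a": [0.9, 0.1, 0.0],
    "doc_b": [0.2, 0.8, 0.1],
    "doc_c": [0.0, 0.2, 0.9],
}
query_vector = [0.3, 0.9, 0.0]

# Documents were encoded without seeing the query; ranking is just a sort by similarity.
ranking = sorted(
    doc_vectors,
    key=lambda doc_id: cosine_similarity(query_vector, doc_vectors[doc_id]),
    reverse=True,
)
print(ranking)
```

Because the document vectors never change with the query, they can be computed once at ingestion time, which is where the speed comes from.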
The problem is information loss. When you encode a document independently of the query, you're producing a general-purpose semantic fingerprint. That fingerprint captures what the document is roughly about, but it can't capture subtle, query-specific signals. Consider these two chunks from a pharmaceutical dataset:
Chunk A: "Metformin is commonly prescribed for type 2 diabetes and has a well-established safety profile in patients with normal renal function."
Chunk B: "Renal impairment significantly affects metformin clearance; patients with eGFR below 30 mL/min/1.73m² should not use metformin due to risk of lactic acidosis."
For the query "Can patients with kidney disease take metformin?", Chunk B is unambiguously more relevant. But both chunks mention metformin and renal function, so their embedding vectors may be quite close in cosine space. A bi-encoder can rank Chunk A higher simply because it is a more generic, complete-sounding statement whose overall meaning sits close to the query's.
A cross-encoder, by contrast, takes the query and document together as a single input — [CLS] query [SEP] document [SEP] — and runs a full attention pass over both. Every token in the query can attend to every token in the document. This captures the nuanced signal that Chunk B directly addresses the contraindication the query is asking about. The cross-encoder outputs a single relevance score, not a vector.
The tradeoff is immediate and obvious: you cannot pre-compute cross-encoder scores. You must run the model at query time, for every query-document pair. This makes cross-encoders completely impractical as a first-stage retriever across a large corpus. But as a second-stage reranker over 20–100 candidates? Entirely feasible.
The classic two-stage architecture: Stage 1: use a bi-encoder to retrieve initial_k candidates fast. Stage 2: use a cross-encoder to rerank those candidates, then pass the top final_k to the LLM. This is the pattern we're implementing today.
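The shape of the two-stage pattern can be sketched without any model at all. In the sketch below, `cheap_score` and `pair_score` are hypothetical word-overlap stand-ins for a bi-encoder and a cross-encoder, and the corpus is invented; the only thing it is meant to show is that the expensive scorer runs on initial_k candidates rather than on the whole corpus.

```python
from typing import Callable, List

def two_stage_search(
    query: str,
    corpus: List[str],
    cheap_score: Callable[[str, str], float],
    pair_score: Callable[[str, str], float],
    initial_k: int = 20,
    final_k: int = 5,
) -> List[str]:
    # Stage 1: rank the whole corpus with the cheap scorer, keep initial_k.
    candidates = sorted(corpus, key=lambda d: cheap_score(query, d), reverse=True)[:initial_k]
    # Stage 2: rescore only those candidates with the expensive scorer, keep final_k.
    return sorted(candidates, key=lambda d: pair_score(query, d), reverse=True)[:final_k]

# Toy stand-ins (assumptions, not real models).
STOPWORDS = {"can", "patients", "with", "take", "is", "for", "in", "be", "are"}

def cheap_score(query: str, doc: str) -> float:
    # Counts every shared word, including uninformative ones.
    return len(set(query.split()) & set(doc.split()))

def pair_score(query: str, doc: str) -> float:
    # Counts only shared content words, mimicking a more query-aware scorer.
    return len((set(query.split()) - STOPWORDS) & set(doc.split()))

corpus = [
    "metformin is prescribed for type 2 diabetes",
    "patients with kidney stones can take fluids",
    "metformin should be avoided in severe kidney disease",
    "insulin dosing for type 1 diabetes",
]
query = "can patients with kidney disease take metformin"

results = two_stage_search(query, corpus, cheap_score, pair_score, initial_k=3, final_k=2)
print(results)
```

In this toy run the cheap scorer puts the kidney-stones sentence first, and the second stage promotes the sentence about avoiding metformin in kidney disease to the top.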
Let's establish a complete working environment. We'll use a realistic scenario: a customer support knowledge base for a B2B SaaS product, stored in ChromaDB, with documents covering API documentation, pricing policies, troubleshooting guides, and SLA terms.
pip install sentence-transformers chromadb langchain langchain-openai cohere tiktoken
Our project structure:
rag_with_reranking/
├── ingest.py       # Load and embed documents
├── retriever.py    # Bi-encoder retrieval layer
├── reranker.py     # Cross-encoder reranking layer
├── pipeline.py     # End-to-end RAG pipeline
├── evaluate.py     # Measure reranking gains
└── data/
    └── kb_chunks.jsonl  # Your chunked knowledge base
Let's start by building the retrieval foundation, then layer reranking on top.
Here's a clean retriever class that wraps ChromaDB and handles the initial fetch of candidates:
# retriever.py
import chromadb
from chromadb.utils import embedding_functions
from typing import List, Dict, Any


class BiEncoderRetriever:
    def __init__(
        self,
        collection_name: str = "support_kb",
        embedding_model: str = "all-MiniLM-L6-v2",
        persist_directory: str = "./chroma_db"
    ):
        self.client = chromadb.PersistentClient(path=persist_directory)
        self.embedding_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=embedding_model
        )
        self.collection = self.client.get_collection(
            name=collection_name,
            embedding_function=self.embedding_fn
        )

    def retrieve(self, query: str, initial_k: int = 20) -> List[Dict[str, Any]]:
        """
        Retrieve initial_k candidates using vector similarity.
        Returns a list of dicts with 'text', 'metadata', and 'bi_encoder_score'.
        """
        results = self.collection.query(
            query_texts=[query],
            n_results=initial_k,
            include=["documents", "metadatas", "distances"]
        )
        candidates = []
        for doc, meta, dist in zip(
            results["documents"][0],
            results["metadatas"][0],
            results["distances"][0]
        ):
            candidates.append({
                "text": doc,
                "metadata": meta,
                # Chroma returns distances (lower is better). 1 - dist is a true
                # similarity only for cosine-distance collections; with the default
                # L2 space it is just a score that preserves the ordering.
                "bi_encoder_score": 1 - dist
            })
        return candidates
Notice we're fetching initial_k=20 rather than the 3–5 chunks you'd typically pass to an LLM. We're casting a wide net deliberately — the whole point is to give the reranker enough candidates to work with. If the right chunk isn't in your initial 20, reranking can't save you.
Critical insight: Your initial_k must be large enough to include the truly relevant documents with high probability. If your bi-encoder puts the right answer at rank 22 and you only retrieve 20, you've lost before reranking even starts. In practice, 20–50 is a reasonable range; the right number depends on your corpus and how spread-out relevant information is.
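One way to pick initial_k from data rather than intuition is to measure first-stage recall at several cutoffs on a small labeled set. The sketch below assumes you already have, per query, the bi-encoder's ranked document IDs and the set of known relevant IDs; the `runs` data is invented for illustration.

```python
from typing import Dict, List, Sequence, Set, Tuple

def recall_at_k(retrieved_ids: Sequence[str], relevant_ids: Set[str], k: int) -> float:
    """Fraction of the known relevant documents that appear in the top k."""
    if not relevant_ids:
        return 0.0
    hits = len(set(retrieved_ids[:k]) & set(relevant_ids))
    return hits / len(relevant_ids)

def mean_recall_by_k(
    runs: List[Tuple[Sequence[str], Set[str]]],
    ks: Sequence[int],
) -> Dict[int, float]:
    """Average recall@k over all labeled queries, for each candidate cutoff."""
    return {
        k: sum(recall_at_k(retrieved, relevant, k) for retrieved, relevant in runs) / len(runs)
        for k in ks
    }

# Hypothetical labeled runs: (ranked IDs from the retriever, relevant IDs).
runs = [
    (["d1", "d7", "d3", "d9", "d2"], {"d3", "d2"}),
    (["d4", "d5", "d6", "d8", "d0"], {"d4"}),
]

recall_curve = mean_recall_by_k(runs, ks=(1, 3, 5))
print(recall_curve)
```

The smallest k at which the curve flattens is a reasonable candidate for initial_k, since anything missed at that cutoff is out of the reranker's reach.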
Now for the core of this lesson. We'll implement two options: a local cross-encoder using sentence-transformers, and the Cohere Rerank API. Both have legitimate production use cases.
# reranker.py
from sentence_transformers import CrossEncoder
from typing import List, Dict, Any


class LocalCrossEncoderReranker:
    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        """
        Good local models for reranking:
        - cross-encoder/ms-marco-MiniLM-L-6-v2 (fast, good quality)
        - cross-encoder/ms-marco-MiniLM-L-12-v2 (slower, better quality)
        - BAAI/bge-reranker-base (strong multilingual option)
        - BAAI/bge-reranker-large (best quality, expensive)
        """
        self.model = CrossEncoder(model_name, max_length=512)
        self.model_name = model_name

    def rerank(
        self,
        query: str,
        candidates: List[Dict[str, Any]],
        final_k: int = 5
    ) -> List[Dict[str, Any]]:
        """
        Score each (query, document) pair and return top final_k results.
        """
        if not candidates:
            return []
        # Build input pairs for the cross-encoder
        input_pairs = [(query, candidate["text"]) for candidate in candidates]
        # Get relevance scores: these are raw logits, not probabilities
        scores = self.model.predict(input_pairs, show_progress_bar=False)
        # Attach scores to copies so the caller's candidate dicts are not mutated
        scored = [
            {**candidate, "rerank_score": float(score)}
            for candidate, score in zip(candidates, scores)
        ]
        # Sort by rerank score descending and return top final_k
        reranked = sorted(scored, key=lambda x: x["rerank_score"], reverse=True)
        return reranked[:final_k]

    def rerank_with_diagnostics(
        self,
        query: str,
        candidates: List[Dict[str, Any]],
        final_k: int = 5
    ) -> Dict[str, Any]:
        """
        Extended version that also returns rank change information,
        useful for evaluating how much reranking is actually helping.
        """
        if not candidates:
            return {"results": [], "rank_changes": []}
        input_pairs = [(query, candidate["text"]) for candidate in candidates]
        scores = self.model.predict(input_pairs, show_progress_bar=False)
        ranked_candidates = []
        for original_rank, (candidate, score) in enumerate(zip(candidates, scores)):
            ranked_candidates.append({
                **candidate,
                "rerank_score": float(score),
                "original_rank": original_rank
            })
        reranked = sorted(ranked_candidates, key=lambda x: x["rerank_score"], reverse=True)
        rank_changes = []
        for new_rank, candidate in enumerate(reranked[:final_k]):
            rank_changes.append({
                "text_preview": candidate["text"][:100],
                "original_rank": candidate["original_rank"],
                "new_rank": new_rank,
                # Positive delta means the document moved up after reranking
                "delta": candidate["original_rank"] - new_rank
            })
        return {
            "results": reranked[:final_k],
            "rank_changes": rank_changes
        }
A few things to note here. First, CrossEncoder.predict() returns raw logits by default for MS-MARCO models. They're trained with pointwise binary relevance labels (relevant/not relevant), so the raw score is meaningful for ranking but shouldn't be interpreted as a calibrated probability without additional processing. Second, the max_length=512 limit means very long chunks will be truncated. If your chunks are 1000+ tokens, either shorten them during ingestion or use a reranker that accepts longer inputs. Check the model card before switching: BAAI/bge-reranker-large, for example, is built on XLM-RoBERTa and is also limited to 512 tokens, while BAAI/bge-reranker-v2-m3 accepts longer sequences.
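If you want scores in a bounded range for logging or display, a sigmoid is the usual squashing function. The sketch below uses made-up logits, not real model output, and shows that squashing keeps the ordering intact. A value in (0, 1) produced this way is still not a calibrated probability.

```python
import math

def sigmoid(logit: float) -> float:
    """Numerically stable logistic function mapping any real number into (0, 1)."""
    if logit >= 0:
        return 1.0 / (1.0 + math.exp(-logit))
    z = math.exp(logit)
    return z / (1.0 + z)

# Hypothetical raw cross-encoder logits for four candidates.
raw_scores = [7.2, 0.4, -3.1, -9.8]
squashed = [sigmoid(s) for s in raw_scores]

# Sigmoid is monotonic, so ranking by raw logit and by squashed score is identical.
order_by_raw = sorted(range(len(raw_scores)), key=lambda i: raw_scores[i], reverse=True)
order_by_squashed = sorted(range(len(squashed)), key=lambda i: squashed[i], reverse=True)

print([round(p, 4) for p in squashed])
print(order_by_raw == order_by_squashed)
```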
The Cohere Rerank API is a strong choice when you want to avoid the GPU/CPU overhead of running a local model, or when you're working in environments where model hosting is constrained.
# reranker.py (extended with Cohere option)
import cohere
from typing import List, Dict, Any


class CohereReranker:
    def __init__(self, api_key: str, model: str = "rerank-english-v3.0"):
        """
        Available models:
        - rerank-english-v3.0 (best English quality)
        - rerank-multilingual-v3.0 (for non-English or mixed corpora)
        - rerank-english-v2.0 (legacy, still solid)
        """
        self.client = cohere.Client(api_key)
        self.model = model

    def rerank(
        self,
        query: str,
        candidates: List[Dict[str, Any]],
        final_k: int = 5
    ) -> List[Dict[str, Any]]:
        """
        Rerank candidates using Cohere's API.
        Returns top final_k results with relevance scores.
        """
        if not candidates:
            return []
        documents = [c["text"] for c in candidates]
        response = self.client.rerank(
            model=self.model,
            query=query,
            documents=documents,
            top_n=final_k,
            return_documents=True
        )
        reranked = []
        for result in response.results:
            # result.index points back into the documents list we sent
            original_candidate = candidates[result.index]
            reranked.append({
                **original_candidate,
                "rerank_score": result.relevance_score,
                "original_rank": result.index
            })
        return reranked
Cohere API note: Cohere Rerank v3 accepts up to 1000 documents per request and handles long documents better than many local models. The relevance_score is a float between 0 and 1 and is reasonably calibrated: scores above 0.7 are typically quite relevant, below 0.3 typically not. This is useful for score-based filtering in addition to rank-based cutoffs.
Now let's wire everything together into a production-grade RAG pipeline with reranking built in:
# pipeline.py
from typing import List, Dict, Any, Optional
from openai import OpenAI
from retriever import BiEncoderRetriever
from reranker import LocalCrossEncoderReranker, CohereReranker


class RAGPipelineWithReranking:
    def __init__(
        self,
        retriever: BiEncoderRetriever,
        reranker,  # LocalCrossEncoderReranker or CohereReranker
        openai_api_key: str,
        initial_k: int = 20,
        final_k: int = 5,
        score_threshold: Optional[float] = None
    ):
        self.retriever = retriever
        self.reranker = reranker
        self.llm_client = OpenAI(api_key=openai_api_key)
        self.initial_k = initial_k
        self.final_k = final_k
        self.score_threshold = score_threshold

    def _build_context(self, reranked_docs: List[Dict[str, Any]]) -> str:
        """
        Format the top documents into a context block for the LLM.
        Includes source metadata so the LLM can cite its sources.
        """
        context_parts = []
        for i, doc in enumerate(reranked_docs, 1):
            source = doc["metadata"].get("source", "unknown")
            section = doc["metadata"].get("section", "")
            header = f"[Document {i} | Source: {source}"
            if section:
                header += f" | Section: {section}"
            header += "]"
            context_parts.append(f"{header}\n{doc['text']}")
        return "\n\n---\n\n".join(context_parts)

    def _filter_by_threshold(
        self,
        reranked_docs: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Optionally remove documents that fall below a minimum relevance score.
        Prevents the LLM from receiving low-quality context.
        """
        if self.score_threshold is None:
            return reranked_docs
        return [d for d in reranked_docs if d.get("rerank_score", 0) >= self.score_threshold]

    def answer(self, query: str, verbose: bool = False) -> Dict[str, Any]:
        """
        Full RAG pipeline: retrieve -> rerank -> filter -> generate.
        """
        # Stage 1: Broad retrieval
        candidates = self.retriever.retrieve(query, initial_k=self.initial_k)
        if verbose:
            print(f"Stage 1: Retrieved {len(candidates)} candidates")
            if candidates:  # avoid an IndexError when nothing was retrieved
                print(f"  Top bi-encoder match: {candidates[0]['text'][:120]}...")

        # Stage 2: Rerank
        reranked = self.reranker.rerank(query, candidates, final_k=self.final_k)

        # Stage 3: Optional score threshold filtering
        filtered = self._filter_by_threshold(reranked)
        if not filtered:
            # Same keys as the success path so callers can read them unconditionally
            return {
                "answer": "I couldn't find sufficiently relevant information to answer this question confidently.",
                "sources": [],
                "rerank_scores": [],
                "retrieved_count": len(candidates),
                "used_count": 0
            }
        if verbose:
            print(f"Stage 2: Reranked to {len(reranked)}, after filtering: {len(filtered)}")
            print(f"  Top reranked match: {filtered[0]['text'][:120]}...")

        # Stage 4: Generate answer
        context = self._build_context(filtered)
        system_prompt = (
            "You are a helpful customer support assistant.\n"
            "Answer the user's question using only the provided context documents.\n"
            "If the context doesn't contain enough information to answer fully, say so explicitly.\n"
            "Cite the document numbers when you reference specific information."
        )
        user_message = f"Context:\n{context}\n\nQuestion: {query}\n\nAnswer:"
        response = self.llm_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message}
            ],
            temperature=0.1
        )
        return {
            "answer": response.choices[0].message.content,
            "sources": [d["metadata"] for d in filtered],
            "rerank_scores": [d.get("rerank_score") for d in filtered],
            "retrieved_count": len(candidates),
            "used_count": len(filtered)
        }
Let's run it against a realistic query:
# main.py
import os
from retriever import BiEncoderRetriever
from reranker import LocalCrossEncoderReranker
from pipeline import RAGPipelineWithReranking

retriever = BiEncoderRetriever(
    collection_name="support_kb",
    persist_directory="./chroma_db"
)
reranker = LocalCrossEncoderReranker(
    model_name="cross-encoder/ms-marco-MiniLM-L-6-v2"
)
pipeline = RAGPipelineWithReranking(
    retriever=retriever,
    reranker=reranker,
    openai_api_key=os.environ["OPENAI_API_KEY"],
    initial_k=20,
    final_k=5,
    score_threshold=None  # Disable threshold initially
)
result = pipeline.answer(
    query="What happens to my data if I downgrade from Enterprise to Pro plan mid-billing cycle?",
    verbose=True
)
print(result["answer"])
print(f"\nSources used: {result['used_count']} of {result['retrieved_count']} retrieved")
# .get() keeps this safe when the pipeline returns its "no relevant documents" response
print(f"Rerank scores: {[f'{s:.3f}' for s in result.get('rerank_scores', [])]}")
Intuition isn't enough. You need to measure whether reranking is actually helping before you deploy it. Here's a practical evaluation harness using nDCG@k (Normalized Discounted Cumulative Gain), the standard metric for ranking quality.
# evaluate.py
import numpy as np
from typing import List, Dict


def ndcg_at_k(ranked_docs: List[Dict], relevant_doc_ids: List[str], k: int) -> float:
    """
    Compute nDCG@k with binary relevance.
    - ranked_docs: ordered list of retrieved documents, each with a 'doc_id' in metadata
    - relevant_doc_ids: ground truth set of relevant document IDs
    - k: cutoff position
    """
    def dcg(relevances):
        return sum(
            rel / np.log2(i + 2)
            for i, rel in enumerate(relevances[:k])
        )

    relevant = set(relevant_doc_ids)
    # Score each position: 1 if relevant, 0 if not
    relevances = [
        1 if doc["metadata"].get("doc_id") in relevant else 0
        for doc in ranked_docs[:k]
    ]
    actual_dcg = dcg(relevances)
    # Ideal DCG: every known relevant doc ranked first (capped at k).
    # Building the ideal from the retrieved list alone would hide missed documents.
    ideal_dcg = dcg([1] * min(len(relevant), k))
    if ideal_dcg == 0:
        return 0.0
    return float(actual_dcg / ideal_dcg)


def evaluate_pipeline(
    test_cases: List[Dict],  # Each has 'query' and 'relevant_doc_ids'
    retriever,
    reranker,
    initial_k: int = 20,
    final_k: int = 5
) -> Dict:
    """
    Compare retrieval-only vs retrieval+reranking on a test set.
    """
    retrieval_only_ndcgs = []
    reranked_ndcgs = []
    rank_changes_summary = []
    for case in test_cases:
        query = case["query"]
        relevant_ids = case["relevant_doc_ids"]
        # Stage 1: Retrieve
        candidates = retriever.retrieve(query, initial_k=initial_k)
        # Evaluate retrieval-only (top final_k by bi-encoder)
        retrieval_top_k = candidates[:final_k]
        retrieval_ndcg = ndcg_at_k(retrieval_top_k, relevant_ids, final_k)
        retrieval_only_ndcgs.append(retrieval_ndcg)
        # Stage 2: Rerank
        reranked = reranker.rerank_with_diagnostics(query, candidates, final_k=final_k)
        reranked_ndcg = ndcg_at_k(reranked["results"], relevant_ids, final_k)
        reranked_ndcgs.append(reranked_ndcg)
        # Track rank changes for the relevant documents that made the final cut
        for change in reranked["rank_changes"]:
            original = candidates[change["original_rank"]]
            if original["metadata"].get("doc_id") in relevant_ids:
                rank_changes_summary.append(change["delta"])
    return {
        "retrieval_only_ndcg_at_k": np.mean(retrieval_only_ndcgs),
        "reranked_ndcg_at_k": np.mean(reranked_ndcgs),
        "improvement": np.mean(reranked_ndcgs) - np.mean(retrieval_only_ndcgs),
        "improvement_pct": (
            (np.mean(reranked_ndcgs) - np.mean(retrieval_only_ndcgs))
            / max(np.mean(retrieval_only_ndcgs), 1e-9) * 100
        ),
        "avg_rank_improvement": np.mean(rank_changes_summary) if rank_changes_summary else 0,
        "n_queries": len(test_cases)
    }


# Example usage with a small hand-labeled test set
test_cases = [
    {
        "query": "What is the data retention policy after account cancellation?",
        "relevant_doc_ids": ["policy_doc_42", "tos_section_7"]
    },
    {
        "query": "How do I configure SAML SSO for enterprise accounts?",
        "relevant_doc_ids": ["sso_setup_guide", "enterprise_admin_doc"]
    },
    {
        "query": "What happens to API rate limits during free trial?",
        "relevant_doc_ids": ["api_limits_doc", "trial_terms"]
    }
]
Building your test set: Creating even 50 hand-labeled query/relevant-document pairs will give you more confidence in deployment than any amount of intuition. A practical approach: pull your support ticket logs, find the tickets that got resolved, and use the resolution documentation as ground truth relevance labels.
When you run this evaluation on a representative sample, you'll typically see nDCG@5 improvements of 8–20% in domains with precise factual content (legal, medical, technical documentation). In more conversational or general-knowledge corpora, the gains are smaller. Knowing your actual improvement before deploying is what separates a production engineer from someone who assumes it works.
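A small worked example may help when reading the metric. The sketch below computes nDCG@5 by hand for one query with two known relevant documents, using invented before/after rankings. It assumes binary relevance and an ideal ranking that places every known relevant document first.

```python
import math

def dcg(relevances):
    """Discounted cumulative gain: each hit is discounted by log2(position + 1)."""
    return sum(rel / math.log2(pos + 2) for pos, rel in enumerate(relevances))

def ndcg_at_k(relevances, num_relevant, k):
    """Binary-relevance nDCG@k, normalised by the best ranking achievable."""
    actual = dcg(relevances[:k])
    ideal = dcg([1] * min(num_relevant, k))
    return actual / ideal if ideal > 0 else 0.0

# Hypothetical rankings for a query with two relevant documents.
before_rerank = [0, 0, 1, 0, 1]  # relevant docs at ranks 3 and 5
after_rerank = [1, 1, 0, 0, 0]   # relevant docs at ranks 1 and 2

ndcg_before = ndcg_at_k(before_rerank, num_relevant=2, k=5)
ndcg_after = ndcg_at_k(after_rerank, num_relevant=2, k=5)

# before: (1/log2(4) + 1/log2(6)) / (1 + 1/log2(3)) = 0.8869 / 1.6309
print(round(ndcg_before, 4))
print(round(ndcg_after, 4))
```

Both rankings contain the same two documents in the top five, so a plain recall@5 would not distinguish them; nDCG rewards moving them toward the top.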
Reranking adds latency. Let's be honest about that and handle it intelligently.
import time
import statistics


def benchmark_reranker(reranker, query: str, candidates, iterations: int = 20):
    latencies = []
    for _ in range(iterations):
        start = time.perf_counter()
        reranker.rerank(query, candidates, final_k=5)
        latencies.append((time.perf_counter() - start) * 1000)  # ms
    return {
        "mean_ms": statistics.mean(latencies),
        "p50_ms": statistics.median(latencies),
        "p95_ms": statistics.quantiles(latencies, n=20)[18],  # 95th percentile
        # With only 20 samples the p99 is an extrapolation; raise iterations
        # (100+) if you need a trustworthy tail estimate.
        "p99_ms": statistics.quantiles(latencies, n=100)[98]
    }
Typical numbers on a modern CPU with ms-marco-MiniLM-L-6-v2 and 20 candidates:
On GPU, this drops to ~20–40ms. The Cohere API typically adds 100–300ms of network latency but is consistent regardless of hardware.
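The cross-encoder call is blocking, and the LLM cannot start generating until reranking finishes, so the two steps can't overlap. What you can do in an async application is run the reranker in a thread pool so it doesn't stall the event loop while other requests are being served: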
import asyncio
from concurrent.futures import ThreadPoolExecutor


class AsyncReranker:
    def __init__(self, reranker, max_workers: int = 4):
        self.reranker = reranker
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def rerank_async(
        self,
        query: str,
        candidates: list,
        final_k: int = 5
    ) -> list:
        """
        Run the blocking cross-encoder in a thread pool to avoid
        blocking the event loop in async applications.
        """
        # get_running_loop() is the supported call inside a coroutine
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            lambda: self.reranker.rerank(query, candidates, final_k)
        )
For applications with repeated queries (e.g., a customer support tool where many users ask similar questions), caching rerank results can dramatically reduce load:
import hashlib
import json
from typing import Tuple


class CachedReranker:
    def __init__(self, reranker, cache_size: int = 1000):
        self.reranker = reranker
        self._cache = {}
        self.cache_size = cache_size

    def _make_cache_key(self, query: str, candidate_texts: Tuple[str, ...], final_k: int) -> str:
        # final_k is part of the key so a top-3 result is never served for a top-5 request
        content = json.dumps(
            {"query": query, "docs": list(candidate_texts), "final_k": final_k},
            sort_keys=True
        )
        return hashlib.md5(content.encode()).hexdigest()

    def rerank(self, query: str, candidates: list, final_k: int = 5) -> list:
        candidate_texts = tuple(c["text"] for c in candidates)
        cache_key = self._make_cache_key(query, candidate_texts, final_k)
        if cache_key in self._cache:
            return self._cache[cache_key]
        result = self.reranker.rerank(query, candidates, final_k)
        # Evict oldest entry if at capacity (simple FIFO; dicts keep insertion order)
        if len(self._cache) >= self.cache_size:
            oldest_key = next(iter(self._cache))
            del self._cache[oldest_key]
        self._cache[cache_key] = result
        return result
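Warning on caching: Only cache when the candidate set is stable. The cache key hashes the query and the candidate texts, so an edited chunk produces a new key, but cached entries still carry the metadata and scores captured when they were stored, and entries for deleted documents linger until they are evicted. That is a subtle but real correctness bug if your knowledge base updates frequently. Use TTLs or invalidate the cache on ingestion events.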
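A time-to-live wrapper is one way to bound staleness. The sketch below is a minimal, standard-library-only illustration; `TTLCache` and its `clock` parameter are hypothetical names introduced here, and the injectable clock exists only to make expiry easy to test.

```python
import time
from typing import Any, Callable, Dict, Hashable, Optional, Tuple

class TTLCache:
    """Tiny in-memory cache whose entries expire ttl_seconds after being stored."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self._clock = clock
        self._store: Dict[Hashable, Tuple[float, Any]] = {}

    def get(self, key: Hashable) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if self._clock() - stored_at > self.ttl_seconds:
            # Expired: drop the entry so the caller recomputes it.
            del self._store[key]
            return None
        return value

    def set(self, key: Hashable, value: Any) -> None:
        self._store[key] = (self._clock(), value)

    def clear(self) -> None:
        """Call this from your ingestion job to invalidate everything at once."""
        self._store.clear()

# Demonstration with a fake clock so no real waiting is needed.
fake_now = [0.0]
cache = TTLCache(ttl_seconds=10, clock=lambda: fake_now[0])
cache.set("query-key", ["doc_a", "doc_b"])
fresh = cache.get("query-key")
fake_now[0] = 11.0
expired = cache.get("query-key")
print(fresh, expired)
```

A short TTL trades some cache hits for freshness; calling `clear()` on ingestion events covers the case where documents change well before the TTL elapses.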
This is where practitioners make the most consequential configuration decisions. Here's how to think about it:
initial_k (retrieval breadth) is shaped by two factors: the recall ceiling of your bi-encoder (you can't rerank what wasn't retrieved), and the input limit of your cross-encoder. Most local models cap at 512 tokens per query-document pair, so with a 130-token query each document gets roughly 380 tokens. That limit applies per pair, not to the number of candidates, so retrieving 50 candidates with a model that truncates each to about 380 tokens is fine; just be aware of the truncation.
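The token budget arithmetic is simple enough to encode as a check. The sketch below assumes a BERT-style input layout with three special tokens ([CLS] and two [SEP]); other tokenizers may add a different number, so treat `special_tokens` as an assumption to verify against your model.

```python
def max_document_tokens(max_length: int, query_tokens: int, special_tokens: int = 3) -> int:
    """Tokens left for the document once the query and special tokens are accounted for."""
    return max(0, max_length - query_tokens - special_tokens)

def will_truncate(doc_tokens: int, max_length: int, query_tokens: int) -> bool:
    """True if a document of doc_tokens would be cut off for this query length."""
    return doc_tokens > max_document_tokens(max_length, query_tokens)

budget = max_document_tokens(max_length=512, query_tokens=130)
print(budget)
print(will_truncate(doc_tokens=600, max_length=512, query_tokens=50))
print(will_truncate(doc_tokens=300, max_length=512, query_tokens=50))
```

Running a check like this over your chunk lengths at ingestion time shows how many chunks would be scored on only part of their text.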
final_k (context provided to LLM) is bounded by the LLM's context window and your cost tolerance. More context = more tokens = higher cost per query. But more importantly, research consistently shows that LLMs suffer from the "lost in the middle" problem: they pay less attention to documents in the middle of a long context. If you send 10 documents, the LLM may effectively ignore documents 4–7. Sending 3–5 well-reranked documents usually outperforms sending 10 mediocre ones.
A practical tuning approach:
# Parameter sweep to find the best initial_k / final_k pair for your corpus
from evaluate import evaluate_pipeline

for initial_k in [10, 20, 30, 50]:
    for final_k in [3, 5, 7]:
        if final_k >= initial_k:
            continue
        metrics = evaluate_pipeline(
            test_cases=test_cases,
            retriever=retriever,
            reranker=reranker,
            initial_k=initial_k,
            final_k=final_k
        )
        print(f"initial_k={initial_k}, final_k={final_k}: "
              f"nDCG={metrics['reranked_ndcg_at_k']:.3f}, "
              f"improvement={metrics['improvement_pct']:.1f}%")
For most knowledge base RAG applications, initial_k=20 and final_k=5 is a strong starting point that balances recall, reranking quality, and LLM context efficiency.
Let's put everything together in a realistic project. You're building a QA system over a corpus of commercial contracts — NDAs, SaaS agreements, data processing addenda. This is exactly the domain where precise retrieval matters most: "Can this customer use our product for financial services?" requires finding the specific clause, not just a vaguely relevant paragraph about industry restrictions.
Step 1: Ingest your documents with rich metadata
# ingest.py
import chromadb
from chromadb.utils import embedding_functions
from pathlib import Path


def chunk_contract(text: str, doc_id: str, source: str, chunk_size: int = 400) -> list:
    """
    Chunk by whitespace-separated word count (a rough proxy for tokens).
    This simple splitter ignores sentence boundaries; in production, use a
    proper chunker (e.g., LangChain's RecursiveCharacterTextSplitter).
    """
    # A word is at least one token, so 400-word chunks can exceed a 512-token
    # cross-encoder limit; lower chunk_size if truncation matters.
    words = text.split()
    chunks = []
    current_chunk = []
    chunk_idx = 0
    for word in words:
        current_chunk.append(word)
        if len(current_chunk) >= chunk_size:
            chunk_text = " ".join(current_chunk)
            chunks.append({
                "text": chunk_text,
                "doc_id": f"{doc_id}_chunk_{chunk_idx}",
                "source": source,
                "parent_doc_id": doc_id
            })
            current_chunk = []
            chunk_idx += 1
    # Flush the final partial chunk
    if current_chunk:
        chunks.append({
            "text": " ".join(current_chunk),
            "doc_id": f"{doc_id}_chunk_{chunk_idx}",
            "source": source,
            "parent_doc_id": doc_id
        })
    return chunks


def ingest_contracts(contracts_dir: str = "./data/contracts"):
    client = chromadb.PersistentClient(path="./chroma_db")
    embedding_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
        model_name="all-MiniLM-L6-v2"
    )
    collection = client.get_or_create_collection(
        name="legal_contracts",
        embedding_function=embedding_fn
    )
    for contract_path in Path(contracts_dir).glob("*.txt"):
        text = contract_path.read_text(encoding="utf-8")
        doc_id = contract_path.stem
        chunks = chunk_contract(text, doc_id, source=contract_path.name)
        if not chunks:  # skip empty files
            continue
        collection.add(
            documents=[c["text"] for c in chunks],
            metadatas=[
                {"doc_id": c["doc_id"], "source": c["source"], "parent_doc_id": c["parent_doc_id"]}
                for c in chunks
            ],
            ids=[c["doc_id"] for c in chunks]
        )
        print(f"Ingested {len(chunks)} chunks from {contract_path.name}")


if __name__ == "__main__":
    ingest_contracts()
Step 2: Build the QA pipeline with score-threshold filtering
For legal documents, you want to be conservative. If the reranker isn't confident any document is relevant, you should say so rather than hallucinate an answer:
# legal_qa.py
import os
from retriever import BiEncoderRetriever
from reranker import LocalCrossEncoderReranker
from pipeline import RAGPipelineWithReranking

retriever = BiEncoderRetriever(
    collection_name="legal_contracts",
    persist_directory="./chroma_db"
)
reranker = LocalCrossEncoderReranker(
    model_name="cross-encoder/ms-marco-MiniLM-L-12-v2"  # Use the larger model for legal
)
# score_threshold is applied to the raw cross-encoder logit, not a probability.
# 0.3 is a placeholder: validate the cutoff on labeled queries from your corpus.
pipeline = RAGPipelineWithReranking(
    retriever=retriever,
    reranker=reranker,
    openai_api_key=os.environ["OPENAI_API_KEY"],
    initial_k=25,
    final_k=4,
    score_threshold=0.3
)
queries = [
    "Are customers in regulated financial services industries permitted to use the product under the standard Enterprise agreement?",
    "What are the notice requirements for termination for cause?",
    "Does the indemnification clause cover third-party IP claims?"
]
for query in queries:
    print(f"\nQuery: {query}")
    result = pipeline.answer(query, verbose=False)
    print(f"Answer: {result['answer']}")
    print(f"Sources: {[s['source'] for s in result['sources']]}")
    # .get() keeps this safe when no document clears the threshold
    print(f"Rerank scores: {[f'{s:.3f}' for s in result.get('rerank_scores', [])]}")
    print("-" * 80)
Step 3: Run the evaluation to confirm reranking is helping
Create a small hand-labeled test set from known contract questions and their relevant clause locations, then run the evaluate_pipeline function from earlier. In legal/compliance domains, you'll typically see the most dramatic reranking gains because the relevant clause language is often quite specific but surrounded by tangentially related text that confuses bi-encoders.
Mistake 1: Setting initial_k too low
If you only retrieve 5 candidates and rerank them to 3, you're not getting the benefit of reranking — you're just adding latency. The power of reranking comes from the cross-encoder being able to promote highly relevant documents that the bi-encoder underranked. If those documents aren't in the candidate pool, you've gained nothing. Start with initial_k at least 4–5x your final_k.
Mistake 2: Ignoring chunk length relative to cross-encoder max_length
The cross-encoder/ms-marco-MiniLM-L-6-v2 model has a maximum input of 512 tokens, which is split between the query and the document. If your query is 50 tokens and your document chunk is 600 tokens, the chunk gets truncated. If the answer is in the truncated portion, the cross-encoder can't score it correctly. The fix: match your chunk size to the cross-encoder's capacity, or use a longer-context model.
Mistake 3: Using rerank scores as calibrated relevance probabilities
Raw scores from MS-MARCO models are logits — they have a meaningful ordering but the absolute values aren't calibrated. Don't build business logic like "if score > 0.5, the document is definitely relevant." Instead, use relative ranking (position) and validate any threshold empirically on your specific corpus and query distribution.
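Validating a threshold empirically can be as simple as sweeping candidate cutoffs over labeled (score, relevant?) pairs and reading off precision and recall. The scores and labels below are invented for illustration; in practice they would come from your own hand-labeled test set.

```python
from typing import List, Tuple

def precision_recall_at_threshold(
    scored: List[Tuple[float, bool]],
    threshold: float,
) -> Tuple[float, float]:
    """Precision and recall if only documents with score >= threshold are kept."""
    kept = [is_relevant for score, is_relevant in scored if score >= threshold]
    total_relevant = sum(1 for _, is_relevant in scored if is_relevant)
    true_positives = sum(1 for is_relevant in kept if is_relevant)
    # Convention used here: precision is 0.0 when nothing passes the threshold.
    precision = true_positives / len(kept) if kept else 0.0
    recall = true_positives / total_relevant if total_relevant else 0.0
    return precision, recall

# Hypothetical raw logits paired with human relevance labels.
labeled_scores = [
    (6.1, True), (2.3, True), (1.8, False),
    (-0.5, True), (-4.2, False), (-7.9, False),
]

sweep = {t: precision_recall_at_threshold(labeled_scores, t) for t in (-1.0, 0.0, 2.0)}
for threshold, (precision, recall) in sweep.items():
    print(f"threshold={threshold:+.1f} precision={precision:.2f} recall={recall:.2f}")
```

Pick the cutoff from the trade-off you can tolerate: a conservative domain might accept lower recall for higher precision, and the right value will differ between models and corpora.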
Mistake 4: Not testing on your actual domain
Cross-encoders are trained on datasets like MS-MARCO (web search queries). If your queries are in a highly specialized domain (semiconductor manufacturing processes, derivatives contract language, genomics research), the model's notion of "relevant" may not match yours. Fine-tuning a cross-encoder on domain-specific labeled pairs is the right fix, though it's a larger project. A quick check: if your cross-encoder is giving high scores to documents you can clearly see are wrong, suspect a domain mismatch.
Mistake 5: Reranking but not measuring
You implemented reranking, it feels better, and you ship it. Months later, you find a subset of queries where reranking is actively hurting (this happens — some query types may be better served by bi-encoder ordering). Without ongoing evaluation metrics, you'll never detect this. Log your rerank scores, track answer quality metrics (user feedback, thumbs up/down, ticket deflection rate), and periodically re-run your evaluation harness on new test cases.
Debugging tip: When reranking seems wrong, print out all initial_k candidates with both their bi-encoder score and cross-encoder score. Usually the problem is immediately visible: either the relevant document wasn't retrieved at all (bi-encoder failure) or the cross-encoder scored an irrelevant document highly due to keyword overlap without semantic understanding (model limitation on your domain).
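A small formatting helper makes that side-by-side view quick to produce. The sketch below assumes candidate dicts shaped like the ones in this lesson ('text', 'bi_encoder_score', 'rerank_score') listed in bi-encoder order; `format_candidate_table` is a hypothetical helper and the sample rows are invented.

```python
from typing import Any, Dict, List

def format_candidate_table(candidates: List[Dict[str, Any]], preview_chars: int = 60) -> str:
    """Render candidates sorted by rerank score, showing old and new rank side by side."""
    # Pair each candidate with its position in the bi-encoder ordering.
    indexed = list(enumerate(candidates))
    indexed.sort(key=lambda pair: pair[1]["rerank_score"], reverse=True)
    lines = [f"{'new':>3} {'old':>3} {'bi_enc':>8} {'rerank':>8}  text"]
    for new_rank, (old_rank, cand) in enumerate(indexed):
        lines.append(
            f"{new_rank:>3} {old_rank:>3} "
            f"{cand['bi_encoder_score']:>8.3f} {cand['rerank_score']:>8.3f}  "
            f"{cand['text'][:preview_chars]}"
        )
    return "\n".join(lines)

# Hypothetical candidates in bi-encoder order.
sample_candidates = [
    {"text": "Pricing overview for all plans", "bi_encoder_score": 0.82, "rerank_score": -2.1},
    {"text": "Refund policy for mid-cycle downgrades", "bi_encoder_score": 0.74, "rerank_score": 5.3},
    {"text": "API rate limits by tier", "bi_encoder_score": 0.61, "rerank_score": -6.4},
]

table = format_candidate_table(sample_candidates)
print(table)
```

If the document you expected is missing from the table entirely, the fix belongs in retrieval (chunking, embeddings, initial_k) rather than in the reranker.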
You've built a complete two-stage RAG pipeline with cross-encoder reranking. Let's recap what we covered:
The core insight is architectural: bi-encoders are optimized for scale (encode once, query via similarity), but that independence from the query is also their weakness. Cross-encoders process query-document pairs jointly, capturing fine-grained relevance signals at the cost of speed. By using bi-encoders for broad retrieval and cross-encoders for precise reranking, you get the best of both.
The implementation pattern — retrieve initial_k candidates, rerank to final_k, optionally filter by score threshold, then generate — is clean and composable. You can swap out the retriever (FAISS, Pinecone, Weaviate) and the reranker (local model, Cohere API, a fine-tuned domain model) without touching the rest of the pipeline.
The production concerns that actually matter in practice: measure latency before and after, tune initial_k and final_k on real test data, don't assume reranking always helps, and handle the failure mode (no relevant documents) explicitly rather than letting the LLM hallucinate.
Where to go from here:
Fine-tune a cross-encoder on your domain data. If you have labeled query-document relevance pairs (even 500–1000 examples), fine-tuning a cross-encoder will outperform a generic model. The sentence-transformers library makes this straightforward with CrossEncoderTrainer.
Implement reciprocal rank fusion. If you have multiple retrieval systems (dense + sparse BM25 + graph-based), RRF is an effective way to combine their rankings before applying a cross-encoder reranker.
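Reciprocal rank fusion fits in a few lines: each document's fused score is the sum of 1 / (k + rank) over the rankings it appears in. The sketch below uses k = 60, the constant commonly used with RRF, and invented document IDs for a dense and a sparse ranking.

```python
from collections import defaultdict
from typing import Dict, List, Sequence

def reciprocal_rank_fusion(rankings: Sequence[Sequence[str]], k: int = 60) -> List[str]:
    """Fuse several ranked lists of document IDs into one list, best first."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            # Documents missing from a ranking simply contribute nothing for it.
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores, key=lambda doc_id: scores[doc_id], reverse=True)

# Hypothetical outputs from two first-stage retrievers.
dense_ranking = ["d1", "d2", "d3"]
sparse_ranking = ["d3", "d1", "d4"]

fused = reciprocal_rank_fusion([dense_ranking, sparse_ranking])
print(fused)
```

Because RRF only uses rank positions, it needs no score normalisation across retrievers; the fused list can then be truncated to initial_k and handed to the cross-encoder.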
Explore LLM-based reranking. Using a smaller LLM (like GPT-4o-mini) to directly score relevance is a powerful option when your queries are complex and your domain is highly specialized. It's slower and more expensive than cross-encoders, but sometimes the quality gains justify it.
Add observability. Instrument your pipeline with tools like LangSmith or Arize to track retrieval quality, rerank score distributions, and answer quality over time. The evaluation harness you built today is the foundation — connect it to your production logs to run continuously.