Wicked Smart Data

Agentic RAG: Building Self-Correcting Retrieval Pipelines That Query, Reflect, and Retry

AI & Machine Learning | 🔥 Expert | 27 min read | Jun 29, 2026 | Updated Jun 29, 2026
Table of Contents
  • Introduction
  • Prerequisites
  • Why Single-Pass RAG Breaks Down in the Real World
  • The Architecture: A Reasoning Graph, Not a Chain
  • Setting Up the State Object
  • Node 1: The Retriever
  • Node 2: The Relevance Grader
  • The Routing Logic
  • Node 3: The Query Rewriter
  • Node 4: The Generator
  • Node 5: The Hallucination Checker
  • Wiring the Graph with LangGraph
  • Running the Pipeline
  • Hands-On Exercise

Introduction

Picture this: you've built a RAG pipeline for your company's internal knowledge base. It retrieves documents, passes them to an LLM, and generates answers. It works beautifully in your demos. Then it hits production, and users start asking questions that span multiple documents, require synthesizing conflicting information, or demand follow-up clarifications before a real answer is even possible. Your pipeline confidently returns garbage — or worse, it returns a plausible-sounding answer that's subtly wrong because the retrieval step pulled the wrong chunk.

Standard RAG is a single-pass architecture: query in, documents out, answer generated. It has no mechanism for recognizing when its own retrieval was inadequate, no way to ask a clarifying sub-question, no ability to say "wait, this evidence contradicts itself — let me look harder." That's the fundamental ceiling of naive RAG, and if you're working on anything beyond simple FAQ lookup, you've probably already hit it.

Agentic RAG breaks that ceiling by treating retrieval as a reasoning loop rather than a lookup step. In this lesson, we're going to build a self-correcting retrieval pipeline from scratch — one that retrieves, reflects on the quality of what it found, decides whether to retry with a different strategy, and synthesizes a final answer only when it's confident the evidence is actually good enough. By the end, you'll understand the architecture at a design level and have working Python code implementing the core patterns.

What you'll learn:

  • How agentic RAG differs architecturally from naive and advanced RAG, and when the complexity is actually justified
  • How to implement a reflection step that grades retrieved documents for relevance before passing them to a generator
  • How to build a query rewriting loop that reformulates failed queries based on what went wrong
  • How to implement hallucination detection as a post-generation check that can trigger re-retrieval
  • How to wire these components into a coherent loop with proper termination conditions and cycle guards

Prerequisites

You should be comfortable with:

  • Python at an intermediate-to-advanced level (we'll use Pydantic models, type hints, and structured LLM outputs)
  • Basic RAG concepts: chunking, embedding, vector stores, cosine similarity
  • Working familiarity with LangChain or a similar framework, though we'll minimize framework magic in favor of explicit code
  • The concept of LLM function calling / structured outputs
  • Basic familiarity with LangGraph or willingness to read the docs alongside this lesson

You'll need: langchain, langgraph, openai, chromadb, and pydantic installed. We'll use OpenAI models throughout, but the patterns translate to any capable LLM.


Why Single-Pass RAG Breaks Down in the Real World

Before we write a single line of agentic code, let's be precise about what fails in standard RAG and why. This matters because agentic RAG adds significant complexity — you should only reach for it when you understand the specific failure mode it's solving.

Standard RAG fails in four distinct ways:

Retrieval failure (wrong documents). The embedding similarity between a question and the relevant passage is lower than the similarity to an irrelevant but topically adjacent passage. This is particularly common with technical jargon, acronyms, or domain-specific phrasing. You retrieve documents about the right topic that don't actually answer the question.

Coverage failure (partial documents). The answer requires synthesizing information from three separate sections of a codebase, or combining a policy document with a specific amendment that supersedes it. Single-pass retrieval with a fixed top_k can't guarantee coverage of all required pieces.

Ambiguity failure (wrong interpretation). The query "how do I handle errors in the pipeline?" could mean exception handling in code, error recovery in data processing, or alerting and escalation policy. Without disambiguation, the retrieval is a coin flip.

Generation failure (hallucination despite good retrieval). Even with perfect documents in context, the LLM can confabulate. The model says something that isn't in any of the retrieved documents, or synthesizes a plausible-but-wrong conclusion.

Each of these failures calls for a different corrective mechanism:

Failure Type      | Agentic Mechanism
------------------|--------------------------------------
Wrong documents   | Relevance grading + query rewriting
Partial coverage  | Iterative sub-question decomposition
Ambiguous query   | Query clarification / expansion
Hallucination     | Post-generation grounding check

Agentic RAG adds these mechanisms as nodes in a reasoning graph. The pipeline can traverse multiple paths through this graph before committing to a final answer.


The Architecture: A Reasoning Graph, Not a Chain

The mental model shift here is crucial. Standard RAG is a chain: each step executes once, in sequence, and passes its output to the next step. Agentic RAG is a graph: nodes represent operations (retrieve, grade, rewrite, generate, check), and edges represent conditional routing decisions.

Here's the full graph we'll build:

[Start]
   │
   ▼
[Query Rewriter] ─────────────────────┐
   │                                  │ (rewrite on failure)
   ▼                                  │
[Retriever]                           │
   │                                  │
   ▼                                  │
[Relevance Grader] ──── (all docs irrelevant) ──►[Rewrite?]──┘
   │ (sufficient relevant docs)              │
   │                                    (max retries hit)
   ▼                                         │
[Generator]                                  ▼
   │                                   [Fallback Response]
   ▼
[Hallucination Checker]
   │                     │
   (grounded)       (hallucination detected)
   │                     │
   ▼                     ▼
[Answer]          [Retry with augmented context]

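This graph has cycles (the rewrite loop), conditional branches (relevance grade routing, hallucination routing), and a termination guard (max retries). Getting these right is the difference between a useful self-correcting pipeline and an infinite loop that burns your OpenAI budget. One difference from the diagram: in the code that follows, the first pass enters at the retriever, and the query rewriter runs only after a failed relevance grade.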
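Before bringing in a graph framework, it can help to see the cycle and its termination guard as a plain loop. The following is a minimal sketch, not the pipeline itself: `run_with_retry_guard`, `toy_retrieve`, and `toy_rewrite` are hypothetical names, and the toy functions stand in for the vector store and the LLM.

```python
from typing import Callable


def run_with_retry_guard(
    query: str,
    retrieve: Callable[[str], list[str]],
    is_good_enough: Callable[[list[str]], bool],
    rewrite: Callable[[str, int], str],
    max_rewrites: int = 3,
) -> tuple[str, list[str], int]:
    """Loop retrieve -> grade -> rewrite until evidence passes or the budget runs out.

    Returns (outcome, documents, rewrites_used); outcome is "generate" or "fallback".
    """
    rewrites_used = 0
    while True:
        docs = retrieve(query)
        if is_good_enough(docs):
            return "generate", docs, rewrites_used
        if rewrites_used >= max_rewrites:
            # Termination guard: without this check the cycle never exits
            return "fallback", [], rewrites_used
        query = rewrite(query, rewrites_used)
        rewrites_used += 1


# Toy stand-ins (hypothetical): retrieval succeeds once the query mentions "backoff"
def toy_retrieve(q: str) -> list[str]:
    return ["src/retry_handler.py", "config/pipeline_config.yaml"] if "backoff" in q else []


def toy_rewrite(q: str, attempt: int) -> str:
    return q + " backoff" if attempt == 1 else q + " retry"


outcome, docs, used = run_with_retry_guard(
    "pipeline timeouts", toy_retrieve, lambda d: len(d) >= 2, toy_rewrite
)
print(outcome, used)  # generate 2
```

The graph version does the same thing, with the `if` branches expressed as conditional edges and the loop counter stored in shared state.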

Let's build it piece by piece.


Setting Up the State Object

In LangGraph, the entire pipeline shares a single state object that flows through nodes and gets updated at each step. Define it thoughtfully — it's the backbone of every decision in your pipeline.

from typing import Optional
from pydantic import BaseModel, Field

class AgenticRAGState(BaseModel):
    """
    Shared state for the agentic RAG pipeline.
    All nodes read from and write to this object.
    """
    # The original question, never modified
    original_query: str
    
    # The current working query (may be rewritten)
    current_query: str
    
    # Retrieved document chunks with metadata
    retrieved_documents: list[dict] = Field(default_factory=list)
    
    # Documents that passed the relevance grader
    relevant_documents: list[dict] = Field(default_factory=list)
    
    # The generated answer (if any)
    generation: Optional[str] = None
    
    # Number of query rewrites attempted
    rewrite_count: int = 0
    
    # Maximum rewrites before giving up
    max_rewrites: int = 3
    
    # Did the hallucination checker pass the final generation?
    grounded: Optional[bool] = None
    
    # Trace of decisions made (useful for debugging and logging)
    reasoning_trace: list[str] = Field(default_factory=list)
    
    # Final answer, set only when the pipeline commits
    final_answer: Optional[str] = None
    
    # Failure message if the pipeline exhausts all retries
    failure_reason: Optional[str] = None

The reasoning_trace field is genuinely important in production — it's how you audit why the pipeline made each decision. When an answer is wrong, you don't want to guess; you want a record of every routing decision.
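One way to make the trace auditable is to write it out as JSON Lines, one object per step, so a log pipeline can filter by run. This is a sketch under assumptions: `trace_to_jsonl` and the `run_id` value are hypothetical, and where the lines end up (file, log shipper, database) is up to you.

```python
import json
from datetime import datetime, timezone


def trace_to_jsonl(run_id: str, trace: list[str]) -> str:
    """Serialize a reasoning trace as one JSON object per line."""
    logged_at = datetime.now(timezone.utc).isoformat()
    lines = [
        json.dumps(
            {"run_id": run_id, "step": i, "logged_at": logged_at, "message": message}
        )
        for i, message in enumerate(trace, start=1)
    ]
    return "\n".join(lines)


# Illustrative trace entries in the same format the nodes append
example_trace = [
    "Retrieved 6 documents for query: 'retry backoff strategy'",
    "Grading complete: 0/6 documents passed relevance check.",
]
jsonl = trace_to_jsonl("run-001", example_trace)
print(jsonl)
```

Because each line is standalone JSON, you can append runs to the same file and still parse them independently.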


Node 1: The Retriever

This is your standard retrieval step, but we're making it explicit as a node so the graph can route back to it after a rewrite.

import os

import chromadb
from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction
from openai import OpenAI

client = OpenAI()  # Reads OPENAI_API_KEY from the environment

# Initialize ChromaDB with OpenAI embeddings
embedding_fn = OpenAIEmbeddingFunction(
    api_key=os.environ["OPENAI_API_KEY"],  # Avoid hardcoding secrets in source
    model_name="text-embedding-3-small"
)

chroma_client = chromadb.PersistentClient(path="./knowledge_base")
collection = chroma_client.get_collection(
    name="engineering_docs",
    embedding_function=embedding_fn
)

def retrieve(state: AgenticRAGState) -> AgenticRAGState:
    """
    Query the vector store with the current working query.
    Returns top-k documents with their distances and metadata.
    """
    results = collection.query(
        query_texts=[state.current_query],
        n_results=6,  # Retrieve more than you need; the grader will filter
        include=["documents", "metadatas", "distances"]
    )
    
    # Flatten ChromaDB's nested result format into a usable list
    documents = []
    for doc, meta, dist in zip(
        results["documents"][0],
        results["metadatas"][0],
        results["distances"][0]
    ):
        meta = meta or {}  # Guard against records stored without metadata
        documents.append({
            "content": doc,
            "metadata": meta,
            "distance": dist,  # Lower = more similar in ChromaDB
            "source": meta.get("source", "unknown")
        })
    
    state.retrieved_documents = documents
    state.reasoning_trace.append(
        f"Retrieved {len(documents)} documents for query: '{state.current_query}'"
    )
    
    return state

Notice we retrieve n_results=6 even though we might only need 3-4. We're deliberately over-fetching because the relevance grader will filter. You want the grader to have options to work with.
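The nested indexing in the retriever is easier to follow against a concrete result. The sketch below uses a hand-built dictionary that mimics the shape `collection.query()` returns for a single query text; the contents and distances are made up for illustration, and `flatten_query_result` is a hypothetical helper name. It also assumes a record stored without metadata may come back as `None`.

```python
def flatten_query_result(results: dict) -> list[dict]:
    """Flatten the hits for the first query text into a list of document dicts."""
    flattened = []
    # Index [0] selects the hits for the first (here, only) query text
    for doc, meta, dist in zip(
        results["documents"][0],
        results["metadatas"][0],
        results["distances"][0],
    ):
        meta = meta or {}  # Assumption: metadata can be None for some records
        flattened.append({
            "content": doc,
            "metadata": meta,
            "distance": dist,
            "source": meta.get("source", "unknown"),
        })
    return flattened


# Hand-built sample (illustrative values), one inner list per query text
sample = {
    "documents": [["Retries use exponential backoff.", "Decorators wrap functions."]],
    "metadatas": [[{"source": "src/retry_handler.py"}, None]],
    "distances": [[0.21, 0.58]],
}
docs = flatten_query_result(sample)
print([d["source"] for d in docs])  # ['src/retry_handler.py', 'unknown']
```

Each outer list has one entry per query text, which is why the retriever indexes `[0]` everywhere: it only ever sends one query.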


Node 2: The Relevance Grader

This is where agentic RAG starts to diverge from standard RAG. Instead of passing everything retrieved directly to the generator, we stop and ask: are these documents actually relevant to what was asked?

The grader uses structured output to give us a machine-readable decision plus a reasoning trace.

from pydantic import BaseModel, Field
from typing import Literal

class RelevanceGrade(BaseModel):
    """Structured output for document relevance grading."""
    relevant: bool = Field(
        description="Whether this document is relevant to answering the query"
    )
    relevance_score: Literal["high", "medium", "low"] = Field(
        description="Qualitative relevance score"
    )
    reasoning: str = Field(
        description="One sentence explaining why this document is or isn't relevant"
    )

def grade_document_relevance(document: dict, query: str) -> RelevanceGrade:
    """
    Use an LLM to grade whether a retrieved document is relevant
    to the user's query. This is a binary decision with reasoning.
    """
    system_prompt = """You are a precise relevance grader for a technical knowledge base.
    
    Your job is to assess whether a retrieved document chunk contains information 
    that would help answer the user's question. Be strict — a document that is 
    topically related but doesn't actually address the question should be marked 
    NOT relevant.
    
    Consider: Does this document contain facts, procedures, or explanations that 
    a reader would need to answer this specific question? If the question asks 
    about error handling in Python and the document discusses Python decorators 
    without mentioning errors, mark it not relevant."""
    
    user_prompt = f"""Query: {query}
    
    Document chunk:
    ---
    {document['content']}
    ---
    Source: {document['source']}
    
    Is this document relevant to answering the query?"""
    
    response = client.beta.chat.completions.parse(
        model="gpt-4o-mini",  # Use mini here — this is a cheap classification call
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        response_format=RelevanceGrade,
        temperature=0  # Deterministic grading
    )
    
    return response.choices[0].message.parsed

def grade_relevance(state: AgenticRAGState) -> AgenticRAGState:
    """
    Grade all retrieved documents and filter to only the relevant ones.
    Every document keeps its grade so the query rewriter can see why the
    rejected ones failed. Whether enough documents passed is decided by
    the routing function, not by this node.
    """
    relevant_docs = []
    graded_docs = []
    
    for doc in state.retrieved_documents:
        grade = grade_document_relevance(doc, state.current_query)
        
        # Attach the grade to every document, not only the ones that pass
        graded_doc = {
            **doc,
            "relevance_score": grade.relevance_score,
            "grade_reasoning": grade.reasoning
        }
        graded_docs.append(graded_doc)
        
        if grade.relevant and grade.relevance_score in ("high", "medium"):
            relevant_docs.append(graded_doc)
        
        state.reasoning_trace.append(
            f"Document from {doc['source']}: {grade.relevance_score} relevance. "
            f"Reason: {grade.reasoning}"
        )
    
    # Write grades back so rewrite_query can read them from retrieved_documents
    state.retrieved_documents = graded_docs
    state.relevant_documents = relevant_docs
    state.reasoning_trace.append(
        f"Grading complete: {len(relevant_docs)}/{len(state.retrieved_documents)} "
        f"documents passed relevance check."
    )
    
    return state

Cost optimization note: We use gpt-4o-mini for grading, not gpt-4o. Grading is a simple binary classification task — it doesn't need a powerful model. Save your expensive model calls for generation and hallucination checking. In production, you might even fine-tune a small classifier for this step and eliminate the LLM call entirely.
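Cost is one side of the grading step; latency is the other, since grading six documents means six sequential LLM calls. Because those calls are I/O-bound, one option is to run them in a thread pool. This is a sketch only: `grade_all_concurrently` is a hypothetical helper, and `fake_grader` is a keyword check standing in for the real LLM grader so the example runs without a network call.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def grade_all_concurrently(
    documents: list[dict],
    query: str,
    grade_fn: Callable[[dict, str], bool],
    max_workers: int = 6,
) -> list[bool]:
    """Grade every document in parallel threads; results keep the input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order even if calls finish out of order
        return list(pool.map(lambda doc: grade_fn(doc, query), documents))


# Hypothetical stand-in for the LLM grader: a keyword check, no network call
def fake_grader(doc: dict, query: str) -> bool:
    return "backoff" in doc["content"].lower()


docs = [
    {"content": "Exponential backoff doubles the wait."},
    {"content": "Decorators wrap functions."},
]
grades = grade_all_concurrently(docs, "retry backoff strategy", fake_grader)
print(grades)  # [True, False]
```

If you adopt this, keep `max_workers` within your provider's rate limits; parallel calls reduce wall-clock time but not the number of tokens you pay for.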


The Routing Logic

After grading, the graph needs to decide: do we have enough good documents to generate an answer, or do we need to rewrite the query and try again?

def route_after_grading(state: AgenticRAGState) -> str:
    """
    Conditional edge function. Returns the name of the next node.
    
    Rules:
    - If we have 2+ relevant documents: proceed to generation
    - If we have fewer and haven't hit max retries: rewrite query
    - If we've hit max retries: go to fallback
    """
    MIN_RELEVANT_DOCS = 2
    
    if len(state.relevant_documents) >= MIN_RELEVANT_DOCS:
        state.reasoning_trace.append(
            "Routing decision: Sufficient relevant documents found. Proceeding to generation."
        )
        return "generate"
    
    if state.rewrite_count >= state.max_rewrites:
        state.reasoning_trace.append(
            f"Routing decision: Max rewrites ({state.max_rewrites}) reached. "
            f"Routing to fallback."
        )
        return "fallback"
    
    state.reasoning_trace.append(
        f"Routing decision: Insufficient relevant docs ({len(state.relevant_documents)}). "
        f"Routing to query rewriter (attempt {state.rewrite_count + 1}/{state.max_rewrites})."
    )
    return "rewrite_query"

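The MIN_RELEVANT_DOCS threshold is a parameter you'll tune based on your use case. For factual lookups, 2 might be enough. For complex synthesis tasks, you might want 4 or 5. Make it configurable. One caveat worth checking against your LangGraph version: state updates are normally persisted only from node return values, so trace entries appended inside a conditional edge function may be discarded. If the routing lines are missing from your trace, move that logging into the preceding node.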
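A simple way to make the threshold configurable is a factory that closes over it and returns the routing function. The sketch below is illustrative: `make_grading_router` is a hypothetical name, and `RouterState` is a stand-in dataclass holding only the fields the router reads, so the example runs without Pydantic or LangGraph.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class RouterState:
    """Hypothetical stand-in with only the fields the router reads."""
    relevant_documents: list[dict] = field(default_factory=list)
    rewrite_count: int = 0
    max_rewrites: int = 3


def make_grading_router(min_relevant_docs: int = 2) -> Callable[[RouterState], str]:
    """Build a routing function with the relevance threshold baked in."""
    def route(state: RouterState) -> str:
        if len(state.relevant_documents) >= min_relevant_docs:
            return "generate"
        if state.rewrite_count >= state.max_rewrites:
            return "fallback"
        return "rewrite_query"
    return route


lookup_router = make_grading_router(min_relevant_docs=2)
synthesis_router = make_grading_router(min_relevant_docs=4)

three_docs = RouterState(relevant_documents=[{}, {}, {}])
print(lookup_router(three_docs), synthesis_router(three_docs))  # generate rewrite_query
```

The same three documents are enough for a factual lookup but send a synthesis-oriented pipeline back to the rewriter.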


Node 3: The Query Rewriter

When retrieval fails, the question is why it failed and how to fix it. A naive rewriter just rephrases the question slightly. A good rewriter looks at what was retrieved, understands why it was irrelevant, and produces a fundamentally different query strategy.

class RewrittenQuery(BaseModel):
    """Structured output for the query rewriter."""
    rewritten_query: str = Field(
        description="The new, improved query to try"
    )
    rewrite_strategy: Literal[
        "expand_terms", 
        "narrow_focus", 
        "rephrase_intent", 
        "decompose_to_subquestion",
        "add_context"
    ] = Field(
        description="What strategy was used to rewrite the query"
    )
    reasoning: str = Field(
        description="Why this rewrite should perform better than the original"
    )

def rewrite_query(state: AgenticRAGState) -> AgenticRAGState:
    """
    Rewrite the current query based on what was retrieved and why it failed.
    
    We give the LLM the original query, the current query (if different),
    the retrieved documents, and the grading decisions — all the context
    it needs to understand what went wrong and how to fix it.
    """
    
    # Build context about what was retrieved and why it failed
    retrieval_context = []
    for doc in state.retrieved_documents[:3]:  # Show top 3 to save tokens
        grade_info = f" (Grade: {doc.get('relevance_score', 'not graded')}, Reason: {doc.get('grade_reasoning', 'N/A')})"
        retrieval_context.append(
            f"Source: {doc['source']}{grade_info}\n"
            f"Content preview: {doc['content'][:200]}..."
        )
    
    context_str = "\n\n".join(retrieval_context) if retrieval_context else "No documents were retrieved."
    
    system_prompt = """You are an expert at query optimization for vector search retrieval systems.

    When a query fails to retrieve relevant documents, your job is to diagnose why 
    and produce a better query. Common failure modes:
    
    1. Query uses jargon the knowledge base doesn't — try expanding with synonyms
    2. Query is too broad — narrow it to the specific aspect needed  
    3. Query asks multiple things at once — focus on one sub-question
    4. Query describes the problem without naming the concept — reframe using 
       the conceptual terminology likely to appear in documentation
    
    Your rewritten query should be substantively different, not just a synonym swap."""
    
    user_prompt = f"""Original user question: {state.original_query}
    
    Current query being used for retrieval: {state.current_query}
    
    This query retrieved documents that were not relevant. Here's what was retrieved:
    
    {context_str}
    
    Previous rewrite attempts: {state.rewrite_count}
    
    Analyze why the retrieval failed and produce a better query."""
    
    response = client.beta.chat.completions.parse(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        response_format=RewrittenQuery,
        temperature=0.3  # Slight temperature to encourage creative rewrites
    )
    
    result = response.choices[0].message.parsed
    
    state.current_query = result.rewritten_query
    state.rewrite_count += 1
    state.reasoning_trace.append(
        f"Query rewrite #{state.rewrite_count}: Strategy='{result.rewrite_strategy}'. "
        f"New query: '{result.rewritten_query}'. "
        f"Reasoning: {result.reasoning}"
    )
    
    return state

The rewrite_strategy field in the structured output is not just documentation — it's data. In production, you should log these to a database and analyze which strategies succeed most often for your knowledge base. This data can drive improvements to your chunking strategy, metadata tagging, or even your embedding model choice.
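As a rough sketch of that analysis, suppose each logged record holds the strategy used and whether the next retrieval passed grading. The record shape, the field names `strategy` and `passed_grading`, and the sample rows are all assumptions for illustration, not data from a real run.

```python
from collections import defaultdict


def strategy_success_rates(records: list[dict]) -> dict[str, float]:
    """Return, per rewrite strategy, the fraction of rewrites that led to a passing retrieval."""
    attempts: dict[str, int] = defaultdict(int)
    successes: dict[str, int] = defaultdict(int)
    for record in records:
        attempts[record["strategy"]] += 1
        if record["passed_grading"]:
            successes[record["strategy"]] += 1
    return {strategy: successes[strategy] / attempts[strategy] for strategy in attempts}


# Made-up sample rows, only to show the expected shape
logged = [
    {"strategy": "expand_terms", "passed_grading": True},
    {"strategy": "expand_terms", "passed_grading": False},
    {"strategy": "rephrase_intent", "passed_grading": True},
]
rates = strategy_success_rates(logged)
print(rates)  # {'expand_terms': 0.5, 'rephrase_intent': 1.0}
```

A strategy with a persistently low rate is a hint that the underlying problem sits upstream, for example in chunk boundaries or missing metadata, rather than in the query wording.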


Node 4: The Generator

Once we have relevant documents, we generate an answer. Notice that we pass the graded relevance scores to the generator — this gives it a signal about which documents to weight more heavily.

def generate_answer(state: AgenticRAGState) -> AgenticRAGState:
    """
    Generate an answer using only the relevant, graded documents.
    We explicitly tell the generator which documents were high-relevance
    and instruct it to stay strictly within the provided context.
    """
    
    # Format documents with their relevance scores
    context_parts = []
    for i, doc in enumerate(state.relevant_documents, 1):
        relevance_label = doc.get("relevance_score", "medium")
        context_parts.append(
            f"[Document {i} | Relevance: {relevance_label} | Source: {doc['source']}]\n"
            f"{doc['content']}"
        )
    
    context = "\n\n---\n\n".join(context_parts)
    
    system_prompt = """You are a precise technical assistant. Answer questions using 
    ONLY the information provided in the context documents below. 
    
    Critical rules:
    1. If the context doesn't contain enough information to answer the question 
       completely, say so explicitly — do not fill gaps with general knowledge.
    2. Cite the source documents when making specific claims (e.g., "According to 
       Document 2...").
    3. If documents contradict each other, acknowledge the contradiction and explain 
       what each source says.
    4. High-relevance documents should be prioritized over medium-relevance ones."""
    
    user_prompt = f"""Question: {state.original_query}
    
    Context documents:
    {context}
    
    Answer the question based strictly on the provided context."""
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        temperature=0.1
    )
    
    state.generation = response.choices[0].message.content
    state.reasoning_trace.append(
        f"Generated answer using {len(state.relevant_documents)} relevant documents."
    )
    
    return state
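The generator above relies on the prompt to prioritize high-relevance documents. A complementary option, shown here as a sketch, is to order the context so high-relevance documents come first before formatting it. `order_by_relevance` and `RELEVANCE_ORDER` are hypothetical names, and whether ordering actually helps depends on your model and context length.

```python
RELEVANCE_ORDER = {"high": 0, "medium": 1, "low": 2}


def order_by_relevance(documents: list[dict]) -> list[dict]:
    """Sort documents high -> medium -> low, keeping retrieval order within each tier."""
    # sorted() is stable, so ties keep their original relative order
    return sorted(
        documents,
        key=lambda doc: RELEVANCE_ORDER.get(doc.get("relevance_score", "medium"), 1),
    )


graded = [
    {"source": "a", "relevance_score": "medium"},
    {"source": "b", "relevance_score": "high"},
    {"source": "c", "relevance_score": "medium"},
]
ordered = order_by_relevance(graded)
print([doc["source"] for doc in ordered])  # ['b', 'a', 'c']
```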

Node 5: The Hallucination Checker

This is the most underimplemented component in most RAG pipelines. Post-generation grounding verification asks: does the answer we just generated actually come from the retrieved documents, or did the model confabulate?

class GroundingCheck(BaseModel):
    """Structured output for the hallucination checker."""
    is_grounded: bool = Field(
        description="Whether the answer is fully supported by the provided documents"
    )
    ungrounded_claims: list[str] = Field(
        description="List of specific claims in the answer that cannot be verified "
                   "in the source documents. Empty list if fully grounded.",
        default_factory=list
    )
    confidence: Literal["high", "medium", "low"] = Field(
        description="Confidence in the grounding assessment"
    )
    reasoning: str = Field(
        description="Brief explanation of the grounding assessment"
    )

def check_hallucination(state: AgenticRAGState) -> AgenticRAGState:
    """
    Verify that every factual claim in the generated answer 
    can be traced back to a source document.
    """
    
    # Build the full context used for generation
    context = "\n\n".join([
        f"[{doc['source']}]: {doc['content']}" 
        for doc in state.relevant_documents
    ])
    
    system_prompt = """You are a meticulous fact-checker for AI-generated content.
    
    Your job is to verify whether a generated answer is fully supported by the 
    provided source documents. Check every specific factual claim:
    
    - Numbers, dates, version numbers, thresholds
    - Procedural steps and their ordering  
    - Names, identifiers, configurations
    - Causal relationships and logical conclusions
    
    A claim is ungrounded if it:
    1. States something not mentioned in any source document
    2. Contradicts what the source documents say
    3. Makes an inference that isn't directly supported (even if plausible)
    
    Be thorough. A confident-sounding wrong answer is worse than "I don't know."
    """
    
    user_prompt = f"""Source documents used to generate this answer:
    ---
    {context}
    ---
    
    Generated answer to verify:
    ---
    {state.generation}
    ---
    
    Is every factual claim in this answer supported by the source documents?"""
    
    response = client.beta.chat.completions.parse(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        response_format=GroundingCheck,
        temperature=0
    )
    
    check = response.choices[0].message.parsed
    state.grounded = check.is_grounded
    
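    if not check.is_grounded:
        # Key the log on is_grounded so the trace always agrees with routing,
        # even if the model returns an empty ungrounded_claims list.
        claims = "; ".join(check.ungrounded_claims[:3]) or "none listed"
        state.reasoning_trace.append(
            f"Hallucination check: FAILED. Found {len(check.ungrounded_claims)} "
            f"ungrounded claims: {claims}"
        )
    else:
        state.reasoning_trace.append(
            f"Hallucination check: PASSED with {check.confidence} confidence. "
            f"{check.reasoning}"
        )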
    
    return state

def route_after_hallucination_check(state: AgenticRAGState) -> str:
    """
    If grounded: finalize the answer.
    If not grounded and retries remain: trigger a re-retrieval with augmented context.
    If not grounded and no retries: return answer with a caveat.
    """
    if state.grounded:
        return "finalize"
    
    if state.rewrite_count < state.max_rewrites:
        state.reasoning_trace.append(
            "Routing: Answer not grounded. Attempting retrieval augmentation."
        )
        return "augment_and_retry"
    
    state.reasoning_trace.append(
        "Routing: Answer not grounded but max retries reached. "
        "Will finalize with caveat."
    )
    return "finalize_with_caveat"

Wiring the Graph with LangGraph

Now we assemble all nodes into a LangGraph StateGraph. This is where the routing logic becomes executable.

from langgraph.graph import StateGraph, END

def fallback_response(state: AgenticRAGState) -> AgenticRAGState:
    """Called when max retries are exhausted without finding relevant docs."""
    state.final_answer = (
        f"I was unable to find relevant information in the knowledge base to "
        f"answer your question: '{state.original_query}'. "
        f"I attempted {state.rewrite_count} different query formulations "
        f"without finding sufficient evidence. Please try rephrasing your "
        f"question or check if this topic is covered in the knowledge base."
    )
    state.failure_reason = "max_retries_exhausted_no_relevant_docs"
    return state

def finalize_answer(state: AgenticRAGState) -> AgenticRAGState:
    """Commit the generated answer as the final answer."""
    state.final_answer = state.generation
    return state

def finalize_with_caveat(state: AgenticRAGState) -> AgenticRAGState:
    """
    Commit the generated answer but prepend a caveat about unverified claims.
    This is better than suppressing the answer entirely when retries are exhausted.
    """
    caveat = (
        "⚠️ Note: The following answer may contain claims that could not be "
        "fully verified against the source documents. Please verify critical "
        "details independently.\n\n"
    )
    state.final_answer = caveat + (state.generation or "No answer was generated.")
    return state

def augment_and_retry(state: AgenticRAGState) -> AgenticRAGState:
    """
    When hallucination is detected, formulate a targeted follow-up query
    to retrieve the specific facts that were confabulated.
    """
    # In a more sophisticated implementation, this would analyze the
    # ungrounded claims and generate targeted retrieval queries.
    # For now, we fall back to query rewriting with hallucination context.
    state.current_query = (
        f"{state.original_query} - specifically looking for "
        f"factual details and specific values"
    )
    state.rewrite_count += 1
    state.generation = None  # Clear the failed generation
    state.grounded = None
    return state

# Build the graph
workflow = StateGraph(AgenticRAGState)

# Add all nodes
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_relevance", grade_relevance)
workflow.add_node("rewrite_query", rewrite_query)
workflow.add_node("generate", generate_answer)
workflow.add_node("check_hallucination", check_hallucination)
workflow.add_node("fallback", fallback_response)
workflow.add_node("finalize", finalize_answer)
workflow.add_node("finalize_with_caveat", finalize_with_caveat)
workflow.add_node("augment_and_retry", augment_and_retry)

# Set entry point
workflow.set_entry_point("retrieve")

# Add edges
workflow.add_edge("retrieve", "grade_relevance")

# Conditional routing after grading
workflow.add_conditional_edges(
    "grade_relevance",
    route_after_grading,
    {
        "generate": "generate",
        "rewrite_query": "rewrite_query",
        "fallback": "fallback"
    }
)

# After rewrite, go back to retrieval
workflow.add_edge("rewrite_query", "retrieve")

# After generation, check for hallucinations
workflow.add_edge("generate", "check_hallucination")

# Conditional routing after hallucination check
workflow.add_conditional_edges(
    "check_hallucination",
    route_after_hallucination_check,
    {
        "finalize": "finalize",
        "augment_and_retry": "augment_and_retry",
        "finalize_with_caveat": "finalize_with_caveat"
    }
)

# After augmentation, go back to retrieval
workflow.add_edge("augment_and_retry", "retrieve")

# Terminal nodes
workflow.add_edge("fallback", END)
workflow.add_edge("finalize", END)
workflow.add_edge("finalize_with_caveat", END)

# Compile the graph
app = workflow.compile()
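The augment_and_retry node above appends a generic suffix to the query. The comment in that node points at a more targeted approach: build the follow-up query from the claims the checker could not verify. Here is a minimal sketch of that idea. It assumes you add a field to the state to carry `ungrounded_claims` out of the checker (the state as defined does not have one), and `build_targeted_query` is a hypothetical helper.

```python
def build_targeted_query(
    original_query: str,
    ungrounded_claims: list[str],
    max_claims: int = 2,
) -> str:
    """Fold the first few unverified claims into the retrieval query."""
    if not ungrounded_claims:
        # Nothing specific to chase, so retry with the original question
        return original_query
    focus = "; ".join(
        claim.strip().rstrip(".") for claim in ungrounded_claims[:max_claims]
    )
    return f"{original_query} (verify: {focus})"


# Illustrative claims, not output from a real run
query = build_targeted_query(
    "What is the retry backoff strategy?",
    ["The base delay is 2 seconds.", "Retries cap at 5 attempts."],
)
print(query)
# What is the retry backoff strategy? (verify: The base delay is 2 seconds; Retries cap at 5 attempts)
```

Capping the number of claims keeps the query focused; a query stuffed with every disputed detail tends to drift away from the original question in embedding space.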

Running the Pipeline

Here's how you invoke it and inspect the full reasoning trace:

import re

def run_agentic_rag(question: str, max_rewrites: int = 3) -> dict:
    """
    Run the agentic RAG pipeline and return the result with full trace.
    """
    initial_state = AgenticRAGState(
        original_query=question,
        current_query=question,
        max_rewrites=max_rewrites
    )
    
    # Run the graph
    result = app.invoke(initial_state)
    
    return {
        "question": result["original_query"],
        "answer": result["final_answer"],
        "queries_attempted": [result["original_query"]] + [
            # Extract rewritten queries from the trace entries written by
            # rewrite_query. Queries set by augment_and_retry are not logged
            # to the trace, so they do not appear here.
            match.group(1)
            for step in result["reasoning_trace"]
            if (match := re.search(r"New query: '(.*?)'\. Reasoning:", step))
        ],
        "documents_used": len(result["relevant_documents"]),
        "rewrites": result["rewrite_count"],
        "grounded": result["grounded"],
        "failure_reason": result.get("failure_reason"),
        "reasoning_trace": result["reasoning_trace"]
    }

# Example usage
result = run_agentic_rag(
    "What's the retry backoff strategy for the ingestion pipeline when "
    "the data warehouse connection times out?"
)

print(f"Answer: {result['answer']}\n")
print(f"Documents used: {result['documents_used']}")
print(f"Query rewrites: {result['rewrites']}")
print(f"Grounded: {result['grounded']}")
print("\n--- Reasoning Trace ---")
for i, step in enumerate(result["reasoning_trace"], 1):
    print(f"{i}. {step}")

A typical trace for a successful multi-attempt retrieval looks like:

1. Retrieved 6 documents for query: 'retry backoff strategy ingestion pipeline...'
2. Document from pipeline/ingestion.py: low relevance. Reason: Document covers...
3. Document from ops/runbook.md: low relevance. Reason: General runbook...
4. Grading complete: 0/6 documents passed relevance check.
5. Routing decision: Insufficient relevant docs (0). Routing to query rewriter.
6. Query rewrite #1: Strategy='rephrase_intent'. New query: 'exponential backoff...
7. Retrieved 6 documents for query: 'exponential backoff connection timeout...'
8. Document from config/pipeline_config.yaml: high relevance. Reason: Contains...
9. Document from src/retry_handler.py: high relevance. Reason: Implements...
10. Grading complete: 4/6 documents passed relevance check.
11. Routing decision: Sufficient relevant documents found. Proceeding to generation.
12. Generated answer using 4 relevant documents.
13. Hallucination check: PASSED with high confidence. All claims traceable...

Hands-On Exercise

Now that you understand the full pipeline, your task is to extend it with one of the most powerful agentic RAG patterns: iterative sub-question decomposition.

Some questions can't be answered by a single retrieval pass because they require synthesizing information that lives in genuinely separate parts of the knowledge base. For example: "Compare the error handling strategies used in our Python data pipelines vs. our Go microservices." This requires at least two separate retrieval passes, each targeting different content.

Your exercise is to add a query decomposition node that sits before the retriever for complex queries:

  1. Implement a QueryDecomposer class that uses an LLM with structured output to detect whether a query is "complex" (requires multiple sub-questions) or "simple" (single retrieval pass). For complex queries, produce a list of 2-4 sub-questions.

  2. Add a decompose_query node to the graph that runs before the first retrieval. Route simple queries directly to retrieve, complex queries through the decomposer.

  3. Modify the AgenticRAGState to include a sub_questions: list[str] field and a current_sub_question_index: int field. The retriever should use sub_questions[current_sub_question_index] as the query when sub-questions exist.

  4. Add a loop that iterates through all sub-questions, accumulating relevant documents from each pass. Route back to the retriever if there are remaining sub-questions, or forward to generation once all sub-questions have been processed.

  5. Test it with a question that clearly requires synthesis across multiple topics in your knowledge base. Verify (via the reasoning trace) that the pipeline actually retrieves documents for each sub-question separately.

Hint on structured output for the decomposer:

class DecompositionDecision(BaseModel):
    is_complex: bool
    sub_questions: list[str] = Field(
        description="List of 2-4 specific sub-questions. Empty if not complex.",
        default_factory=list
    )
    reasoning: str
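
Hint on the sub-question loop (step 4): the sketch below shows one possible shape for the loop, not the only one. SubQuestionState is a hypothetical stand-in that holds only the fields from step 3 plus current_query, and the node names "advance_sub_question" and "generate" are assumptions you should map onto your own graph. The index is advanced in an action node and only read in the router.

```python
from dataclasses import dataclass, field


@dataclass
class SubQuestionState:
    # Hypothetical stand-in: only the fields the loop needs
    sub_questions: list[str] = field(default_factory=list)
    current_sub_question_index: int = 0
    current_query: str = ""


def active_query(state: SubQuestionState) -> str:
    """Query the retriever should use on this pass."""
    if state.sub_questions:
        return state.sub_questions[state.current_sub_question_index]
    return state.current_query


def advance_sub_question(state: SubQuestionState) -> SubQuestionState:
    """Action node: the only place the index changes."""
    state.current_sub_question_index += 1
    return state


def route_after_sub_question(state: SubQuestionState) -> str:
    """Router: loop while sub-questions remain, otherwise generate."""
    if state.current_sub_question_index + 1 < len(state.sub_questions):
        return "advance_sub_question"
    return "generate"
```

Accumulating the relevant documents from each pass is left to you; the grader node is a natural place to extend the list instead of overwriting it.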

Common Mistakes & Troubleshooting

Mistake 1: Infinite Loops from Misconfigured Termination Conditions

The most dangerous bug in agentic RAG is a loop that never terminates. This typically happens when max_rewrites is checked inconsistently — for example, if your routing function increments the counter in one place and checks it in another, or if the augment_and_retry node bypasses the rewrite counter.

Fix: Give the counter exactly one writer and one reader. Increment it in the action node (like rewrite_query) and only read it in the routing function, so every pass through the loop is counted exactly once.

# BAD: Routers should only read state; a change made here may not be persisted
def route_after_grading(state):
    if ...:
        state.rewrite_count += 1  # Don't do this in routers
        return "rewrite_query"

# GOOD: Increment in the action node
def rewrite_query(state):
    state.rewrite_count += 1  # Increment here
    ...
    return state
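
A cheap way to check the termination guard is to drive the router and the action node outside the graph with a retrieval that never succeeds. The following is a minimal sketch: LoopState is a hypothetical stand-in for the relevant AgenticRAGState fields, relevant_count is an assumed field, and "give_up" is a placeholder for whatever terminal route your graph uses.

```python
from dataclasses import dataclass


@dataclass
class LoopState:
    # Hypothetical stand-in for the fields that control the loop
    rewrite_count: int = 0
    max_rewrites: int = 3
    relevant_count: int = 0  # assumed: documents that passed grading


def route_after_grading(state: LoopState) -> str:
    """Pure router: reads state, never mutates it."""
    if state.relevant_count > 0:
        return "generate"
    if state.rewrite_count >= state.max_rewrites:
        return "give_up"  # placeholder terminal route
    return "rewrite_query"


def rewrite_query(state: LoopState) -> LoopState:
    """Action node: the only place the counter changes."""
    state.rewrite_count += 1
    return state


def simulate_worst_case(max_rewrites: int) -> tuple[str, int]:
    """Run the loop with a retrieval that never finds anything relevant."""
    state = LoopState(max_rewrites=max_rewrites)
    steps = 0
    while (route := route_after_grading(state)) == "rewrite_query":
        state = rewrite_query(state)
        steps += 1
        assert steps <= max_rewrites, "loop exceeded its rewrite budget"
    return route, steps
```

If the assertion inside simulate_worst_case ever fires after a refactor, some path is retrying without being counted.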

Mistake 2: Grading Every Document with GPT-4o

Running gpt-4o on every document for every grading call will make your pipeline 3-5x more expensive than necessary. Relevance grading is a classification task, not a reasoning task.

Fix: Use gpt-4o-mini for grading. In high-volume scenarios, consider a fine-tuned classifier or a cross-encoder re-ranker (like cross-encoder/ms-marco-MiniLM-L-6-v2 from Sentence Transformers) that runs locally and has no per-call API cost.
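
As a rough illustration of swapping the LLM grader for a local scorer, the sketch below filters documents with any function that scores (query, document) pairs. The helper name grade_with_scorer and the threshold default are assumptions; the score scale depends on the model, so the cutoff has to be tuned on your own data.

```python
from typing import Callable, Sequence

# A scorer takes (query, document) pairs and returns one score per pair
PairScorer = Callable[[Sequence[tuple[str, str]]], Sequence[float]]


def grade_with_scorer(
    query: str,
    documents: Sequence[str],
    score_pairs: PairScorer,
    threshold: float = 0.0,  # assumed cutoff, tune per model
) -> list[str]:
    """Return the documents scoring at or above threshold, best first."""
    if not documents:
        return []
    scores = score_pairs([(query, doc) for doc in documents])
    ranked = sorted(zip(scores, documents), key=lambda pair: pair[0], reverse=True)
    return [doc for score, doc in ranked if score >= threshold]
```

With Sentence Transformers installed, CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2").predict accepts a list of pairs and can be passed as score_pairs.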

Mistake 3: Passing Too Much Context to the Hallucination Checker

If you have 6 relevant documents totaling 12,000 tokens and then pass all of that plus the generated answer to the hallucination checker, you're burning expensive context. The hallucination checker doesn't need to see every nuance of every document — it needs to verify specific claims.

Fix: For the hallucination check, pass only the document excerpts that are specifically cited in the generated answer, not the full context. You can extract cited sources from the answer text or from the structured metadata your generator logs.
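
Here is a minimal sketch of that filtering step. It assumes the generator marks citations inline as [source: path] and that each document is a dict with "source" and "content" keys; both conventions are hypothetical, so adapt the pattern to whatever your generator actually emits.

```python
import re

# Assumed citation convention: "[source: some/path.md]"
CITATION_PATTERN = re.compile(r"\[source:\s*([^\]]+)\]")


def cited_sources(answer: str) -> set[str]:
    """Collect the source names cited in the answer text."""
    return {match.strip() for match in CITATION_PATTERN.findall(answer)}


def excerpts_for_check(answer: str, documents: list[dict]) -> list[dict]:
    """Keep only the documents the answer cites.

    Falls back to every document when nothing is cited, so the
    checker is never run without evidence.
    """
    cited = cited_sources(answer)
    selected = [doc for doc in documents if doc["source"] in cited]
    return selected if selected else list(documents)
```

The fallback matters: an answer with no citations is exactly the case where the checker needs the full context.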

Mistake 4: Not Handling Empty Knowledge Base Responses

If your ChromaDB collection is empty, collection.query() will return empty lists. Downstream nodes expecting state.retrieved_documents to be a non-empty list will fail unexpectedly.

Fix: Always validate the retrieval result before proceeding:

def retrieve(state: AgenticRAGState) -> AgenticRAGState:
    results = collection.query(...)
    
    if not results["documents"] or not results["documents"][0]:
        state.retrieved_documents = []
        state.reasoning_trace.append(
            "WARNING: No documents returned from vector store. "
            "Check collection population and query format."
        )
        return state
    
    # ... rest of the function

Mistake 5: The Rewriter Gets Stuck in a Local Optimum

If the rewriter always produces semantically similar reformulations, you'll waste retries on queries that all retrieve the same irrelevant documents. This happens when the rewriter doesn't get enough context about what failed.

Fix: Pass the rewriter the actual content snippets of the failed documents, not just their count. Seeing "this is what I retrieved and it's about X" gives the rewriter the signal it needs to try a genuinely different direction.
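
One way to package that signal is a short text block listing what was retrieved and rejected, which you then include in the rewriter prompt. This is a sketch under assumptions: build_failure_context is a hypothetical helper, and documents are assumed to be dicts with "source" and "content" keys.

```python
def build_failure_context(
    failed_docs: list[dict],
    snippet_chars: int = 200,
    max_docs: int = 4,
) -> str:
    """Summarize rejected documents for the rewriter prompt."""
    if not failed_docs:
        return "No documents were retrieved."
    lines = []
    for doc in failed_docs[:max_docs]:
        # Collapse whitespace so each snippet stays on one line
        snippet = " ".join(doc["content"].split())[:snippet_chars]
        lines.append(f"- {doc['source']}: {snippet}")
    return "Retrieved but judged irrelevant:\n" + "\n".join(lines)
```

Capping both the snippet length and the number of documents keeps the rewriter prompt small even when every retrieval attempt fails.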

Mistake 6: Treating `grounded: False` as a Hard Error

Not all ungrounded claims are hallucinations — some are the model correctly reporting general knowledge that happens to supplement the retrieved documents. A finalize_with_caveat path (as we implemented) is almost always better than a hard failure.

Fix: Use ungrounded_claims to decide instead of failing outright. If the only ungrounded material is stylistic (framing or connective phrasing with no factual content), treat the answer as grounded. Reserve the caveat path for answers in which specific named entities, numbers, or procedures appear without source support.
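
A crude first pass at this decision can be done without another LLM call. The heuristic below is only a sketch: it flags claims containing digits, backticked code, or snake_case identifiers as "specific", and it does not attempt named-entity detection, which would need an NER model or an LLM judgment. The function name and pattern are assumptions.

```python
import re

# Signals that a claim asserts something checkable:
# a digit, backticked code, or a snake_case identifier
SPECIFIC_PATTERN = re.compile(r"\d|`[^`]+`|\b\w+_\w+\b")


def needs_caveat(ungrounded_claims: list[str]) -> bool:
    """True if any ungrounded claim looks specific enough to warrant a caveat."""
    return any(SPECIFIC_PATTERN.search(claim) for claim in ungrounded_claims)
```

Treat a False result as "no obvious red flag" and not as proof of grounding.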


Performance Considerations and Scaling

Latency budget: A full agentic RAG cycle — retrieve, grade 6 docs, generate, check hallucination — takes approximately 4-8 seconds with gpt-4o. Each retry adds 3-6 seconds. Design your UX around this: streaming intermediate status updates ("Searching knowledge base...", "Verifying answer quality...") dramatically improves perceived performance.
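
The status updates can be derived from the graph's own progress events. The sketch below maps node names to user-facing messages; it assumes each event is a dict keyed by the node that just ran, and apart from "retrieve" and "rewrite_query" the node names in the mapping are guesses to replace with your own.

```python
from typing import Iterable, Iterator

# Node names other than "retrieve" and "rewrite_query" are assumptions
STATUS_BY_NODE = {
    "retrieve": "Searching knowledge base...",
    "grade_relevance": "Checking document relevance...",
    "rewrite_query": "Refining the search...",
    "generate": "Drafting an answer...",
    "check_hallucination": "Verifying answer quality...",
}


def status_updates(events: Iterable[dict]) -> Iterator[str]:
    """Yield a status message per step, skipping unknown nodes and repeats."""
    last = None
    for event in events:
        for node_name in event:
            message = STATUS_BY_NODE.get(node_name)
            if message and message != last:
                last = message
                yield message
```

With LangGraph, app.stream(initial_state, stream_mode="updates") should produce events of this shape, but verify that against the version you have installed.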

Parallelizing the grading step: Grading documents one at a time is a major latency bottleneck, because each grade is its own LLM round trip. Use asyncio.gather() to grade all retrieved documents concurrently:

import asyncio

async def grade_document_relevance_async(document, query):
    # Async version of grade_document_relevance
    ...

async def grade_relevance_parallel(state: AgenticRAGState) -> AgenticRAGState:
    tasks = [
        grade_document_relevance_async(doc, state.current_query)
        for doc in state.retrieved_documents
    ]
    grades = await asyncio.gather(*tasks)
    # Process grades...
    return state

This alone can cut grading latency by 60-70% for 6 documents.

Caching: Query rewriting produces structured queries that are often semantically equivalent. Implement a semantic cache keyed on query embeddings — if the cosine similarity between a new query and a cached query is above 0.97, return the cached results. This is particularly effective for high-traffic production systems where many users ask similar questions.
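
To make the caching idea concrete, here is a small in-memory sketch. SemanticCache is a hypothetical class, embeddings are assumed to be plain lists of floats produced by your embedding model, and the linear scan is only reasonable for small caches; a production version would use a vector index and an eviction policy.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero norm."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class SemanticCache:
    def __init__(self, threshold: float = 0.97):
        self.threshold = threshold
        self._entries: list[tuple[list[float], object]] = []

    def put(self, embedding: list[float], value: object) -> None:
        self._entries.append((list(embedding), value))

    def get(self, embedding: list[float]):
        """Return the closest cached value above the threshold, else None."""
        best_score, best_value = 0.0, None
        for cached_embedding, value in self._entries:
            score = cosine_similarity(embedding, cached_embedding)
            if score > best_score:
                best_score, best_value = score, value
        return best_value if best_score > self.threshold else None
```

Check the cache before calling the retriever and store the retrieval results after a miss.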

Monitoring what matters: Instrument these specific metrics:

  • Rewrite rate (% of queries requiring at least one rewrite)
  • Average rewrites per successful query
  • Hallucination detection rate
  • The rewrite_strategy distribution

A high rewrite rate combined with narrow_focus as the dominant strategy tells you your queries are too broad — improve your chunking. A high hallucination detection rate tells you your generator prompt isn't strict enough.
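
These metrics can be computed from the dictionaries that run_agentic_rag returns. The sketch below assumes each run carries the "rewrites" and "grounded" keys from that function plus an optional "strategies" list, which is a hypothetical addition; it also treats a grounded answer as a "successful" query, which is one reasonable definition among several.

```python
from collections import Counter


def summarize_runs(runs: list[dict]) -> dict:
    """Aggregate pipeline metrics over a batch of run results."""
    total = len(runs)
    if total == 0:
        return {
            "rewrite_rate": 0.0,
            "avg_rewrites_per_success": 0.0,
            "hallucination_detection_rate": 0.0,
            "strategy_distribution": {},
        }
    successes = [run for run in runs if run["grounded"]]
    # "strategies" is an assumed per-run list of rewrite_strategy values
    strategies = Counter(s for run in runs for s in run.get("strategies", []))
    return {
        "rewrite_rate": sum(run["rewrites"] > 0 for run in runs) / total,
        "avg_rewrites_per_success": (
            sum(run["rewrites"] for run in successes) / len(successes)
            if successes
            else 0.0
        ),
        "hallucination_detection_rate": sum(not run["grounded"] for run in runs) / total,
        "strategy_distribution": dict(strategies),
    }
```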


When Agentic RAG Is Overkill

This is an expert lesson, so let's be honest about the cost-benefit. Agentic RAG adds latency, cost, and operational complexity. It is not the right choice when:

  • Your queries are simple and well-scoped. A customer service bot answering "what are your business hours?" doesn't need a hallucination checker.
  • Your knowledge base is small and well-structured. Under 10,000 documents with clear, consistent chunking and metadata? A good embedding model and a well-tuned retrieval step are probably sufficient.
  • You have strict latency requirements under 2 seconds. Multi-step agentic pipelines are hard to keep fast. If you need sub-2-second response times, invest in retrieval quality improvements first.
  • You can't accept non-determinism. Agentic loops with LLM-driven routing introduce probabilistic behavior. The same query might take different paths on different runs. If your use case requires fully deterministic behavior, rule-based routing is safer.

The right trigger for adopting agentic RAG is a specific, measured failure mode in a production system — not architectural enthusiasm. Build the simpler version first, measure where it fails, and add agentic components surgically.


Summary & Next Steps

You've now built a production-grade agentic RAG pipeline with four core self-correcting mechanisms: relevance grading to filter bad retrievals before they corrupt your context, query rewriting to reformulate failed queries with diagnostic context, a generation step that grounds answers in graded evidence, and hallucination checking that verifies every factual claim post-generation.

The architectural pattern we've implemented — a directed graph with conditional edges, shared state, and termination guards — is the right abstraction for any agentic pipeline. The specific nodes and routing logic are application-specific, but the structure scales to much more complex orchestration.

What to explore next:

  1. Multi-agent RAG: split the rewriter, grader, and checker into separate specialized agents with their own context and memory, coordinated by a meta-agent. This improves specialization at the cost of orchestration complexity.

  2. Self-RAG paper implementation: the original Self-RAG paper (Asai et al., 2023) introduces special reflection tokens that the model generates inline, eliminating the need for separate grading calls. Understanding it will deepen your intuition for where the field is heading.

  3. Tool-augmented RAG: give your agentic pipeline access to structured data queries (SQL), API calls, and web search in addition to vector retrieval. The graph patterns we've built here extend naturally to multi-tool orchestration.

  4. Evaluation frameworks: building the pipeline is half the work. Implement RAGAS metrics (faithfulness, answer relevancy, context precision, context recall) to measure whether your agentic improvements actually improve outcomes — or just add latency.

  5. Production hardening: add Redis-based caching for repeated queries, distributed tracing with OpenTelemetry, and a feedback loop that lets users flag bad answers to automatically trigger knowledge base updates.

The hardest part of agentic RAG isn't writing the code — it's knowing when to trust the pipeline's decisions and when to override them. Build the observability layer (reasoning traces, metrics, human-in-the-loop flagging) from day one, and let the data tell you where your agents need more help.
