Ephemeral Artificial Intelligence in Web 4.0 EPH4

HomeBlogRAG systemsRAG (Retrieval-Augmented Generation): The Full Technical Architecture Guide

RAG (Retrieval-Augmented Generation): The Full Technical Architecture Guide

Technical Visualization: The RAG System Architecture

RAG pipeline - How RAG works - injestion & retrieval pipelines

Try our Open Source Assets for RAG Systems

Open Source - by EPH4™

AI RAG Chunkenizer

Document chunking for RAG pipelines. Process PDF, Word, Excel — 100% in browser. No data leaves your device.

License
MIT
Free & Open Source
GDPR SOC2

Part I: The Fundamental Problem

Large language models possess remarkable capabilities but suffer from a critical architectural constraint: the context window. This window represents the maximum number of tokens (discrete units of text typically representing approximately four characters or three-quarters of an English word) that a model can process in a single inference call.

Even as context windows have expanded into the millions of tokens, enterprise reality renders them insufficient. A mid-sized organization might maintain 500,000 documents across contracts, technical specifications, internal communications, research papers, and operational records. A single legal discovery request might involve 2 million documents. Financial institutions routinely manage document repositories exceeding one petabyte. Converting these figures to tokens produces numbers that dwarf any context window: a petabyte of text documents translates to roughly 250 trillion tokens.

Beyond sheer volume, stuffing massive context windows creates practical problems. Inference costs scale with token count. Latency increases. Most critically, LLM attention mechanisms degrade with context length, models struggle to maintain focus across extremely long contexts, a phenomenon researchers call “lost in the middle” where information in the center of long contexts receives less attention than information at the beginning or end.

Retrieval-Augmented Generation addresses these constraints by decoupling knowledge storage from knowledge processing. Rather than forcing all information through the context window, RAG systems maintain external knowledge bases and retrieve only the specific information relevant to each query. The LLM receives a focused, manageable context containing precisely the passages needed to answer the user’s question.

Part II: System Architecture Overview

A production RAG system comprises two distinct pipelines operating asynchronously.

The injection pipeline (also called the indexing pipeline) processes source documents into a searchable format. This pipeline runs offline during initial system setup and periodically thereafter as new documents enter the corpus. Its output is a populated vector database containing mathematical representations of document content.

The retrieval pipeline (also called the query pipeline) handles real-time user interactions. When a user submits a query, this pipeline searches the vector database, retrieves relevant content, and orchestrates LLM response generation. Latency requirements are strict; users expect responses within seconds.

Understanding both pipelines in technical depth is essential for building systems that actually work.

Part III: The Injection Pipeline

Stage 1: Document Ingestion and Preprocessing

Raw documents arrive in heterogeneous formats: PDF, DOCX, PPTX, HTML, Markdown, plain text, scanned images requiring OCR, emails with attachments, spreadsheets containing textual data. The ingestion layer must normalize this diversity into clean, structured text.

PDF extraction presents particular challenges. PDFs are presentation formats, not semantic formats, they specify where characters appear on a page, not what those characters mean. Tables become scrambled text. Multi-column layouts interleave incorrectly. Headers and footers repeat on every page. Figures contain text that extraction tools miss or misplace. Production systems require specialized PDF parsers: libraries like PyMuPDF (fitz), pdfplumber, or commercial solutions like Amazon Textract or Azure Document Intelligence that apply machine learning to reconstruct document structure.

Metadata preservation matters significantly. Document titles, authors, creation dates, section headings, and source URLs provide crucial context during retrieval. A chunk stating “revenue increased 15%” means nothing without knowing which company, which quarter, which document. Injection pipelines must extract and associate metadata with each chunk.

Text cleaning removes noise that degrades embedding quality: excessive whitespace, special characters, encoding artifacts, boilerplate headers/footers, page numbers, and irrelevant formatting markers. However, aggressive cleaning risks removing meaningful content—code snippets, technical notation, structured data. Cleaning strategies must be tuned to document types.

Stage 2: Chunking Strategies

Chunking: dividing documents into retrievable units is where RAG systems succeed or fail. Poor chunking cascades through the entire system: embeddings capture wrong semantic boundaries, similarity search returns irrelevant results, and LLM responses hallucinate or miss critical information.

Fixed-Size Chunking

The simplest approach divides text into chunks of predetermined token count, typically 256 to 1,024 tokens. Implementation is straightforward: tokenize the document, split at regular intervals, optionally add overlap between consecutive chunks to preserve context across boundaries.

Document: [Token1, Token2, ... Token10000]
Chunk size: 512 tokens
Overlap: 50 tokens

Chunk 1: Tokens 1-512
Chunk 2: Tokens 463-974
Chunk 3: Tokens 925-1436
...

Overlap prevents information loss when important passages span chunk boundaries. Without overlap, a sentence beginning at token 510 and ending at token 520 would be split across chunks, potentially rendering both fragments semantically incomplete.

Advantages: Predictable chunk sizes simplify embedding model input management and retrieval token budgeting. Implementation requires minimal document understanding.

Disadvantages: Arbitrary boundaries ignore semantic structure. A chunk might begin mid-sentence, split a paragraph discussing a single concept, or combine the conclusion of one topic with the introduction of another. These semantically incoherent chunks produce embeddings that fail to capture meaningful content.

Sentence-Based Chunking

This approach respects grammatical boundaries by splitting only at sentence endpoints. Sentences are grouped until reaching a target chunk size.

Sentence boundary detection requires more than splitting on periods, abbreviations such as (Dr., Inc., U.S.), decimal numbers (3.14), and ellipses (…) create false positives. Production implementations use NLP libraries with trained sentence tokenizers: spaCy, NLTK’s Punkt tokenizer, or regex patterns tuned to specific document types.

python.py
# Conceptual implementation
sentences = sentence_tokenize(document)
chunks = []
current_chunk = []
current_length = 0

for sentence in sentences:
    sentence_length = count_tokens(sentence)
    if current_length + sentence_length > max_chunk_size:
        chunks.append(' '.join(current_chunk))
        current_chunk = [sentence]
        current_length = sentence_length
    else:
        current_chunk.append(sentence)
        current_length += sentence_length

Advantages: Chunks contain complete grammatical units. Embeddings capture coherent statements rather than fragments.

Disadvantages: Sentence length varies dramatically. Technical documents might contain 200-word sentences; social media content might average 8 words. Resulting chunk sizes are inconsistent, complicating downstream processing.

Recursive Character Text Splitting

This strategy, popularized by LangChain, attempts to split at the most semantically meaningful boundary available. It tries a hierarchy of separators in order: paragraph breaks (double newlines), single newlines, sentences (periods), and finally spaces or characters.

Separator hierarchy:
1. "\n\n" (paragraphs)
2. "\n" (lines)
3. ". " (sentences)
4. " " (words)
5. "" (characters)

The algorithm recursively applies this hierarchy: first attempt paragraph splits; if resulting chunks exceed the target size, split those chunks at line breaks; if still too large, split at sentences; and so on.

Advantages: Preserves the highest-level semantic structure possible for each section of text. A document with clear paragraph breaks chunks at paragraphs; dense text without breaks chunks at sentences.

Disadvantages: Results vary unpredictably based on document formatting. Documents with inconsistent formatting produce inconsistent chunk quality.

Semantic Chunking

Rather than relying on textual markers, semantic chunking uses embedding similarity to identify topic boundaries. The algorithm embeds individual sentences, then measures similarity between consecutive sentences. Sharp drops in similarity indicate topic transitions—natural chunk boundaries.

Sentence 1: "Q3 revenue reached $4.2 billion." 
Sentence 2: "This represents 12% year-over-year growth."
Similarity: 0.89 (high—same topic)

Sentence 2: "This represents 12% year-over-year growth."
Sentence 3: "The company announced a new CEO yesterday."
Similarity: 0.34 (low—topic shift, chunk boundary)

Implementation requires embedding each sentence individually, computationally expensive for large corporations. Threshold tuning determines sensitivity to topic shifts.

Advantages: Chunks align with actual semantic boundaries regardless of formatting. Each chunk discusses a coherent topic.

Disadvantages: Computational cost scales with sentence count. Threshold selection requires experimentation. Short documents may lack sufficient topic variation.

Document-Structure-Aware Chunking

Technical documents, research papers, legal contracts, software documentation, contain explicit structural hierarchies: chapters, sections, subsections, headings. Structure-aware chunking parses this hierarchy and chunks within structural boundaries.

For Markdown documents, this means splitting at header levels. For HTML, parsing the DOM tree. For PDFs with detected structure, using section boundaries identified by document understanding models.

# Chapter 1: Introduction          → Chunk boundary
## 1.1 Background                  → Chunk boundary  
Content content content...
## 1.2 Problem Statement           → Chunk boundary
Content content content...
# Chapter 2: Methodology           → Chunk boundary

Advantages: Chunks correspond to author-intended organizational units. Retrieval can leverage structure—”find information from the Methodology section.”

Disadvantages: Requires reliable structure detection, which fails on poorly formatted or unstructured documents. Section sizes vary enormously; a 50-page chapter and a 2-paragraph subsection are both “sections.”

Agentic Chunking

An emerging approach uses LLMs themselves to perform chunking. The document is passed to a language model with instructions to identify semantically coherent passages and propose chunk boundaries.

System: You are a document analyst. Read the following text and identify 
        natural topic boundaries. Output a list of character positions 
        where the text should be split to create semantically coherent chunks.

User: [Document text]

The model analyzes content meaning, identifies topic transitions, and returns boundary positions. This approach leverages the LLM’s deep language understanding to make intelligent chunking decisions.

python.py
def agentic_chunk(document: str, llm_client) -> list[str]:
    response = llm_client.complete(
        system="Identify natural topic boundaries in this document. "
               "Return a JSON array of character positions where splits should occur.",
        user=document
    )
    
    boundaries = json.loads(response)
    chunks = []
    start = 0
    
    for boundary in boundaries:
        chunks.append(document[start:boundary])
        start = boundary
    
    chunks.append(document[start:])
    return chunks

Advantages: Highest-quality semantic boundaries. Adapts to any document type without format-specific rules. Can handle complex documents where other methods fail.

Disadvantages: Extremely expensive, requires LLM inference for every document during indexing. Latency makes it impractical for large corpora. Results are non-deterministic.

Choosing a Chunking Strategy

No single chunking strategy dominates. Selection depends on document characteristics and system requirements:

Document Type Recommended Strategy
Well-structured technical docs Structure-aware
Conversational/informal text Sentence-based
Mixed-format enterprise docs Recursive splitting
High-value, low-volume content Semantic or agentic
Large-scale commodity content Fixed-size with overlap
Swipe to scroll

Many production systems combine strategies: structure-aware chunking for documents with clear hierarchies, falling back to recursive splitting for unstructured content.

Stage 3: Embedding Generation

Once documents are chunked, each chunk must be converted into a vector representation (a list of floating-point numbers that captures semantic meaning). These vectors enable similarity search: chunks with similar meanings produce vectors that are geometrically close in the embedding space.

Understanding Embeddings

An embedding model is a neural network trained to map text to vectors such that semantically similar texts produce similar vectors. The training process uses contrastive learning: the model sees pairs of related texts (positive pairs) and unrelated texts (negative pairs), learning to push positive pairs together and negative pairs apart in the vector space.

Text: "The cat sat on the mat"
Embedding: [0.023, -0.156, 0.891, ..., 0.445]  # 384-1536 dimensions

Text: "A feline rested on the rug"
Embedding: [0.019, -0.148, 0.887, ..., 0.451]  # Similar vector

Text: "Stock prices rose yesterday"
Embedding: [-0.445, 0.667, -0.123, ..., -0.298]  # Different vector

The resulting vector space exhibits remarkable properties. Synonyms cluster together. Analogies manifest as geometric relationships. Documents discussing similar topics occupy nearby regions regardless of surface-level vocabulary differences.

Selecting an Embedding Model

The embedding model choice significantly impacts retrieval quality. Key selection criteria include:

Dimensionality: Vector size affects storage costs and search speed. Smaller dimensions (384) are faster and cheaper; larger dimensions (1536+) capture more nuance. Most production systems use 768-1024 dimensions as a balanced choice.

Context Length: Embedding models have maximum input lengths, typically 512-8192 tokens. Chunks exceeding this limit are truncated, losing information. Chunk sizes must align with model limits.

Domain Specificity: General-purpose models (OpenAI’s text-embedding-3, Cohere’s embed-v3) work well across domains. Specialized models trained on medical, legal, or scientific text outperform general models in those domains.

Multilingual Support: For international deployments, models must handle multiple languages. Some models embed different languages into the same vector space, enabling cross-lingual retrieval.

Popular embedding models for production RAG systems:

Model Dimensions Max Tokens Strengths
OpenAI text-embedding-3-large 3072 8191 High quality, easy API
Cohere embed-v3 1024 512 Strong multilingual
Voyage AI voyage-large-2 1024 16000 Long context
BGE-large-en-v1.5 1024 512 Open source, self-hostable
E5-large-v2 1024 512 Strong zero-shot performance
Swipe to scroll

Embedding Pipeline Implementation

Production embedding pipelines must handle scale efficiently. Key implementation patterns:

Batching: Embedding models process multiple texts per inference call. Batching amortizes API overhead and GPU utilization.

python.py
from openai import OpenAI
import numpy as np

client = OpenAI()

def embed_chunks(chunks: list[str], batch_size: int = 100) -> np.ndarray:
    """Embed chunks with batching for efficiency."""
    all_embeddings = []
    
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        
        response = client.embeddings.create(
            model="text-embedding-3-large",
            input=batch,
            dimensions=1024  # Optional dimensionality reduction
        )
        
        batch_embeddings = [item.embedding for item in response.data]
        all_embeddings.extend(batch_embeddings)
    
    return np.array(all_embeddings)

Parallelization: For large corpora, parallelize embedding generation across multiple workers or API calls.

python.py
from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_embed(chunks: list[str], max_workers: int = 10) -> np.ndarray:
    """Parallel embedding with thread pool."""
    
    def embed_batch(batch):
        response = client.embeddings.create(
            model="text-embedding-3-large",
            input=batch
        )
        return [item.embedding for item in response.data]
    
    # Split into batches
    batch_size = 100
    batches = [chunks[i:i+batch_size] for i in range(0, len(chunks), batch_size)]
    
    embeddings = [None] * len(batches)
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_idx = {
            executor.submit(embed_batch, batch): idx 
            for idx, batch in enumerate(batches)
        }
        
        for future in as_completed(future_to_idx):
            idx = future_to_idx[future]
            embeddings[idx] = future.result()
    
    # Flatten results maintaining order
    return np.array([e for batch in embeddings for e in batch])

Caching: Avoid re-embedding unchanged content. Hash chunk content and check cache before embedding.

python.py
import hashlib
import redis

cache = redis.Redis()

def cached_embed(chunk: str) -> list[float]:
    """Embed with Redis caching."""
    chunk_hash = hashlib.sha256(chunk.encode()).hexdigest()
    
    cached = cache.get(f"embedding:{chunk_hash}")
    if cached:
        return json.loads(cached)
    
    response = client.embeddings.create(
        model="text-embedding-3-large",
        input=[chunk]
    )
    embedding = response.data[0].embedding
    
    cache.set(f"embedding:{chunk_hash}", json.dumps(embedding))
    return embedding

Stage 4: Vector Database Storage

Embeddings require specialized storage optimized for similarity search. Vector databases provide this capability, storing vectors alongside metadata and enabling fast nearest-neighbor queries across millions or billions of vectors.

Vector Database Fundamentals

Traditional databases excel at exact match queries: find records where user_id = 12345. Vector databases solve a different problem: find vectors most similar to a query vector according to some distance metric.

Common similarity metrics include:

Cosine Similarity: Measures the angle between vectors, ignoring magnitude. Values range from -1 (opposite) to 1 (identical). Most common for text embeddings.

python.py
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

Euclidean Distance (L2): Measures straight-line distance between vector endpoints. Smaller values indicate higher similarity.

python.py
def euclidean_distance(a: np.ndarray, b: np.ndarray) -> float:
    return np.linalg.norm(a - b)

Dot Product: Simple inner product, equivalent to cosine similarity for normalized vectors. Fastest to compute.

python.py
def dot_product(a: np.ndarray, b: np.ndarray) -> float:
    return np.dot(a, b)

Approximate Nearest Neighbor (ANN) Algorithms

Exact nearest neighbor search requires comparing the query vector to every vector in the database; O(n) complexity that becomes prohibitive at scale. ANN algorithms trade perfect accuracy for dramatic speed improvements.

HNSW (Hierarchical Navigable Small World): Builds a multi-layer graph where each node connects to nearby neighbors. Search navigates from coarse upper layers to precise lower layers. Excellent query performance with high recall.

IVF (Inverted File Index): Clusters vectors into partitions using k-means. Search first identifies relevant clusters, then searches only within those clusters. Good for very large datasets with acceptable recall trade-offs.

PQ (Product Quantization): Compresses vectors by dividing them into subvectors and quantizing each independently. Dramatically reduces memory usage at some accuracy cost. Often combined with IVF.

Most production systems use HNSW for datasets under 100 million vectors, switching to IVF-PQ hybrids for larger scale.

Vector Database Options

Pinecone: Fully managed, serverless vector database. Excellent developer experience, automatic scaling, built-in metadata filtering. Higher cost at scale.

python.py
from pinecone import Pinecone

pc = Pinecone(api_key="your-api-key")
index = pc.Index("rag-index")

# Upsert vectors with metadata
index.upsert(
    vectors=[
        {
            "id": "chunk_001",
            "values": embedding,
            "metadata": {
                "document_id": "doc_123",
                "source": "annual_report_2024.pdf",
                "section": "Financial Results",
                "page": 15
            }
        }
    ],
    namespace="financial_docs"
)

# Query with metadata filtering
results = index.query(
    vector=query_embedding,
    top_k=10,
    namespace="financial_docs",
    filter={"source": {"$eq": "annual_report_2024.pdf"}}
)

Weaviate: Open-source, supports hybrid search combining vectors with BM25 keyword search. GraphQL API. Can self-host or use managed cloud.

python.py
import weaviate

client = weaviate.Client("http://localhost:8080")

# Create schema
client.schema.create_class({
    "class": "Document",
    "vectorizer": "none",  # We provide our own vectors
    "properties": [
        {"name": "content", "dataType": ["text"]},
        {"name": "source", "dataType": ["string"]},
        {"name": "page", "dataType": ["int"]}
    ]
})

# Add data
client.data_object.create(
    class_name="Document",
    data_object={
        "content": chunk_text,
        "source": "report.pdf",
        "page": 5
    },
    vector=embedding
)

# Hybrid search
results = client.query.get(
    "Document", 
    ["content", "source"]
).with_hybrid(
    query="revenue growth",
    vector=query_embedding,
    alpha=0.5  # Balance between vector and keyword
).with_limit(10).do()

Qdrant: Open-source, Rust-based for performance. Strong filtering capabilities, payload indexing, quantization support.

python.py
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Distance, PointStruct

client = QdrantClient(host="localhost", port=6333)

# Create collection
client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(
        size=1024,
        distance=Distance.COSINE
    )
)

# Upsert points
client.upsert(
    collection_name="documents",
    points=[
        PointStruct(
            id=1,
            vector=embedding,
            payload={
                "content": chunk_text,
                "document_id": "doc_123",
                "created_at": "2024-01-15"
            }
        )
    ]
)

# Search with filtering
results = client.search(
    collection_name="documents",
    query_vector=query_embedding,
    limit=10,
    query_filter={
        "must": [
            {"key": "document_id", "match": {"value": "doc_123"}}
        ]
    }
)

pgvector: PostgreSQL extension adding vector similarity search. Ideal when you need vectors alongside relational data without managing separate infrastructure.

python.py
-- Enable extension
CREATE EXTENSION vector;

-- Create table with vector column
CREATE TABLE document_chunks (
    id SERIAL PRIMARY KEY,
    content TEXT,
    embedding vector(1024),
    document_id INTEGER REFERENCES documents(id),
    created_at TIMESTAMP DEFAULT NOW()
);

-- Create index for fast search
CREATE INDEX ON document_chunks 
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

-- Query similar chunks
SELECT id, content, 1 - (embedding <=> $1) as similarity
FROM document_chunks
WHERE document_id = $2
ORDER BY embedding <=> $1
LIMIT 10;

Chroma: Lightweight, embedded vector database. Excellent for prototyping and small-scale applications. Runs in-process without separate server.

python.py
import chromadb

client = chromadb.Client()
collection = client.create_collection("documents")

# Add documents
collection.add(
    ids=["chunk_1", "chunk_2"],
    embeddings=[embedding_1, embedding_2],
    metadatas=[
        {"source": "doc1.pdf", "page": 1},
        {"source": "doc1.pdf", "page": 2}
    ],
    documents=[chunk_1_text, chunk_2_text]
)

# Query
results = collection.query(
    query_embeddings=[query_embedding],
    n_results=10,
    where={"source": "doc1.pdf"}
)

Index Configuration Best Practices

Vector database performance depends heavily on index configuration:

Segment Size: Balance between search speed (fewer segments) and update latency (more segments). Start with defaults, tune based on query patterns.

HNSW Parameters:

  • M: Number of connections per node. Higher values improve recall but increase memory and build time. Typical range: 16-64.
  • ef_construction: Search width during index building. Higher values improve index quality at build-time cost. Typical range: 100-500.
  • ef_search: Search width during queries. Higher values improve recall at query-time cost. Typical range: 50-200.

Quantization: Enable scalar or product quantization for datasets exceeding available RAM. Accept 2-5% recall degradation for 4-8x memory reduction.

python.py
# Qdrant quantization example
client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(size=1024, distance=Distance.COSINE),
    quantization_config=ScalarQuantization(
        scalar=ScalarQuantizationConfig(
            type=ScalarType.INT8,
            always_ram=True
        )
    )
)

Part IV: The Retrieval Pipeline

With the injection pipeline complete, documents exist as searchable vectors. The retrieval pipeline handles real-time queries, transforming user questions into relevant context for LLM generation.

Stage 1: Query Processing

User queries arrive in natural language, often ambiguous, incomplete, or poorly phrased. Query processing transforms raw input into effective search queries.

Query Embedding

The simplest approach embeds the user query using the same embedding model used for chunks, then searches for similar vectors.

python.py
def basic_retrieve(query: str, top_k: int = 5) -> list[dict]:
    query_embedding = embed_text(query)
    
    results = vector_db.search(
        vector=query_embedding,
        top_k=top_k
    )
    
    return [
        {
            "content": r.payload["content"],
            "score": r.score,
            "metadata": r.payload
        }
        for r in results
    ]

This works well for queries that closely match document language. It fails when queries use different terminology than documents—a user asking “how to fix a broken build” won’t match documentation about “resolving compilation errors.”

Query Expansion

Query expansion enriches the original query with related terms, synonyms, or reformulations to improve recall.

LLM-Based Expansion: Use a language model to generate alternative phrasings:

python.py
def expand_query(original_query: str) -> list[str]:
    response = llm.complete(
        system="""Generate 3 alternative phrasings of the user's query 
                  that might match relevant documents. Return as JSON array.""",
        user=original_query
    )
    
    alternatives = json.loads(response)
    return [original_query] + alternatives

# Example
# Input: "how to fix broken build"
# Output: [
#     "how to fix broken build",
#     "resolving build failures",
#     "troubleshooting compilation errors",
#     "debugging failed builds"
# ]

HyDE (Hypothetical Document Embeddings): Generate a hypothetical answer to the query, then search using that answer’s embedding. The hypothesis matches document language better than the question.

python.py
def hyde_retrieve(query: str, top_k: int = 5) -> list[dict]:
    # Generate hypothetical answer
    hypothesis = llm.complete(
        system="Write a short passage that would answer this question.",
        user=query
    )
    
    # Embed the hypothesis, not the query
    hypothesis_embedding = embed_text(hypothesis)
    
    results = vector_db.search(
        vector=hypothesis_embedding,
        top_k=top_k
    )
    
    return results

Query Decomposition

Complex queries often contain multiple sub-questions. Decomposition breaks them into simpler queries that can be answered independently.

python.py
def decompose_query(complex_query: str) -> list[str]:
    response = llm.complete(
        system="""Break this complex question into simpler sub-questions 
                  that can be answered independently. Return as JSON array.""",
        user=complex_query
    )
    
    return json.loads(response)

# Example
# Input: "Compare the Q3 and Q4 revenue, and explain what caused the difference"
# Output: [
#     "What was the Q3 revenue?",
#     "What was the Q4 revenue?",
#     "What factors affected revenue between Q3 and Q4?"
# ]

Each sub-query retrieves independently, and results are aggregated for the final response.

Stage 2: Retrieval Strategies

Beyond basic vector search, sophisticated retrieval strategies improve result quality.

Hybrid Search

Pure vector search misses exact keyword matches that users expect. Hybrid search combines vector similarity with traditional keyword search (BM25).

python.py
def hybrid_search(query: str, alpha: float = 0.5, top_k: int = 10) -> list[dict]:
    """
    Combine vector and keyword search.
    alpha: weight for vector search (1-alpha for keyword)
    """
    
    # Vector search
    query_embedding = embed_text(query)
    vector_results = vector_db.search(query_embedding, top_k=top_k * 2)
    
    # Keyword search (BM25)
    keyword_results = keyword_index.search(query, top_k=top_k * 2)
    
    # Combine scores using Reciprocal Rank Fusion
    combined_scores = {}
    
    for rank, result in enumerate(vector_results):
        doc_id = result.id
        combined_scores[doc_id] = combined_scores.get(doc_id, 0) + alpha / (rank + 60)
    
    for rank, result in enumerate(keyword_results):
        doc_id = result.id
        combined_scores[doc_id] = combined_scores.get(doc_id, 0) + (1 - alpha) / (rank + 60)
    
    # Sort by combined score
    sorted_results = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)
    
    return sorted_results[:top_k]

Many vector databases support hybrid search natively:

python.py
# Weaviate hybrid search
results = client.query.get(
    "Document",
    ["content", "source"]
).with_hybrid(
    query="revenue growth projections",
    alpha=0.7  # 70% vector, 30% keyword
).with_limit(10).do()

Multi-Vector Retrieval

Some systems create multiple embeddings per chunk, capturing different semantic aspects:

ColBERT-style late interaction: Embed each token separately, compute fine-grained similarity between query and document tokens.

Summary + content embeddings: Embed both chunk content and an LLM-generated summary. Summaries capture high-level meaning; content captures details.

python.py
def create_multi_vector_chunk(content: str) -> dict:
    # Generate summary
    summary = llm.complete(
        system="Summarize this passage in one sentence.",
        user=content
    )
    
    return {
        "content": content,
        "content_embedding": embed_text(content),
        "summary": summary,
        "summary_embedding": embed_text(summary)
    }

Parent-Child Retrieval

Retrieval often returns chunks too small for context. Parent-child retrieval searches on small chunks for precision but returns larger parent chunks for context.

python.py
def parent_child_retrieve(query: str, top_k: int = 5) -> list[str]:
    query_embedding = embed_text(query)
    
    # Search child chunks (small, precise)
    child_results = child_index.search(query_embedding, top_k=top_k)
    
    # Retrieve parent chunks (larger context)
    parent_ids = set(r.payload["parent_id"] for r in child_results)
    parent_chunks = [parent_store.get(pid) for pid in parent_ids]
    
    return parent_chunks

During indexing:

python.py
def index_with_hierarchy(document: str):
    # Create large parent chunks
    parent_chunks = chunk_document(document, chunk_size=2000)
    
    for parent_idx, parent in enumerate(parent_chunks):
        parent_id = f"parent_{parent_idx}"
        
        # Store parent chunk
        parent_store.set(parent_id, parent)
        
        # Create smaller child chunks
        child_chunks = chunk_document(parent, chunk_size=400)
        
        for child_idx, child in enumerate(child_chunks):
            child_embedding = embed_text(child)
            
            # Index child with reference to parent
            child_index.upsert({
                "id": f"child_{parent_idx}_{child_idx}",
                "embedding": child_embedding,
                "payload": {
                    "content": child,
                    "parent_id": parent_id
                }
            })

Stage 3: Reranking

Initial retrieval optimizes for recall, finding all potentially relevant chunks. Reranking optimizes for precision, ordering results by actual relevance to the query.

Cross-Encoder Reranking

Cross-encoders process query-document pairs jointly, enabling fine-grained relevance assessment impossible with independent embeddings.

python.py
from sentence_transformers import CrossEncoder

reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-12-v2')

def rerank_results(query: str, results: list[dict], top_k: int = 5) -> list[dict]:
    # Create query-document pairs
    pairs = [[query, r["content"]] for r in results]
    
    # Score pairs
    scores = reranker.predict(pairs)
    
    # Sort by score
    scored_results = list(zip(results, scores))
    scored_results.sort(key=lambda x: x[1], reverse=True)
    
    return [r for r, s in scored_results[:top_k]]

Cross-encoders are slower than bi-encoders (embeddings) but significantly more accurate. Use them to rerank a shortlist (20-100 candidates) retrieved by faster methods.

LLM-Based Reranking

Language models can assess relevance with nuanced understanding:

python.py
def llm_rerank(query: str, results: list[dict], top_k: int = 5) -> list[dict]:
    # Format results for LLM
    results_text = "\n\n".join([
        f"[{i}] {r['content'][:500]}" 
        for i, r in enumerate(results)
    ])
    
    response = llm.complete(
        system="""Given a query and search results, rank the results by relevance.
                  Return a JSON array of result indices in order of relevance.""",
        user=f"Query: {query}\n\nResults:\n{results_text}"
    )
    
    ranking = json.loads(response)
    return [results[i] for i in ranking[:top_k]]

Cohere Rerank API

Production-ready reranking as a service:

python.py
import cohere

co = cohere.Client(api_key="your-api-key")

def cohere_rerank(query: str, results: list[dict], top_k: int = 5) -> list[dict]:
    documents = [r["content"] for r in results]
    
    response = co.rerank(
        model="rerank-english-v3.0",
        query=query,
        documents=documents,
        top_n=top_k
    )
    
    return [results[r.index] for r in response.results]

Stage 4: Context Assembly

Retrieved chunks must be assembled into a coherent context for the LLM. This stage handles deduplication, ordering, and token budget management.

Deduplication

Retrieval often returns overlapping or duplicate content, especially with chunk overlap or multi-query strategies.

python.py
def deduplicate_results(results: list[dict], similarity_threshold: float = 0.9) -> list[dict]:
    unique_results = []
    seen_embeddings = []
    
    for result in results:
        embedding = result.get("embedding") or embed_text(result["content"])
        
        # Check similarity to already-selected results
        is_duplicate = False
        for seen in seen_embeddings:
            if cosine_similarity(embedding, seen) > similarity_threshold:
                is_duplicate = True
                break
        
        if not is_duplicate:
            unique_results.append(result)
            seen_embeddings.append(embedding)
    
    return unique_results

Token Budget Management

LLM context windows impose hard limits. Context assembly must fit retrieved content within budget while maximizing information.

python.py
def assemble_context(
    results: list[dict], 
    max_tokens: int = 4000,
    reserved_tokens: int = 1000  # For query + response
) -> str:
    available_tokens = max_tokens - reserved_tokens
    
    context_parts = []
    current_tokens = 0
    
    for result in results:
        content = result["content"]
        content_tokens = count_tokens(content)
        
        if current_tokens + content_tokens <= available_tokens:
            context_parts.append(content)
            current_tokens += content_tokens
        else:
            # Truncate last chunk to fit
            remaining = available_tokens - current_tokens
            if remaining > 100:  # Only include if meaningful
                truncated = truncate_to_tokens(content, remaining)
                context_parts.append(truncated)
            break
    
    return "\n\n---\n\n".join(context_parts)

Context Ordering

Order affects LLM attention. Place most relevant content at the beginning and end (avoiding the “lost in the middle” problem).

python.py
def order_for_attention(results: list[dict]) -> list[dict]:
    """Place best results at beginning and end."""
    if len(results) <= 2:
        return results
    
    # Assume results are ranked by relevance
    n = len(results)
    ordered = []
    
    # Interleave: best at start and end
    for i in range(n):
        if i % 2 == 0:
            ordered.insert(0, results[i])  # Add to beginning
        else:
            ordered.append(results[i])  # Add to end
    
    return ordered

Stage 5: Response Generation

With context assembled, the LLM generates the final response.

Basic RAG Prompt

python.py
def generate_response(query: str, context: str) -> str:
    prompt = f"""Answer the question based on the provided context. 
If the context doesn't contain relevant information, say so.

Context:
{context}

Question: {query}

Answer:"""
    
    return llm.complete(prompt)

Citation and Attribution

Production systems often require citations to source documents:

python.py
def generate_with_citations(query: str, results: list[dict]) -> str:
    # Number each source
    context_parts = []
    for i, r in enumerate(results):
        context_parts.append(f"[Source {i+1}] {r['content']}")
    
    context = "\n\n".join(context_parts)
    
    prompt = f"""Answer the question using the provided sources. 
Cite sources using [Source N] notation.

Sources:
{context}

Question: {query}

Answer with citations:"""
    
    response = llm.complete(prompt)
    
    # Attach source metadata
    return {
        "answer": response,
        "sources": [
            {
                "index": i + 1,
                "document": r["metadata"].get("source"),
                "page": r["metadata"].get("page")
            }
            for i, r in enumerate(results)
        ]
    }

Handling Insufficient Context

When retrieved context doesn’t answer the query:

python.py
def generate_with_fallback(query: str, context: str, confidence_threshold: float = 0.5) -> dict:
    # Check context relevance
    relevance_check = llm.complete(
        f"""Rate 0-1 how relevant this context is to the question.
        
Context: {context[:1000]}

Question: {query}

Relevance score (0-1):"""
    )
    
    relevance = float(relevance_check.strip())
    
    if relevance < confidence_threshold:
        return {
            "answer": "I don't have enough information in the available documents to answer this question confidently.",
            "confidence": relevance,
            "suggestion": "Try rephrasing your question or asking about a different topic."
        }
    
    answer = llm.complete(f"Context: {context}\n\nQuestion: {query}\n\nAnswer:")
    
    return {
        "answer": answer,
        "confidence": relevance
    }

Part V: Advanced RAG Techniques

Basic RAG retrieves static chunks and generates single-pass responses. Advanced techniques add sophistication for complex use cases.

Agentic RAG

Rather than a single retrieval-generation cycle, agentic RAG uses LLMs to orchestrate multi-step reasoning.

python.py
class AgenticRAG:
    def __init__(self, retriever, llm):
        self.retriever = retriever
        self.llm = llm
    
    def answer(self, query: str, max_iterations: int = 5) -> str:
        context = []
        
        for iteration in range(max_iterations):
            # Decide next action
            action = self.llm.complete(f"""
Given the question and information gathered so far, decide the next action.

Question: {query}

Information gathered:
{self._format_context(context)}

Actions:
1. SEARCH: <search query> - Search for more information
2. ANSWER: <answer> - Provide final answer
3. CLARIFY: <clarification> - Ask for clarification

Next action:""")
            
            if action.startswith("ANSWER:"):
                return action[7:].strip()
            
            elif action.startswith("SEARCH:"):
                search_query = action[7:].strip()
                results = self.retriever.retrieve(search_query)
                context.extend(results)
            
            elif action.startswith("CLARIFY:"):
                return action[8:].strip()
        
        # Max iterations reached
        return self._generate_best_effort(query, context)
    
    def _format_context(self, context: list) -> str:
        if not context:
            return "None yet"
        return "\n".join([f"- {c['content'][:200]}..." for c in context])
    
    def _generate_best_effort(self, query: str, context: list) -> str:
        context_text = "\n\n".join([c["content"] for c in context])
        return self.llm.complete(f"Context: {context_text}\n\nQuestion: {query}\n\nAnswer:")

Graph RAG

Standard RAG treats documents as independent units. Graph RAG captures relationships between entities and concepts.

Entity Extraction and Graph Construction

python.py
def build_knowledge_graph(chunks: list[str]) -> nx.Graph:
    graph = nx.Graph()
    
    for chunk in chunks:
        # Extract entities and relationships
        extraction = llm.complete(f"""
Extract entities and relationships from this text.
Return JSON: {{"entities": ["entity1", ...], "relationships": [["entity1", "relationship", "entity2"], ...]}}

Text: {chunk}""")
        
        data = json.loads(extraction)
        
        # Add to graph
        for entity in data["entities"]:
            graph.add_node(entity, chunks=[chunk])
        
        for subj, rel, obj in data["relationships"]:
            graph.add_edge(subj, obj, relationship=rel)
            # Associate chunk with both nodes
            if chunk not in graph.nodes[subj].get("chunks", []):
                graph.nodes[subj]["chunks"] = graph.nodes[subj].get("chunks", []) + [chunk]
            if chunk not in graph.nodes[obj].get("chunks", []):
                graph.nodes[obj]["chunks"] = graph.nodes[obj].get("chunks", []) + [chunk]
    
    return graph

Graph-Enhanced Retrieval

python.py
def graph_retrieve(query: str, graph: nx.Graph, top_k: int = 5) -> list[str]:
    # Extract entities from query
    query_entities = llm.complete(f"Extract key entities from: {query}")
    entities = json.loads(query_entities)
    
    relevant_chunks = set()
    
    for entity in entities:
        # Find entity in graph (fuzzy match)
        matches = [n for n in graph.nodes if entity.lower() in n.lower()]
        
        for match in matches:
            # Get chunks directly associated with entity
            relevant_chunks.update(graph.nodes[match].get("chunks", []))
            
            # Get chunks from connected entities (1-hop)
            for neighbor in graph.neighbors(match):
                relevant_chunks.update(graph.nodes[neighbor].get("chunks", []))
    
    # Rank by relevance
    ranked = rank_chunks(query, list(relevant_chunks))
    return ranked[:top_k]

Self-RAG

Self-RAG adds reflection, the LLM critiques its own retrieval and generation, iterating to improve quality.

python.py
class SelfRAG:
    def __init__(self, retriever, llm):
        self.retriever = retriever
        self.llm = llm
    
    def answer(self, query: str) -> str:
        # Initial retrieval
        results = self.retriever.retrieve(query)
        
        # Critique retrieval
        retrieval_critique = self.llm.complete(f"""
Evaluate if these retrieved passages are sufficient to answer the question.

Question: {query}

Passages:
{self._format_results(results)}

Evaluation (SUFFICIENT/INSUFFICIENT + explanation):""")
        
        if "INSUFFICIENT" in retrieval_critique:
            # Identify gaps and re-retrieve
            gap_query = self.llm.complete(f"""
The retrieved information is insufficient. What specific information is missing?
Generate a search query to find the missing information.

Original question: {query}
Current information: {self._format_results(results)}
Critique: {retrieval_critique}

New search query:""")
            
            additional_results = self.retriever.retrieve(gap_query)
            results.extend(additional_results)
        
        # Generate answer
        answer = self._generate(query, results)
        
        # Critique answer
        answer_critique = self.llm.complete(f"""
Evaluate this answer for accuracy and completeness.

Question: {query}
Answer: {answer}
Source passages: {self._format_results(results)}

Evaluation (GOOD/NEEDS_IMPROVEMENT + specific issues):""")
        
        if "NEEDS_IMPROVEMENT" in answer_critique:
            # Regenerate with critique
            answer = self.llm.complete(f"""
Improve this answer based on the critique.

Question: {query}
Original answer: {answer}
Critique: {answer_critique}
Sources: {self._format_results(results)}

Improved answer:""")
        
        return answer

Corrective RAG (CRAG)

CRAG evaluates retrieval quality and takes corrective action when results are poor.

python.py
class CorrectiveRAG:
    def __init__(self, retriever, web_search, llm):
        self.retriever = retriever
        self.web_search = web_search
        self.llm = llm
    
    def answer(self, query: str) -> str:
        results = self.retriever.retrieve(query)
        
        # Evaluate each result
        evaluations = []
        for r in results:
            score = self.llm.complete(f"""
Rate relevance of this passage to the question (CORRECT/AMBIGUOUS/INCORRECT):

Question: {query}
Passage: {r['content'][:500]}

Rating:""")
            evaluations.append((r, score.strip()))
        
        correct = [r for r, s in evaluations if "CORRECT" in s]
        ambiguous = [r for r, s in evaluations if "AMBIGUOUS" in s]
        
        if not correct and not ambiguous:
            # All incorrect - fall back to web search
            web_results = self.web_search.search(query)
            context = self._format_web_results(web_results)
        elif not correct:
            # Only ambiguous - refine with web search
            refined_query = self._refine_query(query, ambiguous)
            web_results = self.web_search.search(refined_query)
            context = self._combine_sources(ambiguous, web_results)
        else:
            # Have correct results
            context = self._format_results(correct + ambiguous)
        
        return self.llm.complete(f"Context: {context}\n\nQuestion: {query}\n\nAnswer:")

Contextual Retrieval

Anthropic’s contextual retrieval technique prepends chunk-specific context generated by an LLM, improving retrieval accuracy significantly.

python.py
def add_contextual_headers(document: str, chunks: list[str]) -> list[str]:
    """Add document-aware context to each chunk."""
    
    contextualized_chunks = []
    
    for chunk in chunks:
        context = llm.complete(f"""
<document>
{document[:20000]}
</document>

Here is the chunk we want to situate within the document:
<chunk>
{chunk}
</chunk>

Please give a short, succinct context to situate this chunk within the overall document. 
Answer only with the context, nothing else.""")
        
        contextualized_chunk = f"{context}\n\n{chunk}"
        contextualized_chunks.append(contextualized_chunk)
    
    return contextualized_chunks

This technique adds context like “This section describes the Q3 2024 financial results, specifically addressing revenue growth in the enterprise segment” before a chunk that otherwise just contains numbers and growth figures without identifying information.

Part VI: Evaluation and Optimization

RAG system quality requires systematic measurement and continuous improvement.

Retrieval Metrics

Recall (K)

What proportion of relevant documents appear in the top K results?

python.py
def recall_at_k(retrieved: list[str], relevant: set[str], k: int) -> float:
    retrieved_set = set(retrieved[:k])
    return len(retrieved_set & relevant) / len(relevant)

Mean Reciprocal Rank (MRR)

How high does the first relevant result rank?

python.py
def mrr(retrieved: list[str], relevant: set[str]) -> float:
    for i, doc in enumerate(retrieved):
        if doc in relevant:
            return 1.0 / (i + 1)
    return 0.0

Normalized Discounted Cumulative Gain (NDCG)

Accounts for graded relevance (not just binary relevant/irrelevant):

python.py
def ndcg_at_k(retrieved: list[str], relevance_scores: dict[str, float], k: int) -> float:
    dcg = sum(
        relevance_scores.get(doc, 0) / np.log2(i + 2)
        for i, doc in enumerate(retrieved[:k])
    )
    
    ideal_order = sorted(relevance_scores.values(), reverse=True)[:k]
    idcg = sum(
        score / np.log2(i + 2)
        for i, score in enumerate(ideal_order)
    )
    
    return dcg / idcg if idcg > 0 else 0.0

End-to-End Metrics

Answer Correctness

Compare generated answers against ground truth:

python.py
def evaluate_correctness(generated: str, ground_truth: str) -> dict:
    evaluation = llm.complete(f"""
Compare the generated answer to the ground truth.

Generated: {generated}
Ground Truth: {ground_truth}

Rate on these dimensions (0-1 each):
1. Factual accuracy: Are stated facts correct?
2. Completeness: Does it cover all key points?
3. Relevance: Does it address the question?

Return JSON: {{"accuracy": X, "completeness": X, "relevance": X}}""")
    
    return json.loads(evaluation)

Faithfulness

Does the answer only contain information from retrieved context (no hallucinations)?

python.py
def evaluate_faithfulness(answer: str, context: str) -> float:
    evaluation = llm.complete(f"""
Check if every claim in the answer is supported by the context.

Answer: {answer}

Context: {context}

For each claim in the answer, is it:
- SUPPORTED: Clearly stated or implied in context
- NOT_SUPPORTED: Not found in context (hallucination)

Return JSON: {{"supported_claims": N, "unsupported_claims": N}}""")
    
    data = json.loads(evaluation)
    total = data["supported_claims"] + data["unsupported_claims"]
    return data["supported_claims"] / total if total > 0 else 0.0

Answer Relevance

Does the answer actually address the question?

python.py
def evaluate_relevance(query: str, answer: str) -> float:
    evaluation = llm.complete(f"""
Rate how well the answer addresses the question (0-1).

Question: {query}
Answer: {answer}

Consider:
- Does it answer what was asked?
- Is it appropriately detailed?
- Does it stay on topic?

Score (0-1):""")
    
    return float(evaluation.strip())

Building Evaluation Datasets

Effective evaluation requires curated test sets:

python.py
def generate_eval_dataset(documents: list[str], num_questions: int = 100) -> list[dict]:
    """Generate question-answer pairs from documents."""
    
    eval_set = []
    
    for doc in random.sample(documents, min(len(documents), num_questions)):
        qa = llm.complete(f"""
Generate a question that can be answered using this document, along with the correct answer.

Document: {doc[:3000]}

Return JSON: {{"question": "...", "answer": "...", "evidence": "quote from doc"}}""")
        
        data = json.loads(qa)
        data["source_document"] = doc
        eval_set.append(data)
    
    return eval_set

Optimization Strategies

Chunk Size Tuning

Test different chunk sizes against your evaluation set:

python.py
def tune_chunk_size(documents: list[str], eval_set: list[dict]) -> dict:
    results = {}
    
    for chunk_size in [256, 512, 768, 1024, 1536]:
        # Re-index with this chunk size
        chunks = chunk_all_documents(documents, chunk_size=chunk_size)
        embeddings = embed_chunks(chunks)
        index = build_index(embeddings)
        
        # Evaluate
        scores = []
        for item in eval_set:
            retrieved = retrieve(item["question"], index)
            score = calculate_recall(retrieved, item["source_document"])
            scores.append(score)
        
        results[chunk_size] = {
            "mean_recall": np.mean(scores),
            "std_recall": np.std(scores),
            "num_chunks": len(chunks)
        }
    
    return results

Embedding Model Comparison

python.py
def compare_embedding_models(eval_set: list[dict]) -> dict:
    models = [
        "text-embedding-3-small",
        "text-embedding-3-large",
        "voyage-large-2",
        "bge-large-en-v1.5"
    ]
    
    results = {}
    
    for model in models:
        # Re-embed with this model
        embeddings = embed_with_model(chunks, model)
        index = build_index(embeddings)
        
        # Evaluate
        scores = evaluate_retrieval(index, eval_set)
        results[model] = scores
    
    return results

Hyperparameter Search

python.py
from itertools import product

def grid_search_rag_params(eval_set: list[dict]) -> dict:
    param_grid = {
        "chunk_size": [512, 768, 1024],
        "chunk_overlap": [0, 50, 100],
        "top_k": [3, 5, 10],
        "rerank": [True, False],
        "hybrid_alpha": [0.5, 0.7, 0.9]
    }
    
    best_score = 0
    best_params = None
    
    for params in product(*param_grid.values()):
        param_dict = dict(zip(param_grid.keys(), params))
        
        # Build system with these params
        rag = build_rag_system(**param_dict)
        
        # Evaluate
        score = evaluate_end_to_end(rag, eval_set)
        
        if score > best_score:
            best_score = score
            best_params = param_dict
    
    return {"best_params": best_params, "best_score": best_score}

Part VII: Production Considerations

Moving RAG from prototype to production introduces engineering challenges beyond core retrieval logic.

Scalability

Horizontal Scaling

Vector databases support clustering for scale:

# Qdrant cluster configuration
storage:
  performance:
    memmap_threshold: 50000  # Use memory mapping for large segments
    
cluster:
  enabled: true
  p2p:
    port: 6335
  consensus:
    tick_period_ms: 100

Sharding Strategies

For very large corpora, partition data across multiple indexes:

python.py
class ShardedRetriever:
    def __init__(self, shard_configs: list[dict]):
        self.shards = [
            VectorDB(config) for config in shard_configs
        ]
        self.router = self._build_router()
    
    def retrieve(self, query: str, top_k: int = 5) -> list[dict]:
        # Determine relevant shards
        relevant_shards = self.router.route(query)
        
        # Query each shard
        all_results = []
        for shard in relevant_shards:
            results = shard.search(query, top_k=top_k)
            all_results.extend(results)
        
        # Merge and re-rank
        return self._merge_results(all_results, top_k)

Latency Optimization

Caching

Cache frequent queries and their results:

python.py
from functools import lru_cache
import hashlib

class CachedRetriever:
    def __init__(self, retriever, cache_size: int = 10000):
        self.retriever = retriever
        self.cache = {}
        self.cache_size = cache_size
    
    def retrieve(self, query: str, top_k: int = 5) -> list[dict]:
        cache_key = hashlib.sha256(f"{query}:{top_k}".encode()).hexdigest()
        
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        results = self.retriever.retrieve(query, top_k)
        
        # LRU eviction
        if len(self.cache) >= self.cache_size:
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]
        
        self.cache[cache_key] = results
        return results

Async Processing

Parallelize independent operations:

python.py
import asyncio

async def async_retrieve(query: str) -> dict:
    # Parallel embedding and query expansion
    embedding_task = asyncio.create_task(async_embed(query))
    expansion_task = asyncio.create_task(async_expand_query(query))
    
    query_embedding, expanded_queries = await asyncio.gather(
        embedding_task, 
        expansion_task
    )
    
    # Parallel retrieval across expanded queries
    retrieval_tasks = [
        asyncio.create_task(vector_search(eq))
        for eq in expanded_queries
    ]
    
    all_results = await asyncio.gather(*retrieval_tasks)
    
    return merge_and_dedupe(all_results)

Monitoring and Observability

Key Metrics to Track

python.py
from prometheus_client import Counter, Histogram, Gauge

# Request metrics
query_count = Counter('rag_queries_total', 'Total RAG queries')
query_latency = Histogram('rag_query_duration_seconds', 'Query duration')

# Retrieval metrics
retrieval_count = Histogram('rag_docs_retrieved', 'Documents retrieved per query')
retrieval_scores = Histogram('rag_retrieval_scores', 'Similarity scores')

# Quality metrics
answer_feedback = Counter('rag_answer_feedback', 'User feedback', ['rating'])
hallucination_detected = Counter('rag_hallucinations', 'Detected hallucinations')

@query_latency.time()
def monitored_retrieve(query: str) -> dict:
    query_count.inc()
    
    results = retrieve(query)
    
    retrieval_count.observe(len(results))
    for r in results:
        retrieval_scores.observe(r['score'])
    
    return results

Logging for Debugging

python.py
import structlog

logger = structlog.get_logger()

def logged_rag_pipeline(query: str) -> str:
    request_id = generate_request_id()
    log = logger.bind(request_id=request_id)
    
    log.info("rag_query_received", query=query)
    
    # Retrieval
    start = time.time()
    results = retrieve(query)
    retrieval_time = time.time() - start
    
    log.info("retrieval_complete", 
             num_results=len(results),
             top_score=results[0]['score'] if results else None,
             duration_ms=retrieval_time * 1000)
    
    # Generation
    start = time.time()
    answer = generate(query, results)
    generation_time = time.time() - start
    
    log.info("generation_complete",
             answer_length=len(answer),
             duration_ms=generation_time * 1000)
    
    return answer

Security Considerations

Access Control

Implement document-level permissions:

python.py
class SecureRetriever:
    def __init__(self, retriever, auth_service):
        self.retriever = retriever
        self.auth_service = auth_service
    
    def retrieve(self, query: str, user_id: str, top_k: int = 5) -> list[dict]:
        # Get user's accessible document IDs
        accessible_docs = self.auth_service.get_accessible_documents(user_id)
        
        # Filter retrieval to accessible documents
        results = self.retriever.retrieve(
            query,
            top_k=top_k * 3,  # Over-retrieve to account for filtering
            filter={"document_id": {"$in": accessible_docs}}
        )
        
        return results[:top_k]

Prompt Injection Defense

Sanitize retrieved content before including in prompts:

python.py
def sanitize_for_prompt(content: str) -> str:
    # Remove potential injection patterns
    dangerous_patterns = [
        r"ignore previous instructions",
        r"disregard.*above",
        r"new instructions:",
        r"system:",
        r"<\|.*\|>"
    ]
    
    sanitized = content
    for pattern in dangerous_patterns:
        sanitized = re.sub(pattern, "[FILTERED]", sanitized, flags=re.IGNORECASE)
    
    return sanitized

def safe_generate(query: str, results: list[dict]) -> str:
    sanitized_context = "\n\n".join([
        sanitize_for_prompt(r["content"]) 
        for r in results
    ])
    
    return llm.complete(f"Context: {sanitized_context}\n\nQuestion: {query}\n\nAnswer:")

Document Freshness

Keep indexes synchronized with source documents:

python.py
class IncrementalIndexer:
    def __init__(self, vector_db, document_store):
        self.vector_db = vector_db
        self.document_store = document_store
    
    def sync(self):
        # Get documents modified since last sync
        last_sync = self.get_last_sync_time()
        modified_docs = self.document_store.get_modified_since(last_sync)
        
        for doc in modified_docs:
            if doc.deleted:
                # Remove from index
                self.vector_db.delete(filter={"document_id": doc.id})
            else:
                # Re-chunk and re-embed
                chunks = chunk_document(doc.content)
                embeddings = embed_chunks(chunks)
                
                # Delete old chunks
                self.vector_db.delete(filter={"document_id": doc.id})
                
                # Insert new chunks
                self.vector_db.upsert([
                    {
                        "id": f"{doc.id}_{i}",
                        "embedding": emb,
                        "payload": {
                            "content": chunk,
                            "document_id": doc.id,
                            "updated_at": doc.updated_at
                        }
                    }
                    for i, (chunk, emb) in enumerate(zip(chunks, embeddings))
                ])
        
        self.set_last_sync_time(datetime.now())

Part VIII: Common Pitfalls and Solutions

Pitfall 1: Chunks Lack Context

Problem: A chunk stating “The increase was 15%” is meaningless without knowing what increased.

Solution: Include contextual information in chunks:

python.py
def contextualized_chunking(document: str, metadata: dict) -> list[dict]:
    chunks = chunk_document(document)
    
    return [
        {
            "content": f"Document: {metadata['title']}\n"
                      f"Section: {detect_section(chunk, document)}\n\n"
                      f"{chunk}",
            "metadata": metadata
        }
        for chunk in chunks
    ]

Pitfall 2: Semantic Search Misses Keywords

Problem: Searching for “HIPAA compliance” doesn’t find documents mentioning “Health Insurance Portability and Accountability Act.”

Solution: Use hybrid search combining vector and keyword approaches, or expand acronyms during indexing:

python.py
def expand_acronyms(text: str, acronym_map: dict) -> str:
    expanded = text
    for acronym, expansion in acronym_map.items():
        expanded = re.sub(
            rf'\b{acronym}\b',
            f'{acronym} ({expansion})',
            expanded
        )
    return expanded

Pitfall 3: Retrieved Context Contradicts Itself

Problem: Different documents contain conflicting information, confusing the LLM.

Solution: Add timestamp awareness and conflict detection:

python.py
def detect_conflicts(results: list[dict], query: str) -> list[dict]:
    conflict_check = llm.complete(f"""
Check if these passages contain contradictory information.

Query: {query}

Passages:
{format_results(results)}

Return JSON: {{"has_conflicts": true/false, "conflicts": [...]}}""")
    
    conflicts = json.loads(conflict_check)
    
    if conflicts["has_conflicts"]:
        # Sort by recency, prefer newer information
        results = sorted(results, key=lambda x: x["metadata"]["date"], reverse=True)
        
        # Add conflict notice to prompt
        results[0]["conflict_notice"] = "Note: Some sources contain older, potentially outdated information."
    
    return results

Pitfall 4: Long Documents Lose Information

Problem: Important information in the middle of long documents gets “lost.”

Solution: Use hierarchical indexing with summaries:

python.py
def hierarchical_index(document: str) -> list[dict]:
    # Create document summary
    summary = llm.complete(f"Summarize this document in 3 paragraphs:\n\n{document[:10000]}")
    
    # Create section summaries
    sections = split_into_sections(document)
    section_summaries = [
        llm.complete(f"Summarize in 2 sentences:\n\n{section}")
        for section in sections
    ]
    
    # Create detailed chunks
    chunks = chunk_document(document)
    
    # Index all levels
    index_entries = [
        {"content": summary, "level": "document", "type": "summary"},
        *[{"content": s, "level": "section", "type": "summary"} for s in section_summaries],
        *[{"content": c, "level": "chunk", "type": "detail"} for c in chunks]
    ]
    
    return index_entries

Pitfall 5: The System Hallucinates Confidently

Problem: The LLM generates plausible-sounding but incorrect information not in the context.

Solution: Implement faithfulness checking and uncertainty quantification:

python.py
def generate_with_uncertainty(query: str, context: str) -> dict:
    answer = llm.complete(f"""
Answer based ONLY on the provided context. If the context doesn't contain 
enough information, say "I don't have enough information to answer this."

Context: {context}

Question: {query}

Answer:""")
    
    # Verify faithfulness
    verification = llm.complete(f"""
Verify each claim in this answer against the context.

Answer: {answer}
Context: {context}

For each claim, quote the supporting text from context or mark as UNSUPPORTED.
Return JSON: {{"claims": [{{"claim": "...", "support": "quote or UNSUPPORTED"}}]}}""")
    
    claims = json.loads(verification)
    unsupported = [c for c in claims["claims"] if c["support"] == "UNSUPPORTED"]
    
    confidence = 1 - (len(unsupported) / len(claims["claims"]))
    
    return {
        "answer": answer,
        "confidence": confidence,
        "unsupported_claims": unsupported
    }

Conclusion

Retrieval-Augmented Generation transforms large language models from isolated reasoning engines into knowledge-connected systems capable of grounding their responses in specific, verifiable information. The architecture presented in this guide (spanning document ingestion, intelligent chunking, vector storage, sophisticated retrieval, and faithful generation)provides a foundation for building production-grade RAG systems.

Success requires attention to every stage of both pipelines. Poor chunking produces poor embeddings. Weak retrieval surfaces irrelevant context. Insufficient prompt engineering leads to hallucinated responses. Yet when each component is carefully designed and continuously evaluated, RAG systems achieve remarkable capability: answering questions from vast document collections with accuracy and specificity that neither pure LLMs nor traditional search can match.

The field continues advancing rapidly. Techniques like contextual retrieval, graph RAG, and agentic orchestration push the boundaries of what these systems can accomplish. As embedding models improve, vector databases scale, and LLMs become more capable, RAG architectures will remain central to practical AI applications bridging the gap between massive knowledge repositories and intelligent, conversational interfaces.

Building effective RAG is an engineering discipline requiring systematic experimentation, rigorous evaluation, and continuous optimization. The techniques and patterns in this guide provide the tools; applying them to your specific domain and use case is where the real work begins.

Watch the Complete RAG Systems Training Video

Start a Free Session Without Registering, We Only Use Access Keys For Data Security

Start a secure session with a unique encrypted key and analyze sensitive and lengthy files without user registration

 

Subscribe to our newsletter

Get up to date with our product