
Building Production-Ready RAG Systems


Most RAG tutorials stop at “chunk your docs and query.” If only production were that simple. After building RAG systems that serve thousands of queries daily, I’ve learned that the gap between a working demo and a reliable production system is vast.

Here’s the architecture I use—and the failures that taught me what not to do.

The Naive Approach (And Why It Fails)

The typical RAG tutorial goes something like this:

from langchain.chains import RetrievalQA
from langchain.embeddings import OpenAIEmbeddings
from langchain.llms import OpenAI
from langchain.vectorstores import FAISS

# Load and chunk documents (helpers the tutorial hand-waves over)
docs = load_documents("./data")
chunks = chunk_documents(docs, chunk_size=1000)

# Create embeddings and store them
vectorstore = FAISS.from_documents(chunks, OpenAIEmbeddings())

# Query
retriever = vectorstore.as_retriever()
qa_chain = RetrievalQA.from_chain_type(llm=OpenAI(), retriever=retriever)
answer = qa_chain.run("What is the return policy?")

This works great in a Jupyter notebook. Then you deploy it, and users start asking:

  • Questions that span multiple documents
  • Questions with typos or informal language
  • Questions about information that doesn’t exist
  • The same question 1000 times per hour

Your tidy demo falls apart. Let’s fix that.

The Production Architecture

1. Hybrid Retrieval

Dense embeddings are powerful but not sufficient. They miss exact keyword matches and struggle with technical terms, product codes, and proper nouns.

from qdrant_client import QdrantClient
from qdrant_client.models import NamedVector

class HybridRetriever:
    def __init__(self, collection_name: str):
        self.client = QdrantClient(host="localhost", port=6333)
        self.collection = collection_name

    def search(self, query: str, limit: int = 10) -> list[Document]:
        # Dense search with semantic embeddings
        dense_results = self.client.search(
            collection_name=self.collection,
            query_vector=NamedVector(
                name="dense",
                vector=self.embed_dense(query)  # dense query embedding (helper defined elsewhere)
            ),
            limit=limit
        )

        # Sparse search with BM25-style term weights
        # (how the "sparse" vector is represented depends on your collection config)
        sparse_results = self.client.search(
            collection_name=self.collection,
            query_vector=NamedVector(
                name="sparse",
                vector=self.embed_sparse(query)  # sparse query representation (helper defined elsewhere)
            ),
            limit=limit
        )

        # Reciprocal Rank Fusion (see the rrf_merge sketch below)
        return self.rrf_merge(dense_results, sparse_results)

The key insight: use BM25 for precision, dense vectors for recall, then merge with RRF. I’ve seen this boost relevance by 15-30% in domain-specific applications.
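
The rrf_merge helper isn’t shown above, so here’s a minimal sketch of it, assuming Qdrant’s ScoredPoint results (which carry an id field); mapping the fused points back into Document objects is omitted:

def rrf_merge(self, dense_results, sparse_results, k: int = 60):
    # Reciprocal Rank Fusion: each list contributes 1 / (k + rank) to a point's fused score
    scores = {}
    points = {}

    for results in (dense_results, sparse_results):
        for rank, point in enumerate(results, start=1):
            scores[point.id] = scores.get(point.id, 0.0) + 1.0 / (k + rank)
            points[point.id] = point

    # Highest fused score first
    return [points[pid] for pid in sorted(scores, key=scores.get, reverse=True)]

k = 60 is the conventional default; larger values flatten the gap between top-ranked and lower-ranked hits.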

2. Semantic Double-Merging Chunking

Fixed-size chunking destroys context. A 1000-token chunk might split a paragraph mid-sentence, separating the premise from the conclusion.

Instead, I use semantic chunking with overlap:

from sentence_transformers import SentenceTransformer, util

class SemanticChunker:
    def __init__(self, similarity_threshold: float = 0.75):
        self.model = SentenceTransformer("all-MiniLM-L6-v2")
        self.threshold = similarity_threshold

    def chunk(self, text: str) -> list[Chunk]:
        sentences = self.split_sentences(text)  # sentence splitter (helper defined elsewhere)
        if not sentences:
            return []

        embeddings = self.model.encode(sentences)

        chunks = []
        current_chunk = [sentences[0]]

        for i in range(1, len(sentences)):
            similarity = util.cos_sim(
                embeddings[i - 1], embeddings[i]
            ).item()

            if similarity < self.threshold:
                # Semantic boundary detected
                chunks.append(self.merge_with_context(current_chunk))
                current_chunk = []

            current_chunk.append(sentences[i])

        # Don't drop the trailing chunk
        if current_chunk:
            chunks.append(self.merge_with_context(current_chunk))

        return chunks

The “double-merging” part: after initial chunking, I merge adjacent chunks that have high semantic similarity. This prevents over-fragmentation while respecting natural topic boundaries.
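
That merge pass isn’t in the snippet above. Here’s a minimal sketch, assuming each Chunk carries its embedding and a combine helper that concatenates two chunks and re-embeds the result (both are assumptions, not part of the code above):

def double_merge(self, chunks: list[Chunk], merge_threshold: float = 0.85) -> list[Chunk]:
    if not chunks:
        return []

    merged = [chunks[0]]
    for chunk in chunks[1:]:
        # How close is this chunk to the one we just kept?
        similarity = util.cos_sim(merged[-1].embedding, chunk.embedding).item()

        if similarity >= merge_threshold:
            # Same topic continued across the boundary: fold it into the previous chunk
            merged[-1] = self.combine(merged[-1], chunk)  # assumed helper: concatenate text, re-embed
        else:
            merged.append(chunk)

    return merged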

3. Re-ranking

The initial retrieval returns candidates. Re-ranking sorts them by actual relevance to the query.

from sentence_transformers import CrossEncoder

class Reranker:
    def __init__(self):
        self.model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

    def rerank(
        self,
        query: str,
        documents: list[Document],
        top_k: int = 5
    ) -> list[Document]:
        # Score every (query, document) pair with the cross-encoder
        pairs = [(query, doc.content) for doc in documents]
        scores = self.model.predict(pairs)

        ranked = sorted(
            zip(documents, scores),
            key=lambda x: x[1],
            reverse=True
        )

        # Attach the score so downstream code can apply a relevance threshold
        top = []
        for doc, score in ranked[:top_k]:
            doc.score = float(score)
            top.append(doc)
        return top

Cross-encoders are slower than bi-encoders but significantly more accurate. Use them on the top 20-50 candidates from initial retrieval.
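
Wiring the two stages together takes a few lines. The collection name here is a placeholder, and the limits are the knobs worth tuning:

retriever = HybridRetriever(collection_name="support_docs")  # placeholder collection name
reranker = Reranker()

query = "How do I return a damaged item?"
candidates = retriever.search(query, limit=40)          # wide net from hybrid retrieval
top_docs = reranker.rerank(query, candidates, top_k=5)  # cross-encoder keeps the best few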

Semantic Caching

Here’s a production secret: most queries are variations of the same questions.

import hashlib
import json

from redis import Redis
from sentence_transformers import SentenceTransformer, util

class SemanticCache:
    def __init__(self, similarity_threshold: float = 0.95):
        self.redis = Redis(host="localhost", port=6379)
        self.threshold = similarity_threshold
        self.embedder = SentenceTransformer("all-MiniLM-L6-v2")

    def get(self, query: str) -> str | None:
        query_embedding = self.embedder.encode(query)

        # Check for semantically similar cached queries
        # (a linear scan; fine for small caches, use a vector index if it grows)
        for key in self.redis.scan_iter("cache:*"):
            cached = json.loads(self.redis.get(key))
            similarity = util.cos_sim(
                query_embedding,
                cached["embedding"]
            ).item()

            if similarity > self.threshold:
                return cached["response"]

        return None

    def set(self, query: str, response: str, ttl: int = 3600):
        embedding = self.embedder.encode(query).tolist()
        cache_key = f"cache:{hashlib.md5(query.encode()).hexdigest()}"

        self.redis.setex(
            cache_key,
            ttl,
            json.dumps({"embedding": embedding, "response": response})
        )

This reduces LLM calls by 40-60% in most applications. The ROI is immediate.
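
Usage is a straight check-then-store around the expensive path. answer_with_rag here is just a stand-in for the full pipeline in the next section:

cache = SemanticCache(similarity_threshold=0.95)

query = "What's your return policy?"
answer = cache.get(query)
if answer is None:
    answer = answer_with_rag(query)  # stand-in for retrieval + rerank + generation
    cache.set(query, answer, ttl=3600)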

Graceful Fallbacks

When retrieval fails—and it will—don’t hallucinate. Be honest.

class RAGPipeline:
    def query(self, question: str) -> Response:
        # Check cache first
        cached = self.cache.get(question)
        if cached:
            return Response(answer=cached, source="cache")

        # Retrieve and rerank
        documents = self.retriever.search(question)
        ranked = self.reranker.rerank(question, documents)

        # Check relevance threshold
        if not ranked or ranked[0].score < self.relevance_threshold:
            return Response(
                answer="I don't have enough information to answer this question accurately.",
                source="fallback",
                confidence=0.0
            )

        # Generate answer with citations
        answer = self.generate_with_citations(question, ranked)

        # Cache for future queries
        self.cache.set(question, answer)

        return Response(
            answer=answer,
            source="rag",
            documents=ranked,
            confidence=ranked[0].score
        )

The best RAG systems know their limits. A confident “I don’t know” is infinitely better than a plausible-sounding hallucination.
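
The Response object the pipeline returns is nothing special. A minimal version covering the fields used above, reusing the same Document type assumed throughout this post:

from dataclasses import dataclass, field

@dataclass
class Response:
    answer: str
    source: str                                   # "cache", "rag", or "fallback"
    confidence: float = 0.0                       # top reranker score, 0.0 on fallback
    documents: list[Document] = field(default_factory=list)  # supporting chunks for citations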

Monitoring & Observability

You can’t improve what you don’t measure. Every production RAG system needs:

  1. Retrieval metrics: Hit rate, MRR, NDCG
  2. Latency percentiles: p50, p95, p99
  3. Cache hit rates: Measure that ROI
  4. User feedback loops: Thumbs up/down on answers

Spans around each stage make the latency breakdown obvious:

import logfire

class InstrumentedRAG:
    @logfire.instrument("rag_query")
    def query(self, question: str) -> Response:
        with logfire.span("retrieval"):
            documents = self.retriever.search(question)
            logfire.info("retrieved {count} candidates", count=len(documents))

        with logfire.span("reranking"):
            ranked = self.reranker.rerank(question, documents)
            logfire.info("rerank top score: {score}", score=float(ranked[0].score))

        with logfire.span("generation"):
            response = self.generate(question, ranked)

        return response

Lessons Learned

After shipping RAG to production multiple times, here’s what I’ve learned:

  1. Chunking matters more than you think. Bad chunks = bad retrieval = bad answers. Invest time here.

  2. Hybrid retrieval is almost always worth it. The complexity cost is low; the relevance gain is high.

  3. Re-ranking is cheap insurance. A few hundred milliseconds for significantly better results.

  4. Cache aggressively. Your users ask similar questions. Your wallet will thank you.

  5. Fail gracefully. Hallucinations erode trust faster than anything else.


Building production RAG isn’t about finding the perfect embedding model or the optimal chunk size. It’s about building robust systems that handle the messy reality of user queries while being honest about their limitations.

The architecture I’ve outlined here isn’t novel—it’s battle-tested. Adapt it to your domain, measure everything, and iterate.

Have questions about RAG architecture? Reach out—I’m always happy to discuss production AI systems.