Building Production-Ready RAG Systems
Most RAG tutorials stop at “chunk your docs and query.” If only production were that simple. After building RAG systems that serve thousands of queries daily, I’ve learned that the gap between a working demo and a reliable production system is vast.
Here’s the architecture I use—and the failures that taught me what not to do.
The Naive Approach (And Why It Fails)
The typical RAG tutorial goes something like this:
from langchain import VectorStore, OpenAI
# Load and chunk documents
docs = load_documents("./data")
chunks = chunk_documents(docs, chunk_size=1000)
# Create embeddings and store
vectorstore = VectorStore.from_documents(chunks)
# Query
retriever = vectorstore.as_retriever()
qa_chain = RetrievalQA(llm=OpenAI(), retriever=retriever)
answer = qa_chain.run("What is the return policy?")
This works great in a Jupyter notebook. Then you deploy it, and users start asking:
- Questions that span multiple documents
- Questions with typos or informal language
- Questions about information that doesn’t exist
- The same question 1000 times per hour
Your nice demo falls apart. Let’s fix that.
The Production Architecture
1. Hybrid Retrieval
Dense embeddings are powerful but not sufficient. They miss exact keyword matches and struggle with technical terms, product codes, and proper nouns.
from qdrant_client import QdrantClient
from qdrant_client.models import NamedVector

class HybridRetriever:
    def __init__(self, collection_name: str):
        self.client = QdrantClient(host="localhost", port=6333)
        self.collection = collection_name

    def search(self, query: str, limit: int = 10) -> list[Document]:
        # Dense search with semantic embeddings
        dense_results = self.client.search(
            collection_name=self.collection,
            query_vector=NamedVector(
                name="dense",
                vector=self.embed_dense(query)
            ),
            limit=limit
        )

        # Sparse search with BM25
        sparse_results = self.client.search(
            collection_name=self.collection,
            query_vector=NamedVector(
                name="sparse",
                vector=self.embed_sparse(query)
            ),
            limit=limit
        )

        # Reciprocal Rank Fusion
        return self.rrf_merge(dense_results, sparse_results)
The key insight: use BM25 for precision, dense vectors for recall, then merge with RRF. I’ve seen this boost relevance by 15-30% in domain-specific applications.
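The rrf_merge call is left undefined above. Here's a minimal sketch of Reciprocal Rank Fusion under the usual k = 60 constant, assuming each Qdrant hit exposes an id; the standalone helper form and names are mine, not part of the original class:

def rrf_merge(dense_results, sparse_results, k: int = 60):
    # Reciprocal Rank Fusion: a document scores 1 / (k + rank) for each list it appears in
    scores: dict = {}
    hits_by_id: dict = {}
    for results in (dense_results, sparse_results):
        for rank, hit in enumerate(results, start=1):
            hits_by_id[hit.id] = hit
            scores[hit.id] = scores.get(hit.id, 0.0) + 1.0 / (k + rank)

    # Highest fused score first
    fused_ids = sorted(scores, key=scores.get, reverse=True)
    return [hits_by_id[doc_id] for doc_id in fused_ids]

Documents that show up in both lists get rewarded, which is exactly the behavior you want when dense and sparse retrieval agree.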
2. Semantic Double-Merging Chunking
Fixed-size chunking destroys context. A 1000-token chunk might split a paragraph mid-sentence, separating the premise from the conclusion.
Instead, I use semantic chunking with overlap:
from sentence_transformers import SentenceTransformer, util

class SemanticChunker:
    def __init__(self, similarity_threshold: float = 0.75):
        self.model = SentenceTransformer("all-MiniLM-L6-v2")
        self.threshold = similarity_threshold

    def chunk(self, text: str) -> list[Chunk]:
        sentences = self.split_sentences(text)
        embeddings = self.model.encode(sentences)

        chunks = []
        current_chunk = [sentences[0]]

        for i in range(1, len(sentences)):
            similarity = float(util.cos_sim(embeddings[i - 1], embeddings[i]))
            if similarity < self.threshold:
                # Semantic boundary detected
                chunks.append(self.merge_with_context(current_chunk))
                current_chunk = []
            current_chunk.append(sentences[i])

        # Don't drop the trailing chunk
        if current_chunk:
            chunks.append(self.merge_with_context(current_chunk))
        return chunks
The “double-merging” part: after initial chunking, I merge adjacent chunks that have high semantic similarity. This prevents over-fragmentation while respecting natural topic boundaries.
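That second pass isn't shown above, so here's a minimal sketch of it, assuming chunks are plain strings with one embedding each; the helper name and the 0.85 merge threshold are illustrative:

import numpy as np

def double_merge(chunks: list[str], embeddings: list, merge_threshold: float = 0.85) -> list[str]:
    # Second pass: fuse adjacent chunks whose embeddings are still highly similar
    merged_chunks = [chunks[0]]
    merged_embeddings = [np.asarray(embeddings[0], dtype=float)]

    for chunk, emb in zip(chunks[1:], embeddings[1:]):
        emb = np.asarray(emb, dtype=float)
        prev = merged_embeddings[-1]
        similarity = float(np.dot(prev, emb) / (np.linalg.norm(prev) * np.linalg.norm(emb)))

        if similarity >= merge_threshold:
            # Fold this chunk into the previous one; keep a rough running-mean embedding
            merged_chunks[-1] = merged_chunks[-1] + " " + chunk
            merged_embeddings[-1] = (prev + emb) / 2
        else:
            merged_chunks.append(chunk)
            merged_embeddings.append(emb)

    return merged_chunks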
3. Re-ranking
The initial retrieval returns candidates. Re-ranking sorts them by actual relevance to the query.
from sentence_transformers import CrossEncoder

class Reranker:
    def __init__(self):
        self.model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

    def rerank(
        self,
        query: str,
        documents: list[Document],
        top_k: int = 5
    ) -> list[Document]:
        pairs = [(query, doc.content) for doc in documents]
        scores = self.model.predict(pairs)

        ranked = sorted(
            zip(documents, scores),
            key=lambda x: x[1],
            reverse=True
        )

        # Attach the cross-encoder score so downstream code can threshold on doc.score
        for doc, score in ranked:
            doc.score = float(score)

        return [doc for doc, score in ranked[:top_k]]
Cross-encoders are slower than bi-encoders but significantly more accurate. Use them on the top 20-50 candidates from initial retrieval.
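In practice the two stages compose like this; the query and collection name are just examples:

# Cast a wide net with hybrid retrieval, then let the cross-encoder pick the best few
retriever = HybridRetriever("support_docs")
reranker = Reranker()

candidates = retriever.search("how do I return a damaged item?", limit=50)
top_docs = reranker.rerank("how do I return a damaged item?", candidates, top_k=5)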
4. Semantic Caching
Here’s a production secret: most queries are variations of the same questions.
import hashlib
import json

import numpy as np
from redis import Redis
from sentence_transformers import SentenceTransformer

class SemanticCache:
    def __init__(self, similarity_threshold: float = 0.95):
        self.redis = Redis(host="localhost", port=6379)
        self.threshold = similarity_threshold
        self.embedder = SentenceTransformer("all-MiniLM-L6-v2")

    def get(self, query: str) -> str | None:
        query_embedding = self.embedder.encode(query)

        # Check for semantically similar cached queries
        for key in self.redis.scan_iter("cache:*"):
            cached = json.loads(self.redis.get(key))
            cached_embedding = np.array(cached["embedding"])
            similarity = float(
                np.dot(query_embedding, cached_embedding)
                / (np.linalg.norm(query_embedding) * np.linalg.norm(cached_embedding))
            )
            if similarity > self.threshold:
                return cached["response"]
        return None

    def set(self, query: str, response: str, ttl: int = 3600):
        embedding = self.embedder.encode(query).tolist()
        cache_key = f"cache:{hashlib.md5(query.encode()).hexdigest()}"
        self.redis.setex(
            cache_key,
            ttl,
            json.dumps({"embedding": embedding, "response": response})
        )
This reduces LLM calls by 40-60% in most applications. The ROI is immediate.
5. Graceful Fallbacks
When retrieval fails—and it will—don’t hallucinate. Be honest.
class RAGPipeline:
    def query(self, question: str) -> Response:
        # Check cache first
        cached = self.cache.get(question)
        if cached:
            return Response(answer=cached, source="cache")

        # Retrieve and rerank
        documents = self.retriever.search(question)
        ranked = self.reranker.rerank(question, documents)

        # Check relevance threshold
        if not ranked or ranked[0].score < self.relevance_threshold:
            return Response(
                answer="I don't have enough information to answer this question accurately.",
                source="fallback",
                confidence=0.0
            )

        # Generate answer with citations
        answer = self.generate_with_citations(question, ranked)

        # Cache for future queries
        self.cache.set(question, answer)

        return Response(
            answer=answer,
            source="rag",
            documents=ranked,
            confidence=ranked[0].score
        )
The best RAG systems know their limits. A confident “I don’t know” is infinitely better than a plausible-sounding hallucination.
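The pipeline above leans on generate_with_citations, which I haven't shown. Here's a minimal sketch assuming the openai Python client; the model name and prompt wording are placeholders, not a prescription:

from openai import OpenAI

def generate_with_citations(self, question: str, documents: list[Document]) -> str:
    # Number each source so the model can cite by index, e.g. [2]
    context = "\n\n".join(f"[{i}] {doc.content}" for i, doc in enumerate(documents, start=1))

    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4o-mini",  # placeholder; use whatever model your pipeline runs on
        messages=[
            {
                "role": "system",
                "content": (
                    "Answer using only the numbered sources. Cite them inline like [1]. "
                    "If the sources don't contain the answer, say so."
                ),
            },
            {"role": "user", "content": f"Sources:\n{context}\n\nQuestion: {question}"},
        ],
    )
    return response.choices[0].message.content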
Monitoring & Observability
You can’t improve what you don’t measure. Every production RAG system needs:
- Retrieval metrics: Hit rate, MRR, NDCG (see the offline evaluation sketch after the instrumentation code below)
- Latency percentiles: p50, p95, p99
- Cache hit rates: Measure that ROI
- User feedback loops: Thumbs up/down on answers
import logfire

class InstrumentedRAG:
    @logfire.instrument("rag_query")
    def query(self, question: str) -> Response:
        with logfire.span("retrieval") as span:
            documents = self.retriever.search(question)
            span.set_attribute("retrieval.count", len(documents))

        with logfire.span("reranking") as span:
            ranked = self.reranker.rerank(question, documents)
            span.set_attribute("rerank.top_score", ranked[0].score)

        with logfire.span("generation"):
            response = self.generate(question, ranked)

        return response
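Instrumentation covers the runtime side; for the retrieval metrics themselves I run an offline evaluation over a small labeled set. A minimal sketch, assuming each document exposes an id and the eval set is a list of (query, relevant_doc_id) pairs:

def evaluate_retrieval(retriever, eval_set: list[tuple[str, str]], k: int = 10) -> dict:
    # Hit rate: did the relevant doc appear in the top k at all?
    # MRR: how high up did it appear, on average?
    hits = 0
    reciprocal_ranks = []

    for query, relevant_id in eval_set:
        results = retriever.search(query, limit=k)
        rank = next((i for i, doc in enumerate(results, start=1) if doc.id == relevant_id), None)
        if rank is not None:
            hits += 1
            reciprocal_ranks.append(1.0 / rank)
        else:
            reciprocal_ranks.append(0.0)

    return {
        "hit_rate": hits / len(eval_set),
        "mrr": sum(reciprocal_ranks) / len(eval_set),
    }

Run this on every chunking or retrieval change; it catches regressions long before users do.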
Lessons Learned
After shipping RAG to production multiple times, here’s what I’ve learned:
- Chunking matters more than you think. Bad chunks = bad retrieval = bad answers. Invest time here.
- Hybrid retrieval is almost always worth it. The complexity cost is low; the relevance gain is high.
- Re-ranking is cheap insurance. A few hundred milliseconds for significantly better results.
- Cache aggressively. Your users ask similar questions. Your wallet will thank you.
- Fail gracefully. Hallucinations erode trust faster than anything else.
Building production RAG isn’t about finding the perfect embedding model or the optimal chunk size. It’s about building robust systems that handle the messy reality of user queries while being honest about their limitations.
The architecture I’ve outlined here isn’t novel—it’s battle-tested. Adapt it to your domain, measure everything, and iterate.
Have questions about RAG architecture? Reach out—I’m always happy to discuss production AI systems.