Skip to main content

RAG Integration Guide

Learn how to integrate Cortex with Retrieval-Augmented Generation (RAG) systems to provide your AI applications with real-time, verified knowledge.

🎯 What is RAG + Cortex?

RAG systems traditionally rely on static knowledge bases that become outdated quickly. Cortex enhances RAG by providing:

  • Real-time knowledge - Fresh information from the live web
  • Source verification - Trusted, cited information
  • Content validation - Cross-verified facts across multiple sources
  • Structured extraction - Clean, AI-ready content format

🏗️ Architecture Overview

User Query

[ Query Router ]

[ Cortex Search ] ← Real-time web search

[ Content Extraction ] ← Clean, structured content

[ Vector Embedding ] ← Embed fresh content

[ Vector Database ] ← Store temporarily or permanently

[ Retrieval ] ← Find relevant chunks

[ LLM Generation ] ← Generate response with fresh context

Response with Citations

LangChain Integration

# LangChain integration: use Cortex as a real-time retriever in a RetrievalQA chain.
# Fixes: removed the bogus `from langchain.retrievers import CortexRetriever`
# (no such class exists upstream, and it was shadowed by the class below anyway)
# and added the missing `Document` import used by get_relevant_documents().
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
from langchain.schema import Document
from cortex import CortexClient

# Initialize Cortex client
cortex_client = CortexClient(api_key="your_cortex_key")


class CortexRetriever:
    """Custom retriever that pulls fresh, verified documents from Cortex."""

    def __init__(self, cortex_client, top_k=5):
        self.cortex = cortex_client
        self.top_k = top_k

    def get_relevant_documents(self, query):
        """Search the live web and return LangChain Documents for *query*."""
        # Restrict to the last week to keep results fresh
        result = self.cortex.search(
            query=query,
            max_results=self.top_k,
            recency="week"
        )

        # Convert each Cortex source to the LangChain Document format
        documents = []
        for source in result.sources:
            # Extract full page content; fall back to the search snippet on failure
            content = self.cortex.extract(source.url)

            doc = Document(
                page_content=content.text if content.success else source.snippet,
                metadata={
                    "source": source.url,
                    "title": source.title,
                    "confidence": source.confidence,
                    "published_date": source.published_date,
                    "cortex_verified": True
                }
            )
            documents.append(doc)

        return documents


# Set up the RAG chain
retriever = CortexRetriever(cortex_client, top_k=8)
llm = OpenAI(temperature=0)

qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=retriever,
    return_source_documents=True
)

# Query with real-time knowledge
result = qa_chain({
    "query": "What are the latest developments in quantum computing?"
})

print(result["result"])
print(f"Sources: {len(result['source_documents'])}")

LlamaIndex Integration

# LlamaIndex integration: a custom BaseRetriever backed by real-time Cortex search.
# Fix: `NodeWithScore` and `TextNode` were used but never imported.
from llama_index import Document, GPTVectorStoreIndex, ServiceContext
from llama_index.retrievers import BaseRetriever
from llama_index.schema import NodeWithScore, TextNode
from cortex import CortexClient
import openai


class CortexRetriever(BaseRetriever):
    """LlamaIndex retriever that returns scored nodes built from fresh web content."""

    def __init__(self, cortex_client, top_k=5):
        self.cortex = cortex_client
        self.top_k = top_k
        super().__init__()

    def _retrieve(self, query_bundle):
        """Search Cortex and wrap extractable sources as scored TextNodes."""
        result = self.cortex.search(
            query=query_bundle.query_str,
            max_results=self.top_k,
            recency="month"
        )

        nodes = []
        for source in result.sources:
            # Extract full content; skip sources that cannot be extracted
            content = self.cortex.extract(source.url)
            if content.success:
                node = NodeWithScore(
                    node=TextNode(
                        text=content.text,
                        metadata={
                            "source": source.url,
                            "title": source.title,
                            "confidence": source.confidence,
                            "cortex_verified": True
                        }
                    ),
                    # Use the source confidence as the retrieval score
                    score=source.confidence
                )
                nodes.append(node)

        return nodes


# Initialize
cortex_client = CortexClient(api_key="your_cortex_key")
retriever = CortexRetriever(cortex_client, top_k=10)

# Create an (empty) index; retrieval comes from the real-time retriever
service_context = ServiceContext.from_defaults()
index = GPTVectorStoreIndex([], service_context=service_context)

# Query with real-time knowledge
query_engine = index.as_query_engine(retriever=retriever)
response = query_engine.query("Latest AI safety research findings")

print(response.response)
print(f"Sources: {len(response.source_nodes)}")

Haystack Integration

# Haystack integration: Cortex-backed retriever feeding a PromptNode pipeline.
# Fix: `List` was used in the retrieve() annotation but never imported.
from typing import List

from haystack import Pipeline
from haystack.nodes import PromptNode, AnswerParser
from haystack.schema import Document
from cortex import CortexClient


class CortexRetriever:
    """Haystack-compatible retriever backed by real-time Cortex search."""

    def __init__(self, cortex_client):
        self.cortex = cortex_client

    def retrieve(self, query: str, top_k: int = 5) -> List[Document]:
        """Search the live web and return Haystack Documents."""
        result = self.cortex.search(
            query=query,
            max_results=top_k,
            recency="week"
        )

        documents = []
        for source in result.sources:
            # Full-page extraction; fall back to the search snippet on failure
            content = self.cortex.extract(source.url)

            doc = Document(
                content=content.text if content.success else source.snippet,
                meta={
                    "name": source.title,
                    "url": source.url,
                    "confidence": source.confidence,
                    "cortex_verified": True
                }
            )
            documents.append(doc)

        return documents


# Set up pipeline components
cortex_client = CortexClient(api_key="your_cortex_key")
retriever = CortexRetriever(cortex_client)

prompt_node = PromptNode(
    model_name_or_path="gpt-3.5-turbo",
    api_key="your_openai_key",
    max_length=200
)

# Create the RAG pipeline: Query -> CortexRetriever -> PromptNode
pipeline = Pipeline()
pipeline.add_node(component=retriever, name="CortexRetriever", inputs=["Query"])
pipeline.add_node(component=prompt_node, name="PromptNode", inputs=["CortexRetriever"])

# Query pipeline
result = pipeline.run(
    query="What are the latest trends in renewable energy?",
    params={
        "CortexRetriever": {"top_k": 8},
        "PromptNode": {
            "prompt_template": """
Based on the following real-time information, answer the question:

Context: {join(documents)}

Question: {query}

Answer:
"""
        }
    }
)

print(result["answers"][0].answer)

🔄 Real-time Knowledge Updates

Automatic Knowledge Refresh

import schedule
import time
from datetime import datetime, timedelta
from cortex import CortexClient
import chromadb

class RealTimeRAG:
    """Continuously refreshed RAG knowledge base.

    Monitors a set of topics, periodically pulls fresh content from Cortex,
    stores it in an in-memory Chroma collection, and answers questions from
    the stored chunks.
    """

    def __init__(self, cortex_api_key, openai_api_key):
        self.cortex = CortexClient(api_key=cortex_api_key)
        # In-memory Chroma store; swap for a persistent client in production
        self.chroma_client = chromadb.Client()
        self.collection = self.chroma_client.create_collection("real_time_knowledge")
        # Monitored topics: {"topic", "refresh_interval" (hours), "last_updated"}
        self.topics = []
        # NOTE(review): openai_api_key is accepted for interface compatibility
        # but is never used by this class — confirm whether it can be dropped.

    def add_topic(self, topic, refresh_interval_hours=6):
        """Add a topic to monitor"""
        self.topics.append({
            "topic": topic,
            "refresh_interval": refresh_interval_hours,
            "last_updated": None
        })

    def update_knowledge(self, topic):
        """Fetch the latest content for *topic* and add it to the collection.

        Returns the Cortex search's success flag.
        """
        print(f"Updating knowledge for: {topic}")

        # Search for the freshest information (last day)
        result = self.cortex.search(
            query=f"latest {topic} developments",
            max_results=10,
            recency="day"
        )

        if result.success:
            # Extract full page content from each source
            documents = []
            for source in result.sources:
                content = self.cortex.extract(source.url)
                if content.success:
                    documents.append({
                        "text": content.text,
                        "metadata": {
                            "topic": topic,
                            "url": source.url,
                            "title": source.title,
                            "confidence": source.confidence,
                            "updated_at": datetime.now().isoformat(),
                            "cortex_verified": True
                        }
                    })

            # Add to the vector database; timestamped ids avoid collisions
            if documents:
                self.collection.add(
                    documents=[doc["text"] for doc in documents],
                    metadatas=[doc["metadata"] for doc in documents],
                    ids=[f"{topic}_{i}_{int(time.time())}" for i in range(len(documents))]
                )

                print(f"Added {len(documents)} documents for {topic}")

        return result.success

    def query_knowledge(self, question, top_k=5):
        """Retrieve the most relevant stored chunks for *question*.

        Returns a dict with formatted context and source metadata, or None
        when nothing relevant is stored.
        """
        results = self.collection.query(
            query_texts=[question],
            n_results=top_k
        )

        # Chroma returns one list of hits per query text; check the inner
        # list too — the original treated [[]] (zero hits) as a match and
        # returned an empty-context result instead of None.
        docs = results["documents"]
        if docs and docs[0]:
            # Format context for the LLM: one "Source: ..." block per chunk
            context = "\n\n".join(
                f"Source: {meta['title']} ({meta['url']})\n{doc}"
                for doc, meta in zip(docs[0], results["metadatas"][0])
            )

            return {
                "context": context,
                "sources": results["metadatas"][0],
                "question": question
            }

        return None

    def schedule_updates(self):
        """Register per-topic refresh jobs and run the scheduler forever.

        NOTE: this blocks the calling thread; run it in a daemon thread.
        """
        for topic_info in self.topics:
            schedule.every(topic_info["refresh_interval"]).hours.do(
                self.update_knowledge, topic_info["topic"]
            )

        # Run scheduler loop
        while True:
            schedule.run_pending()
            time.sleep(60)

# Usage: monitor a handful of topics, each with its own refresh cadence.
rag_system = RealTimeRAG(
    cortex_api_key="your_cortex_key",
    openai_api_key="your_openai_key"
)

# Topics to monitor (refresh interval in hours)
rag_system.add_topic("artificial intelligence", refresh_interval_hours=4)
rag_system.add_topic("quantum computing", refresh_interval_hours=8)
rag_system.add_topic("renewable energy", refresh_interval_hours=12)

# The scheduler loop blocks, so run it in a background daemon thread
import threading
scheduler_thread = threading.Thread(target=rag_system.schedule_updates)
scheduler_thread.daemon = True
scheduler_thread.start()

# Query the continuously refreshed knowledge base
result = rag_system.query_knowledge("What are the latest AI safety developments?")
if result:
    print("Context:", result["context"])
    print("Sources:", len(result["sources"]))

Incremental Updates

class IncrementalRAG:
    """Change-detection layer: re-index tracked sources only when they change.

    Fix: the snippet used `hashlib`, `datetime` and `time` without importing
    them; imports are now local so the class is self-contained, and the
    checksum logic is factored into one helper.
    """

    def __init__(self, cortex_client, vector_db):
        self.cortex = cortex_client
        self.vector_db = vector_db
        # query -> {"urls": [...], "last_check": datetime, "checksums": {url: hex}}
        self.change_tracker = {}

    @staticmethod
    def _checksum(text):
        """Content fingerprint for change detection.

        md5 is acceptable here — this is not a security context, just a
        cheap equality check.
        """
        import hashlib
        return hashlib.md5(text.encode()).hexdigest()

    def track_sources(self, query, source_urls):
        """Start tracking *source_urls* for *query*, recording baseline checksums."""
        from datetime import datetime

        self.change_tracker[query] = {
            "urls": source_urls,
            "last_check": datetime.now(),
            "checksums": {}
        }

        # Record a baseline checksum for every extractable source
        for url in source_urls:
            content = self.cortex.extract(url)
            if content.success:
                self.change_tracker[query]["checksums"][url] = self._checksum(content.text)

    def check_for_updates(self, query):
        """Re-extract tracked sources; return True if any content changed."""
        from datetime import datetime

        if query not in self.change_tracker:
            return False

        tracker = self.change_tracker[query]
        changes_detected = False

        for url in tracker["urls"]:
            content = self.cortex.extract(url)
            if content.success:
                new_checksum = self._checksum(content.text)
                if new_checksum != tracker["checksums"].get(url):
                    print(f"Change detected in {url}")
                    changes_detected = True

                    # Replace the stale document and remember the new baseline
                    self.update_document(url, content.text)
                    tracker["checksums"][url] = new_checksum

        tracker["last_check"] = datetime.now()
        return changes_detected

    def update_document(self, url, new_content):
        """Swap the stored version of *url* for *new_content* in the vector DB."""
        import time
        from datetime import datetime

        # Remove old version
        self.vector_db.delete(where={"url": url})

        # Add new version with a timestamped id
        self.vector_db.add(
            documents=[new_content],
            metadatas=[{"url": url, "updated_at": datetime.now().isoformat()}],
            ids=[f"doc_{url}_{int(time.time())}"]
        )

# Usage
incremental_rag = IncrementalRAG(cortex_client, vector_db)

# Initial search: record the current set of sources for this query
result = cortex_client.search("AI safety research")
source_urls = [source.url for source in result.sources]
incremental_rag.track_sources("AI safety research", source_urls)

# Re-check the tracked sources for content changes every 30 minutes
schedule.every(30).minutes.do(
incremental_rag.check_for_updates,
"AI safety research"
)

🎯 Domain-Specific RAG

Scientific Research RAG

class ScientificRAG:
    """RAG helper specialized for peer-reviewed scientific sources.

    Bug fix: generate_literature_review() called a method named
    extract_key_terms() that was never defined; it is implemented below.
    """

    def __init__(self, cortex_client):
        self.cortex = cortex_client

    def search_papers(self, query, limit=10):
        """Search trusted scientific domains and keep validated papers only."""
        result = self.cortex.search(
            query=f"{query} research papers",
            max_results=limit,
            domain_filter={
                "include": [
                    "arxiv.org", "nature.com", "science.org",
                    "cell.com", "nejm.org", "pubmed.ncbi.nlm.nih.gov"
                ]
            },
            recency="year"
        )

        # Extract and validate papers
        papers = []
        for source in result.sources:
            content = self.cortex.extract(source.url)
            if content.success:
                # Cross-check that the page really is a research paper
                validation = self.cortex.validate(
                    claim=f"This is a peer-reviewed research paper about {query}",
                    sources=[source.url],
                    context={"domain": "science", "claim_type": "factual"}
                )

                # 0.7 threshold: "probably a paper" — tune for your corpus
                if validation.confidence_score > 0.7:
                    papers.append({
                        "title": source.title,
                        "url": source.url,
                        "content": content.text,
                        "confidence": source.confidence,
                        "validation_score": validation.confidence_score,
                        "published_date": source.published_date
                    })

        return papers

    def extract_key_terms(self, text, max_terms=5):
        """Naive key-term extraction used to bucket papers by subtopic.

        Returns up to *max_terms* distinct lowercase words longer than three
        characters, excluding common stopwords, in order of first appearance.
        Replace with a proper keyphrase extractor for production use.
        """
        stopwords = {
            "the", "and", "for", "with", "that", "this", "from", "into",
            "are", "was", "were", "been", "have", "has", "which", "their"
        }
        terms = []
        for raw in text.split():
            word = raw.strip(".,;:()[]\"'").lower()
            if len(word) > 3 and word not in stopwords and word not in terms:
                terms.append(word)
            if len(terms) == max_terms:
                break
        return terms

    def generate_literature_review(self, topic):
        """Group recent papers on *topic* into keyword-derived subtopics."""
        from datetime import datetime

        papers = self.search_papers(f"{topic} recent advances", limit=15)

        # Group papers by key terms taken from their opening sentence
        subtopics = {}
        for paper in papers:
            first_sentence = paper["content"].split('.')[0]
            for term in self.extract_key_terms(first_sentence):
                subtopics.setdefault(term, []).append(paper)

        return {
            "topic": topic,
            "papers_found": len(papers),
            "subtopics": subtopics,
            "generated_at": datetime.now().isoformat()
        }

# Usage
sci_rag = ScientificRAG(cortex_client)
# Builds a subtopic-grouped review from up to 15 recent validated papers
review = sci_rag.generate_literature_review("machine learning interpretability")

News and Current Events RAG

class NewsRAG:
    """RAG helper for breaking news with per-claim validation."""

    def __init__(self, cortex_client):
        self.cortex = cortex_client
        # Outlets considered trustworthy enough to search
        self.news_sources = [
            "reuters.com", "bloomberg.com", "bbc.com",
            "cnn.com", "npr.org", "wsj.com"
        ]

    def get_breaking_news(self, topic):
        """Search trusted outlets for *topic* and validate each article's lead claims."""
        search_result = self.cortex.search(
            query=f"breaking news {topic}",
            max_results=8,
            recency="day",
            domain_filter={"include": self.news_sources}
        )

        articles = []
        for src in search_result.sources:
            extraction = self.cortex.extract(src.url)
            if not extraction.success:
                continue

            # Validate the article's lead: its first five sentences
            leads = extraction.text.split('.')[:5]

            claim_checks = []
            for lead in leads:
                claim = lead.strip()
                # Skip fragments too short to be meaningful claims
                if len(claim) > 20:
                    verdict = self.cortex.validate(
                        claim=claim,
                        context={"domain": "news", "claim_type": "event"}
                    )

                    claim_checks.append({
                        "claim": claim,
                        "validation_result": verdict.validation_result,
                        "confidence": verdict.confidence_score
                    })

            articles.append({
                "title": src.title,
                "url": src.url,
                "published_date": src.published_date,
                "validated_claims": claim_checks,
                "overall_confidence": src.confidence
            })

        return articles

    def create_news_summary(self, topics, hours=24):
        """Build a validated multi-topic news summary covering the last *hours* hours."""
        from datetime import datetime

        per_topic = {
            topic: self.get_breaking_news(f"{topic} past {hours} hours")
            for topic in topics
        }

        return {
            "summary_period": f"Last {hours} hours",
            "topics": per_topic,
            "generated_at": datetime.now().isoformat(),
            "total_articles": sum(len(items) for items in per_topic.values())
        }

# Usage
news_rag = NewsRAG(cortex_client)
# Summarize the last 24 hours across three topics
summary = news_rag.create_news_summary([
"artificial intelligence",
"climate change",
"space exploration"
], hours=24)

🚀 Production Deployment

Caching Strategy

class ProductionRAG:
    """Cortex RAG with result caching and concurrent batch processing.

    *cache_client* is expected to expose a redis-like get/setex interface.
    """

    def __init__(self, cortex_client, cache_client):
        self.cortex = cortex_client
        self.cache = cache_client
        # TTLs (seconds) per operation type
        self.cache_ttl = {
            "search": 3600,    # 1 hour
            "extract": 7200,   # 2 hours
            "validate": 1800   # 30 minutes
        }

    def cached_search(self, query, **kwargs):
        """Search with caching; the cache key covers query AND search params.

        Bug fix: the original keyed the cache on the query text alone, so
        searches with different parameters (e.g. recency="day" vs "week")
        collided on one cache entry and returned each other's results.
        """
        import hashlib
        import json

        key_material = json.dumps(
            {"query": query, "params": kwargs}, sort_keys=True, default=str
        )
        cache_key = f"search:{hashlib.md5(key_material.encode()).hexdigest()}"

        # Check cache first
        cached_result = self.cache.get(cache_key)
        if cached_result:
            # NOTE(review): cache hits return a plain dict while fresh calls
            # return the SDK result object — callers must handle both shapes.
            # TODO: rehydrate into the result type once the SDK supports it.
            return json.loads(cached_result)

        # Perform the live search
        result = self.cortex.search(query, **kwargs)

        # Cache the result; default=str tolerates non-JSON-native fields
        self.cache.setex(
            cache_key,
            self.cache_ttl["search"],
            json.dumps(result.__dict__, default=str)
        )

        return result

    def batch_process(self, queries, max_workers=5):
        """Run cached_search for many queries concurrently.

        Returns {query: result-or-None}; queries that raised map to None.
        """
        import concurrent.futures

        results = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all queries
            future_to_query = {
                executor.submit(self.cached_search, query): query
                for query in queries
            }

            # Collect results as they complete
            for future in concurrent.futures.as_completed(future_to_query):
                query = future_to_query[future]
                try:
                    results[query] = future.result()
                except Exception as exc:
                    print(f"Query {query} generated an exception: {exc}")
                    results[query] = None

        return results

# Usage with Redis cache
# NOTE(review): assumes a Redis server on localhost at the default port;
# db=0 selects the default logical database.
import redis
cache_client = redis.Redis(host='localhost', port=6379, db=0)
prod_rag = ProductionRAG(cortex_client, cache_client)

Error Handling and Fallbacks

class RobustRAG:
    """Cortex search wrapper with retries, exponential backoff and a fallback."""

    def __init__(self, cortex_client, fallback_retriever=None):
        self.cortex = cortex_client
        self.fallback_retriever = fallback_retriever
        self.max_retries = 3  # total Cortex attempts before falling back

    def robust_search(self, query, **kwargs):
        """Search with error handling and graceful degradation.

        Tries Cortex up to max_retries times (sleeping 1s, 2s, ... between
        attempts), then the fallback retriever if configured, and finally
        returns an "empty" result so callers never see an exception.
        """
        import time

        for attempt in range(self.max_retries):
            try:
                result = self.cortex.search(query, **kwargs)
                if result.success:
                    return result
            except Exception as e:
                print(f"Attempt {attempt + 1} failed: {e}")

            if attempt < self.max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, ...

        # All Cortex attempts failed — use the fallback if available
        if self.fallback_retriever:
            print("Using fallback retriever")
            return self.fallback_retriever.search(query)

        # Degrade gracefully if everything failed
        return self.create_empty_result(query)

    def create_empty_result(self, query):
        """Build the degraded "nothing found" result.

        NOTE(review): this returns a plain dict while successful paths return
        the SDK result object — callers must handle both shapes; confirm.
        """
        return {
            "query": query,
            "summary": f"Unable to find current information about '{query}'. Please try again later.",
            "sources": [],
            "success": False,
            "error": "All retrieval methods failed"
        }

# Usage with static fallback
# Wire in a pre-existing (static knowledge base) retriever as the degraded path.
from your_existing_rag import ExistingRetriever
fallback = ExistingRetriever()
robust_rag = RobustRAG(cortex_client, fallback)

📊 Performance Monitoring

RAG Performance Metrics

class RAGMonitor:
    """Tracks RAG query metrics: volume, success rate, latency, cache hits,
    and per-source quality (confidence scores)."""

    def __init__(self):
        self.metrics = {
            "queries": 0,
            "successful_queries": 0,
            "avg_response_time": 0,
            "cache_hits": 0,
            "source_quality": []  # confidence score of every returned source
        }

    def track_query(self, query, result, response_time, cache_hit=False):
        """Record one query's outcome and latency.

        *result* must expose .success; source quality is recorded when it
        also exposes .sources.
        """
        m = self.metrics
        m["queries"] += 1

        if result.success:
            m["successful_queries"] += 1

            # Record per-source confidence when the result exposes sources
            if hasattr(result, 'sources'):
                for source in result.sources:
                    m["source_quality"].append(source.confidence)

        # Running mean over ALL queries (failures count toward latency too)
        m["avg_response_time"] = (
            (m["avg_response_time"] * (m["queries"] - 1) + response_time)
            / m["queries"]
        )

        if cache_hit:
            m["cache_hits"] += 1

    def get_performance_report(self):
        """Summarize tracked metrics; rates default to 0 when nothing was tracked."""
        m = self.metrics
        queries = m["queries"]

        success_rate = m["successful_queries"] / queries if queries > 0 else 0
        cache_hit_rate = m["cache_hits"] / queries if queries > 0 else 0

        quality = m["source_quality"]
        avg_source_quality = sum(quality) / len(quality) if quality else 0

        return {
            "total_queries": queries,
            "success_rate": success_rate,
            "avg_response_time": m["avg_response_time"],
            "cache_hit_rate": cache_hit_rate,
            "avg_source_quality": avg_source_quality
        }

# Usage
monitor = RAGMonitor()

# In your RAG query function: time the search call...
start_time = time.time()
result = cortex_client.search(query)
response_time = time.time() - start_time

# ...then record the outcome and latency
monitor.track_query(query, result, response_time)

# Generate report
report = monitor.get_performance_report()
print(f"Success rate: {report['success_rate']:.2%}")
print(f"Avg response time: {report['avg_response_time']:.2f}s")

💡 Best Practices

1. Query Optimization

def optimize_query_for_rag(user_query, context=None):
    """Optimize a user query for better RAG retrieval.

    Appends temporal hints for "latest"-style queries, prefixes an optional
    domain from *context*, and appends source-type hints for research/news
    queries. Fix: the year hint was hard-coded as "2024 2025" and would go
    stale; it is now derived from the current date (current + next year).
    """
    from datetime import datetime

    # Add temporal context for current-information queries
    if any(word in user_query.lower() for word in ["latest", "recent", "current", "new"]):
        year = datetime.now().year
        optimized_query = f"{user_query} {year} {year + 1}"
    else:
        optimized_query = user_query

    # Add domain context if available
    if context and context.get("domain"):
        optimized_query = f"{context['domain']} {optimized_query}"

    # Add source-type hints
    if "research" in user_query.lower():
        optimized_query = f"{optimized_query} study paper research"
    elif "news" in user_query.lower():
        optimized_query = f"{optimized_query} breaking news"

    return optimized_query

2. Source Quality Filtering

def filter_high_quality_sources(sources, min_confidence=0.7):
"""Filter sources based on quality metrics"""

high_quality_sources = []
for source in sources:
# Check confidence score
if source.confidence < min_confidence:
continue

# Check domain authority
trusted_domains = [
"nature.com", "science.org", "arxiv.org",
"reuters.com", "bloomberg.com", "bbc.com"
]

if any(domain in source.url for domain in trusted_domains):
source.confidence += 0.1 # Boost trusted domains

# Check publication date (prefer recent)
if source.published_date:
days_old = (datetime.now() - source.published_date).days
if days_old > 365: # Older than 1 year
source.confidence -= 0.1

high_quality_sources.append(source)

return sorted(high_quality_sources, key=lambda x: x.confidence, reverse=True)

3. Citation Management

def format_citations(sources, style="academic"):
    """Format numbered citations in one of several styles.

    Supported styles: "academic" (with access date), "web", "footnote".
    Fix: an unrecognized *style* left `citation` unbound and raised
    UnboundLocalError; unknown styles now fall back to the "web" format.
    """
    from datetime import datetime

    accessed = datetime.now().strftime('%Y-%m-%d')

    citations = []
    for i, source in enumerate(sources, 1):
        if style == "academic":
            citation = f"[{i}] {source.title}. {source.url} (Accessed: {accessed})"
        elif style == "footnote":
            citation = f"[^{i}]: {source.title} ({source.url})"
        else:  # "web" and any unrecognized style
            citation = f"[{i}] {source.title} - {source.url}"

        citations.append(citation)

    return citations

def add_citations_to_response(response_text, sources):
    """Inject numbered citation markers into a generated response.

    Splits on '.', drops empty fragments, and appends "[n]" to every other
    surviving sentence (by raw split index) while citations remain. Simple
    heuristic — replace with attribution-aware injection if needed.
    """
    pieces = response_text.split('.')

    annotated = []
    for idx, piece in enumerate(pieces):
        if not piece.strip():
            continue

        # Cite even-indexed fragments until the source list is exhausted
        if idx % 2 == 0 and idx // 2 < len(sources):
            annotated.append(f"{piece.strip()} [{idx // 2 + 1}]")
        else:
            annotated.append(piece)

    return '. '.join(annotated)

Next: LangChain Setup → Complete LangChain integration guide