RAG Integration Guide
Learn how to integrate Cortex with Retrieval-Augmented Generation (RAG) systems to provide your AI applications with real-time, verified knowledge.
🎯 What is RAG + Cortex?
RAG systems traditionally rely on static knowledge bases that become outdated quickly. Cortex enhances RAG by providing:
- Real-time knowledge - Fresh information from the live web
- Source verification - Trusted, cited information
- Content validation - Cross-verified facts across multiple sources
- Structured extraction - Clean, AI-ready content format
🏗️ Architecture Overview
User Query
↓
[ Query Router ]
↓
[ Cortex Search ] ← Real-time web search
↓
[ Content Extraction ] ← Clean, structured content
↓
[ Vector Embedding ] ← Embed fresh content
↓
[ Vector Database ] ← Store temporarily or permanently
↓
[ Retrieval ] ← Find relevant chunks
↓
[ LLM Generation ] ← Generate response with fresh context
↓
Response with Citations
📚 Popular RAG Frameworks
LangChain Integration
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
from langchain.retrievers import CortexRetriever
from langchain.schema import Document

from cortex import CortexClient
# Initialize Cortex client
cortex_client = CortexClient(api_key="your_cortex_key")
# Create custom Cortex retriever
class CortexRetriever:
    """LangChain-style retriever that pulls fresh, cited documents from Cortex."""

    def __init__(self, cortex_client, top_k=5):
        self.cortex = cortex_client
        self.top_k = top_k

    def get_relevant_documents(self, query):
        """Search the live web via Cortex and return LangChain Documents.

        NOTE(review): ``Document`` must be in scope at call time (e.g.
        ``from langchain.schema import Document``) — verify against the
        full example's imports.
        """
        search_result = self.cortex.search(
            query=query,
            max_results=self.top_k,
            recency="week"
        )
        docs = []
        for src in search_result.sources:
            # Prefer the fully extracted page body; fall back to the search snippet.
            extracted = self.cortex.extract(src.url)
            body = extracted.text if extracted.success else src.snippet
            docs.append(Document(
                page_content=body,
                metadata={
                    "source": src.url,
                    "title": src.title,
                    "confidence": src.confidence,
                    "published_date": src.published_date,
                    "cortex_verified": True
                }
            ))
        return docs
# Set up RAG chain
retriever = CortexRetriever(cortex_client, top_k=8)
llm = OpenAI(temperature=0)
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever,
return_source_documents=True
)
# Query with real-time knowledge
result = qa_chain({
"query": "What are the latest developments in quantum computing?"
})
print(result["result"])
print(f"Sources: {len(result['source_documents'])}")
LlamaIndex Integration
from llama_index import Document, GPTVectorStoreIndex, ServiceContext
from llama_index.retrievers import BaseRetriever
from cortex import CortexClient
import openai
class CortexRetriever(BaseRetriever):
    """LlamaIndex retriever that sources scored nodes from live Cortex search."""

    def __init__(self, cortex_client, top_k=5):
        self.cortex = cortex_client
        self.top_k = top_k
        super().__init__()

    def _retrieve(self, query_bundle):
        """Return scored TextNodes for the query; sources whose extraction fails are skipped.

        NOTE(review): ``NodeWithScore`` and ``TextNode`` are not imported by the
        surrounding snippet (they live in ``llama_index.schema``) — confirm.
        """
        search_result = self.cortex.search(
            query=query_bundle.query_str,
            max_results=self.top_k,
            recency="month"
        )
        scored_nodes = []
        for src in search_result.sources:
            extracted = self.cortex.extract(src.url)
            if not extracted.success:
                continue
            text_node = TextNode(
                text=extracted.text,
                metadata={
                    "source": src.url,
                    "title": src.title,
                    "confidence": src.confidence,
                    "cortex_verified": True
                }
            )
            # Cortex's per-source confidence doubles as the retrieval score.
            scored_nodes.append(NodeWithScore(node=text_node, score=src.confidence))
        return scored_nodes
# Initialize
cortex_client = CortexClient(api_key="your_cortex_key")
retriever = CortexRetriever(cortex_client, top_k=10)
# Create index with real-time retriever
service_context = ServiceContext.from_defaults()
index = GPTVectorStoreIndex([], service_context=service_context)
# Query with real-time knowledge
query_engine = index.as_query_engine(retriever=retriever)
response = query_engine.query("Latest AI safety research findings")
print(response.response)
print(f"Sources: {len(response.source_nodes)}")
Haystack Integration
from haystack import Pipeline
from haystack.nodes import PromptNode, AnswerParser
from haystack.schema import Document
from cortex import CortexClient
class CortexRetriever:
    """Haystack-compatible retriever backed by live Cortex web search."""

    def __init__(self, cortex_client):
        self.cortex = cortex_client

    def retrieve(self, query: str, top_k: int = 5) -> list[Document]:
        """Fetch up to *top_k* fresh documents for *query*.

        Fix: the annotation previously used ``List[Document]`` but
        ``typing.List`` was never imported, raising NameError when the class
        body executes; the builtin ``list[...]`` generic (Python 3.9+) needs
        no import.
        """
        result = self.cortex.search(
            query=query,
            max_results=top_k,
            recency="week"
        )
        documents = []
        for source in result.sources:
            # Prefer the fully extracted article body; fall back to the snippet.
            content = self.cortex.extract(source.url)
            doc = Document(
                content=content.text if content.success else source.snippet,
                meta={
                    "name": source.title,
                    "url": source.url,
                    "confidence": source.confidence,
                    "cortex_verified": True
                }
            )
            documents.append(doc)
        return documents
# Set up pipeline
cortex_client = CortexClient(api_key="your_cortex_key")
retriever = CortexRetriever(cortex_client)
prompt_node = PromptNode(
model_name_or_path="gpt-3.5-turbo",
api_key="your_openai_key",
max_length=200
)
# Create RAG pipeline
pipeline = Pipeline()
pipeline.add_node(component=retriever, name="CortexRetriever", inputs=["Query"])
pipeline.add_node(component=prompt_node, name="PromptNode", inputs=["CortexRetriever"])
# Query pipeline
result = pipeline.run(
query="What are the latest trends in renewable energy?",
params={
"CortexRetriever": {"top_k": 8},
"PromptNode": {
"prompt_template": """
Based on the following real-time information, answer the question:
Context: {join(documents)}
Question: {query}
Answer:
"""
}
}
)
print(result["answers"][0].answer)
🔄 Real-time Knowledge Updates
Automatic Knowledge Refresh
import schedule
import time
from datetime import datetime, timedelta
from cortex import CortexClient
import chromadb
class RealTimeRAG:
    """RAG system that keeps a Chroma collection fresh via scheduled Cortex searches."""

    def __init__(self, cortex_api_key, openai_api_key):
        # openai_api_key is accepted for interface compatibility with the
        # guide's examples; it is not used by this class directly.
        self.cortex = CortexClient(api_key=cortex_api_key)
        self.chroma_client = chromadb.Client()
        self.collection = self.chroma_client.create_collection("real_time_knowledge")
        # Topics registered for periodic refresh (see add_topic / schedule_updates).
        self.topics = []

    def add_topic(self, topic, refresh_interval_hours=6):
        """Add a topic to monitor"""
        self.topics.append({
            "topic": topic,
            "refresh_interval": refresh_interval_hours,
            "last_updated": None
        })

    def update_knowledge(self, topic):
        """Update knowledge for a specific topic: search, extract, and index."""
        print(f"Updating knowledge for: {topic}")
        # Search for the freshest coverage of the topic.
        result = self.cortex.search(
            query=f"latest {topic} developments",
            max_results=10,
            recency="day"
        )
        if result.success:
            # Extract full content from each source; failed extractions are skipped.
            documents = []
            for source in result.sources:
                content = self.cortex.extract(source.url)
                if content.success:
                    documents.append({
                        "text": content.text,
                        "metadata": {
                            "topic": topic,
                            "url": source.url,
                            "title": source.title,
                            "confidence": source.confidence,
                            "updated_at": datetime.now().isoformat(),
                            "cortex_verified": True
                        }
                    })
            if documents:
                # Timestamped ids keep re-runs from colliding with earlier batches.
                self.collection.add(
                    documents=[doc["text"] for doc in documents],
                    metadatas=[doc["metadata"] for doc in documents],
                    ids=[f"{topic}_{i}_{int(time.time())}" for i in range(len(documents))]
                )
                print(f"Added {len(documents)} documents for {topic}")
        return result.success

    def query_knowledge(self, question, top_k=5):
        """Query the knowledge base; returns a context bundle or None on no hits.

        Fix: Chroma returns one result list *per query text*, so
        ``results["documents"]`` is ``[[...]]`` and always truthy — the inner
        list must be checked too, otherwise empty hits produced a useless
        ``{"context": "", ...}`` instead of None.
        """
        results = self.collection.query(
            query_texts=[question],
            n_results=top_k
        )
        if results["documents"] and results["documents"][0]:
            # Format retrieved chunks plus provenance for the LLM prompt.
            context = "\n\n".join([
                f"Source: {meta['title']} ({meta['url']})\n{doc}"
                for doc, meta in zip(results["documents"][0], results["metadatas"][0])
            ])
            return {
                "context": context,
                "sources": results["metadatas"][0],
                "question": question
            }
        return None

    def schedule_updates(self):
        """Schedule regular knowledge updates; blocks forever running the scheduler."""
        for topic_info in self.topics:
            topic = topic_info["topic"]
            interval = topic_info["refresh_interval"]
            schedule.every(interval).hours.do(self.update_knowledge, topic)
        # Intended to run in a background thread (see usage example).
        while True:
            schedule.run_pending()
            time.sleep(60)
# Usage
rag_system = RealTimeRAG(
cortex_api_key="your_cortex_key",
openai_api_key="your_openai_key"
)
# Add topics to monitor
rag_system.add_topic("artificial intelligence", refresh_interval_hours=4)
rag_system.add_topic("quantum computing", refresh_interval_hours=8)
rag_system.add_topic("renewable energy", refresh_interval_hours=12)
# Start scheduled updates (run in background)
import threading
scheduler_thread = threading.Thread(target=rag_system.schedule_updates)
scheduler_thread.daemon = True
scheduler_thread.start()
# Query the system
result = rag_system.query_knowledge("What are the latest AI safety developments?")
if result:
print("Context:", result["context"])
print("Sources:", len(result["sources"]))
Incremental Updates
class IncrementalRAG:
    """Tracks retrieved sources and refreshes the vector DB when their content changes.

    NOTE(review): relies on ``hashlib``, ``datetime`` and ``time`` being
    imported at module level — the surrounding snippet does not show those
    imports; verify against the full example.
    """

    def __init__(self, cortex_client, vector_db):
        self.cortex = cortex_client
        self.vector_db = vector_db
        # query -> {"urls": [...], "last_check": datetime, "checksums": {url: md5hex}}
        self.change_tracker = {}

    def track_sources(self, query, source_urls):
        """Record *source_urls* for *query* and snapshot a checksum of each page."""
        entry = {
            "urls": source_urls,
            "last_check": datetime.now(),
            "checksums": {}
        }
        self.change_tracker[query] = entry
        for url in source_urls:
            extracted = self.cortex.extract(url)
            if extracted.success:
                entry["checksums"][url] = hashlib.md5(extracted.text.encode()).hexdigest()

    def check_for_updates(self, query):
        """Re-fetch tracked sources; returns True if any changed (and updates the DB)."""
        if query not in self.change_tracker:
            return False
        tracker = self.change_tracker[query]
        changed = False
        for url in tracker["urls"]:
            extracted = self.cortex.extract(url)
            if not extracted.success:
                continue
            digest = hashlib.md5(extracted.text.encode()).hexdigest()
            if digest != tracker["checksums"].get(url):
                print(f"Change detected in {url}")
                changed = True
                # Replace the stale copy before recording the new checksum.
                self.update_document(url, extracted.text)
                tracker["checksums"][url] = digest
        tracker["last_check"] = datetime.now()
        return changed

    def update_document(self, url, new_content):
        """Replace the stored document for *url* with *new_content*."""
        self.vector_db.delete(where={"url": url})
        self.vector_db.add(
            documents=[new_content],
            metadatas=[{"url": url, "updated_at": datetime.now().isoformat()}],
            ids=[f"doc_{url}_{int(time.time())}"]
        )
# Usage
incremental_rag = IncrementalRAG(cortex_client, vector_db)
# Initial search and tracking
result = cortex_client.search("AI safety research")
source_urls = [source.url for source in result.sources]
incremental_rag.track_sources("AI safety research", source_urls)
# Periodically check for updates
schedule.every(30).minutes.do(
incremental_rag.check_for_updates,
"AI safety research"
)
🎯 Domain-Specific RAG
Scientific Research RAG
class ScientificRAG:
    """Domain-specific RAG over trusted scientific publication sources."""

    # Words too generic to serve as sub-topic keys in a literature review.
    _STOPWORDS = {"about", "these", "those", "their", "which", "where",
                  "there", "using", "based"}

    def __init__(self, cortex_client):
        self.cortex = cortex_client

    def search_papers(self, query, limit=10):
        """Search for scientific papers on whitelisted domains and validate each hit."""
        result = self.cortex.search(
            query=f"{query} research papers",
            max_results=limit,
            domain_filter={
                "include": [
                    "arxiv.org", "nature.com", "science.org",
                    "cell.com", "nejm.org", "pubmed.ncbi.nlm.nih.gov"
                ]
            },
            recency="year"
        )
        papers = []
        for source in result.sources:
            content = self.cortex.extract(source.url)
            if content.success:
                # Cross-check that the page really is a peer-reviewed paper on the topic.
                validation = self.cortex.validate(
                    claim=f"This is a peer-reviewed research paper about {query}",
                    sources=[source.url],
                    context={"domain": "science", "claim_type": "factual"}
                )
                if validation.confidence_score > 0.7:
                    papers.append({
                        "title": source.title,
                        "url": source.url,
                        "content": content.text,
                        "confidence": source.confidence,
                        "validation_score": validation.confidence_score,
                        "published_date": source.published_date
                    })
        return papers

    def extract_key_terms(self, text):
        """Return distinct lowercase words (>4 chars, non-stopword) from *text*, in order.

        Fix: this method was called by generate_literature_review but never
        defined, guaranteeing an AttributeError at runtime.
        """
        terms = []
        for word in re.findall(r"[A-Za-z]+", text.lower()):
            if len(word) > 4 and word not in self._STOPWORDS and word not in terms:
                terms.append(word)
        return terms

    def generate_literature_review(self, topic):
        """Generate a literature review skeleton: papers grouped by key terms."""
        papers = self.search_papers(f"{topic} recent advances", limit=15)
        # Group papers by sub-topics derived from each paper's opening sentence —
        # a crude heuristic; the first sentence usually states the contribution.
        subtopics = {}
        for paper in papers:
            first_sentence = paper["content"].split('.')[0]
            key_terms = self.extract_key_terms(first_sentence)
            for term in key_terms:
                subtopics.setdefault(term, []).append(paper)
        return {
            "topic": topic,
            "papers_found": len(papers),
            "subtopics": subtopics,
            "generated_at": datetime.now().isoformat()
        }
# Usage
sci_rag = ScientificRAG(cortex_client)
review = sci_rag.generate_literature_review("machine learning interpretability")
News and Current Events RAG
class NewsRAG:
    """RAG helper specialised for news: searches trusted outlets and fact-checks claims."""

    def __init__(self, cortex_client):
        self.cortex = cortex_client
        # Outlets considered authoritative for breaking-news retrieval.
        self.news_sources = [
            "reuters.com", "bloomberg.com", "bbc.com",
            "cnn.com", "npr.org", "wsj.com"
        ]

    def get_breaking_news(self, topic):
        """Search today's news for *topic* and validate the leading claims of each article."""
        search_result = self.cortex.search(
            query=f"breaking news {topic}",
            max_results=8,
            recency="day",
            domain_filter={"include": self.news_sources}
        )
        articles = []
        for src in search_result.sources:
            extracted = self.cortex.extract(src.url)
            if not extracted.success:
                continue
            # Only the first five sentences are fact-checked — they usually carry the lede.
            leading = extracted.text.split('.')[:5]
            claims = []
            for raw in leading:
                claim_text = raw.strip()
                if len(claim_text) > 20:  # skip fragments too short to be a claim
                    verdict = self.cortex.validate(
                        claim=claim_text,
                        context={"domain": "news", "claim_type": "event"}
                    )
                    claims.append({
                        "claim": claim_text,
                        "validation_result": verdict.validation_result,
                        "confidence": verdict.confidence_score
                    })
            articles.append({
                "title": src.title,
                "url": src.url,
                "published_date": src.published_date,
                "validated_claims": claims,
                "overall_confidence": src.confidence
            })
        return articles

    def create_news_summary(self, topics, hours=24):
        """Collect breaking news for every topic covering the last *hours* hours."""
        per_topic = {
            topic: self.get_breaking_news(f"{topic} past {hours} hours")
            for topic in topics
        }
        return {
            "summary_period": f"Last {hours} hours",
            "topics": per_topic,
            "generated_at": datetime.now().isoformat(),
            "total_articles": sum(len(items) for items in per_topic.values())
        }
# Usage
news_rag = NewsRAG(cortex_client)
summary = news_rag.create_news_summary([
"artificial intelligence",
"climate change",
"space exploration"
], hours=24)
🚀 Production Deployment
Caching Strategy
class ProductionRAG:
    """Cortex wrapper with Redis-style result caching and threaded batch querying."""

    def __init__(self, cortex_client, cache_client):
        self.cortex = cortex_client
        self.cache = cache_client
        # TTLs (seconds) per operation type; search results go stale fastest.
        self.cache_ttl = {
            "search": 3600,   # 1 hour
            "extract": 7200,  # 2 hours
            "validate": 1800  # 30 minutes
        }

    def cached_search(self, query, **kwargs):
        """Search with caching keyed on the query AND its options.

        Fix: the cache key previously hashed only *query*, so calls with
        different kwargs (e.g. recency="day" vs recency="week") collided on
        one cache entry.

        NOTE(review): a cache hit returns a plain dict (json.loads of the
        stored payload) while a miss returns the live result object — callers
        must tolerate both shapes.
        """
        # Local imports: the snippet has no module-level hashlib/json imports.
        import hashlib
        import json

        # Deterministic key over query + options; default=str keeps unusual
        # kwarg values from breaking key derivation.
        key_material = json.dumps({"query": query, "kwargs": kwargs},
                                  sort_keys=True, default=str)
        cache_key = f"search:{hashlib.md5(key_material.encode()).hexdigest()}"

        cached_result = self.cache.get(cache_key)
        if cached_result:
            return json.loads(cached_result)

        result = self.cortex.search(query, **kwargs)

        # Assumes result.__dict__ is JSON-serialisable — TODO confirm for nested source objects.
        self.cache.setex(
            cache_key,
            self.cache_ttl["search"],
            json.dumps(result.__dict__)
        )
        return result

    def batch_process(self, queries, max_workers=5):
        """Run cached_search for each query concurrently; returns {query: result_or_None}."""
        import concurrent.futures

        results = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_query = {
                executor.submit(self.cached_search, query): query
                for query in queries
            }
            for future in concurrent.futures.as_completed(future_to_query):
                query = future_to_query[future]
                try:
                    results[query] = future.result()
                except Exception as exc:
                    # A failed query yields None rather than aborting the batch.
                    print(f"Query {query} generated an exception: {exc}")
                    results[query] = None
        return results
# Usage with Redis cache
import redis
cache_client = redis.Redis(host='localhost', port=6379, db=0)
prod_rag = ProductionRAG(cortex_client, cache_client)
Error Handling and Fallbacks
class RobustRAG:
    """Cortex search wrapper with retries, exponential backoff, and a fallback retriever."""

    def __init__(self, cortex_client, fallback_retriever=None):
        self.cortex = cortex_client
        self.fallback_retriever = fallback_retriever
        self.max_retries = 3

    def robust_search(self, query, **kwargs):
        """Try Cortex up to max_retries times, then the fallback, then an empty stub."""
        attempt = 0
        while attempt < self.max_retries:
            try:
                outcome = self.cortex.search(query, **kwargs)
                if outcome.success:
                    return outcome
            except Exception as err:
                print(f"Attempt {attempt + 1} failed: {err}")
                if attempt < self.max_retries - 1:
                    # Exponential backoff between attempts: 1s, 2s, ...
                    time.sleep(2 ** attempt)
            attempt += 1
        if self.fallback_retriever:
            print("Using fallback retriever")
            return self.fallback_retriever.search(query)
        return self.create_empty_result(query)

    def create_empty_result(self, query):
        """Shape-compatible stub returned when every retrieval path fails."""
        return {
            "query": query,
            "summary": f"Unable to find current information about '{query}'. Please try again later.",
            "sources": [],
            "success": False,
            "error": "All retrieval methods failed"
        }
# Usage with static fallback
from your_existing_rag import ExistingRetriever
fallback = ExistingRetriever()
robust_rag = RobustRAG(cortex_client, fallback)
📊 Performance Monitoring
RAG Performance Metrics
class RAGMonitor:
    """Collects rolling quality and performance metrics for RAG queries."""

    def __init__(self):
        self.metrics = {
            "queries": 0,
            "successful_queries": 0,
            "avg_response_time": 0,
            "cache_hits": 0,
            "source_quality": []
        }

    def track_query(self, query, result, response_time, cache_hit=False):
        """Fold one query outcome into the running metrics."""
        stats = self.metrics
        stats["queries"] += 1
        if result.success:
            stats["successful_queries"] += 1
            # Record per-source confidence when the result carries sources.
            if hasattr(result, 'sources'):
                stats["source_quality"].extend(
                    source.confidence for source in result.sources
                )
        # Incremental running mean keeps avg_response_time without storing samples.
        previous_total = stats["avg_response_time"] * (stats["queries"] - 1)
        stats["avg_response_time"] = (previous_total + response_time) / stats["queries"]
        if cache_hit:
            stats["cache_hits"] += 1

    def get_performance_report(self):
        """Summarise success rate, latency, cache efficiency, and source quality."""
        total = self.metrics["queries"]
        quality = self.metrics["source_quality"]
        return {
            "total_queries": total,
            "success_rate": self.metrics["successful_queries"] / total if total > 0 else 0,
            "avg_response_time": self.metrics["avg_response_time"],
            "cache_hit_rate": self.metrics["cache_hits"] / total if total > 0 else 0,
            "avg_source_quality": sum(quality) / len(quality) if quality else 0
        }
# Usage
monitor = RAGMonitor()
# In your RAG query function
start_time = time.time()
result = cortex_client.search(query)
response_time = time.time() - start_time
monitor.track_query(query, result, response_time)
# Generate report
report = monitor.get_performance_report()
print(f"Success rate: {report['success_rate']:.2%}")
print(f"Avg response time: {report['avg_response_time']:.2f}s")
💡 Best Practices
1. Query Optimization
def optimize_query_for_rag(user_query, context=None):
    """Optimize a user query for better RAG retrieval.

    Applies three heuristics: temporal anchoring for freshness-seeking
    queries, domain prefixing from *context*, and source-type hints for
    research/news queries.

    Fix: the year hint was hard-coded as "2024 2025" and went stale; it now
    derives from the current date.
    """
    recency_markers = ("latest", "recent", "current", "new")
    if any(word in user_query.lower() for word in recency_markers):
        # Anchor freshness-seeking queries on the current and upcoming year.
        this_year = datetime.now().year
        optimized_query = f"{user_query} {this_year} {this_year + 1}"
    else:
        optimized_query = user_query
    # Prefix the domain when the caller supplies one.
    if context and context.get("domain"):
        optimized_query = f"{context['domain']} {optimized_query}"
    # Nudge retrieval towards the right source type.
    if "research" in user_query.lower():
        optimized_query = f"{optimized_query} study paper research"
    elif "news" in user_query.lower():
        optimized_query = f"{optimized_query} breaking news"
    return optimized_query
2. Source Quality Filtering
def filter_high_quality_sources(sources, min_confidence=0.7):
    """Filter and rank sources by quality, best first.

    Sources below *min_confidence* are dropped. Trusted domains get a +0.1
    ranking boost; content older than a year gets a -0.1 penalty.

    Fix: the original mutated each source's ``confidence`` attribute in place
    to apply the boost/penalty — a surprising side effect on caller-owned
    objects. Ranking now uses a separate adjusted score and leaves the inputs
    untouched.
    """
    trusted_domains = (
        "nature.com", "science.org", "arxiv.org",
        "reuters.com", "bloomberg.com", "bbc.com",
    )
    scored = []
    for source in sources:
        if source.confidence < min_confidence:
            continue
        score = source.confidence
        if any(domain in source.url for domain in trusted_domains):
            score += 0.1  # boost trusted domains
        # Prefer recent content when a publication date is known.
        # NOTE(review): assumes published_date is a naive datetime comparable
        # to datetime.now() — confirm against the Cortex source schema.
        if source.published_date:
            days_old = (datetime.now() - source.published_date).days
            if days_old > 365:
                score -= 0.1
        scored.append((score, source))
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [source for _, source in scored]
3. Citation Management
def format_citations(sources, style="academic"):
    """Format numbered citations for *sources* in the given style.

    Supported styles: "academic" (with access date), "web", "footnote".

    Fix: an unrecognised *style* previously left ``citation`` unbound,
    raising a confusing NameError mid-loop; it now raises ValueError up front.

    Raises:
        ValueError: if *style* is not one of the supported styles.
    """
    citations = []
    for i, source in enumerate(sources, 1):
        if style == "academic":
            citation = f"[{i}] {source.title}. {source.url} (Accessed: {datetime.now().strftime('%Y-%m-%d')})"
        elif style == "web":
            citation = f"[{i}] {source.title} - {source.url}"
        elif style == "footnote":
            citation = f"[^{i}]: {source.title} ({source.url})"
        else:
            raise ValueError(f"Unknown citation style: {style!r}")
        citations.append(citation)
    return citations
def add_citations_to_response(response_text, sources):
    """Inject numbered citation markers into a generated response.

    Splits on '.', skips blank fragments, and appends "[n]" to every other
    sentence while citations remain. Deliberately simple — callers wanting
    claim-level attribution should replace this heuristic.
    """
    pieces = response_text.split('.')
    annotated = []
    for idx, piece in enumerate(pieces):
        if not piece.strip():
            continue
        # Cite every other sentence until the source list is exhausted.
        # (Cited sentences are stripped; uncited ones keep their whitespace.)
        if idx % 2 == 0 and idx // 2 < len(sources):
            piece = f"{piece.strip()} [{idx // 2 + 1}]"
        annotated.append(piece)
    return '. '.join(annotated)
Next: LangChain Setup → - Complete LangChain integration guide