Best Practices Guide
Production-ready guidelines for building reliable, efficient, and scalable applications with Cortex.
🚀 Performance Optimization
Query Optimization
✅ Do:
- Use specific, targeted queries
- Include temporal context when needed
- Filter by domain for focused results
- Use an appropriate `max_results` (5–10 for most cases)
# Example queries — assumes `client` is an initialized Cortex client (created elsewhere).
# Good: Specific and targeted
result = client.search(
    query="Tesla Q4 2024 earnings financial results",
    max_results=8,
    recency="week",
    domain_filter={"include": ["reuters.com", "bloomberg.com", "sec.gov"]}
)
# Good: Include temporal context
result = client.search("iPhone 15 Pro reviews benchmarks 2024")
❌ Avoid:
- Overly broad queries
- Single-word searches
- Excessive `max_results` (>15)
- Queries without context
# Bad: Too broad — single-word queries return unfocused results
result = client.search("AI")
# Bad: No context, and max_results far above the recommended 5–10
result = client.search("stock price", max_results=50)
Caching Strategies
import hashlib
import time
from typing import Optional, Dict, Any
class SmartCache:
    """In-memory TTL cache keyed by a hash of query + keyword parameters."""

    def __init__(self, default_ttl: int = 3600):
        # Maps cache key -> (value, stored_at_epoch_seconds, ttl_seconds).
        self.cache: Dict[str, tuple] = {}
        self.default_ttl = default_ttl

    def get_cache_key(self, query: str, **params) -> str:
        """Build a deterministic key from the query and sorted keyword params."""
        raw = f"{query}_{str(sorted(params.items()))}"
        return hashlib.md5(raw.encode()).hexdigest()

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value, or None when absent or expired.

        Expired entries are evicted as a side effect of the lookup.
        """
        entry = self.cache.get(key)
        if entry is None:
            return None
        value, stored_at, ttl = entry
        if time.time() - stored_at < ttl:
            return value
        del self.cache[key]
        return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Store a value under key with the given (or the default) TTL."""
        self.cache[key] = (value, time.time(), ttl or self.default_ttl)

    def invalidate_pattern(self, pattern: str) -> None:
        """Drop every entry whose key contains `pattern` as a substring."""
        for stale_key in [k for k in self.cache.keys() if pattern in k]:
            del self.cache[stale_key]
class CachedCortexClient:
    """Wrapper around a Cortex client that caches successful search results
    with content-type-aware TTLs."""

    def __init__(self, cortex_client, cache_ttl_strategies=None):
        self.client = cortex_client
        self.cache = SmartCache()
        # Default TTL strategies based on content type (seconds).
        self.ttl_strategies = cache_ttl_strategies or {
            "news": 1800,        # 30 minutes
            "financial": 3600,   # 1 hour
            "research": 7200,    # 2 hours
            "reference": 86400,  # 24 hours
            "default": 3600      # 1 hour
        }

    def smart_search(self, query: str, content_type: str = "default", **kwargs):
        """Search with intelligent caching.

        Returns the cached result when one exists; otherwise performs the
        search and caches it (successful results only) with the TTL chosen
        by `content_type`.  Unknown content types fall back to "default".
        """
        cache_key = self.cache.get_cache_key(query, **kwargs)

        # Check cache first.  Compare against None explicitly: the original
        # used `if cached_result:`, which silently re-ran the search for any
        # falsy-but-valid cached value.
        cached_result = self.cache.get(cache_key)
        if cached_result is not None:
            return cached_result

        # Perform search.
        result = self.client.search(query, **kwargs)

        # Cache only successful results so transient failures get retried.
        ttl = self.ttl_strategies.get(content_type, self.ttl_strategies["default"])
        if result.success:
            self.cache.set(cache_key, result, ttl)
        return result
# Usage — assumes `cortex_client` is an initialized Cortex client.
cached_client = CachedCortexClient(cortex_client)
# News searches cached for 30 minutes
news_result = cached_client.smart_search(
    "breaking tech news today",
    content_type="news"
)
# Research searches cached for 2 hours
research_result = cached_client.smart_search(
    "quantum computing research papers",
    content_type="research"
)
Batch Processing
import asyncio
import concurrent.futures
from typing import List, Dict, Any
class BatchProcessor:
    """Run many Cortex queries concurrently while respecting a rate limit."""

    def __init__(self, cortex_client, max_workers: int = 5, rate_limit: float = 1.0):
        self.client = cortex_client
        self.max_workers = max_workers
        self.rate_limit = rate_limit  # Seconds between requests

    def process_queries_parallel(self, queries: List[str]) -> Dict[str, Any]:
        """Process multiple queries in parallel with rate limiting.

        Returns a dict mapping each query to either the dict produced by
        `_safe_search` or `{"error": ..., "success": False}`.
        """
        results = {}
        # Constant gap between submissions.  (The original slept
        # i * rate_limit / max_workers on iteration i, so the total
        # submission delay grew quadratically with the number of queries.)
        submit_gap = self.rate_limit / self.max_workers
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_query = {}
            for i, query in enumerate(queries):
                if i and submit_gap > 0:
                    time.sleep(submit_gap)
                future = executor.submit(self._safe_search, query)
                future_to_query[future] = query
            # Collect results as they complete.
            for future in concurrent.futures.as_completed(future_to_query):
                query = future_to_query[future]
                try:
                    results[query] = future.result(timeout=30)
                except Exception as e:
                    results[query] = {"error": str(e), "success": False}
        return results

    def _safe_search(self, query: str) -> Dict[str, Any]:
        """Run one search and normalize the outcome into a plain dict."""
        try:
            result = self.client.search(query, max_results=5)
            return {
                "success": result.success,
                "summary": result.summary if result.success else None,
                "sources": result.sources if result.success else [],
                "error": None
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def process_queries_async(self, queries: List[str]) -> Dict[str, Any]:
        """Async variant: bounded concurrency via a semaphore."""
        semaphore = asyncio.Semaphore(self.max_workers)

        async def search_with_semaphore(query: str):
            async with semaphore:
                await asyncio.sleep(self.rate_limit)  # Rate limiting
                return await self._async_search(query)

        tasks = [search_with_semaphore(query) for query in queries]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Normalize raised exceptions to the same error-dict shape the
        # parallel path produces, so callers always see a uniform structure
        # (the original leaked raw exception objects into the mapping).
        normalized = [
            r if not isinstance(r, BaseException) else {"error": str(r), "success": False}
            for r in results
        ]
        return dict(zip(queries, normalized))

    async def _async_search(self, query: str):
        """Run the blocking search in the default executor."""
        # get_event_loop() is deprecated inside a coroutine since 3.10.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._safe_search, query)
# Usage — assumes `cortex_client` is an initialized Cortex client.
processor = BatchProcessor(cortex_client, max_workers=3)
queries = [
    "AI developments 2024",
    "quantum computing news",
    "renewable energy trends",
    "blockchain adoption",
    "space technology updates"
]
# Parallel processing
results = processor.process_queries_parallel(queries)
# Async processing
# results = asyncio.run(processor.process_queries_async(queries))
for query, result in results.items():
    if result["success"]:
        print(f"✅ {query}: {len(result['sources'])} sources")
    else:
        print(f"❌ {query}: {result['error']}")
🛡️ Error Handling & Resilience
Retry Strategies
import time
import random
import logging
from functools import wraps
from typing import Callable, Any
def retry_with_exponential_backoff(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    exceptions: tuple = (Exception,)
):
    """Decorator factory: retry the wrapped callable with exponential backoff.

    Makes up to ``max_retries + 1`` attempts.  After a failed attempt the
    delay is ``initial_delay * exponential_base ** attempt``, optionally
    scaled by a random factor in [0.5, 1.0) to avoid a thundering herd.
    Once every attempt has failed, the last exception is re-raised.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            last_exception = None
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    last_exception = exc
                    if attempt == max_retries:
                        break  # out of attempts — fall through to re-raise
                    delay = initial_delay * (exponential_base ** attempt)
                    if jitter:
                        delay *= 0.5 + random.random() / 2
                    logging.warning(
                        f"Attempt {attempt + 1} failed: {exc}. Retrying in {delay:.2f}s"
                    )
                    time.sleep(delay)
            raise last_exception
        return wrapper
    return decorator
class ResilientCortexClient:
    """Wrapper combining a simple circuit breaker with the
    exponential-backoff retry decorator defined above."""

    def __init__(self, cortex_client, circuit_breaker_threshold: int = 5):
        self.client = cortex_client
        self.circuit_breaker_threshold = circuit_breaker_threshold
        self.failure_count = 0      # consecutive failures since the last success
        self.last_failure_time = 0  # epoch seconds of the most recent failure
        self.circuit_open = False

    def _is_circuit_open(self) -> bool:
        """Check if circuit breaker is open; auto-close after a 60s cool-down."""
        if self.circuit_open:
            # Try to close circuit after 60 seconds
            if time.time() - self.last_failure_time > 60:
                self.circuit_open = False
                self.failure_count = 0
                return False
            return True
        return False

    def _record_success(self):
        """Record successful operation: fully resets the breaker state."""
        self.failure_count = 0
        self.circuit_open = False

    def _record_failure(self):
        """Record failed operation; open the circuit once the threshold is hit."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.circuit_breaker_threshold:
            self.circuit_open = True
            logging.error("Circuit breaker opened due to repeated failures")

    @retry_with_exponential_backoff(
        max_retries=3,
        exceptions=(Exception,)
    )
    def resilient_search(self, query: str, **kwargs):
        """Search with circuit breaker and retry logic.

        NOTE(review): the retry decorator also retries the "circuit breaker
        is open" exception raised below, so a call made while the circuit is
        open still incurs up to 3 backoff retries — confirm this is intended.
        """
        if self._is_circuit_open():
            raise Exception("Circuit breaker is open - service temporarily unavailable")
        try:
            result = self.client.search(query, **kwargs)
            if result.success:
                self._record_success()
                return result
            else:
                # NOTE(review): this failure is counted twice — once here and
                # once in the except-block below, which catches the exception
                # raised on the next line.  Verify double-counting is intended.
                self._record_failure()
                raise Exception(f"Search failed: {result.error}")
        except Exception as e:
            self._record_failure()
            raise e

    def get_health_status(self) -> Dict[str, Any]:
        """Snapshot of the breaker state for monitoring dashboards."""
        return {
            "circuit_open": self.circuit_open,
            "failure_count": self.failure_count,
            "last_failure_time": self.last_failure_time,
            "healthy": not self.circuit_open and self.failure_count < 3
        }
# Usage — assumes `cortex_client` is an initialized Cortex client.
resilient_client = ResilientCortexClient(cortex_client)
try:
    result = resilient_client.resilient_search("AI developments")
    print("Search successful:", result.summary[:100])
except Exception as e:
    print(f"Search failed: {e}")
# Check health status
health = resilient_client.get_health_status()
print(f"Client health: {health}")
Graceful Degradation
class GracefulCortexClient:
    """Client wrapper that degrades gracefully: primary search, then a
    simplified retry, then static fallback sources, then a minimal response."""

    def __init__(self, cortex_client, fallback_sources=None):
        self.client = cortex_client
        self.fallback_sources = fallback_sources or []
        # Rolling metrics over all requests made through this wrapper.
        self.performance_metrics = {
            "avg_response_time": 0,
            "success_rate": 1.0,
            "total_requests": 0,
            "successful_requests": 0
        }

    def search_with_fallback(self, query: str, **kwargs):
        """Search with multiple fallback strategies.

        Returns either the client's result object, or (for the final
        fallback) a plain dict — callers must handle both shapes.
        """
        start_time = time.time()
        self.performance_metrics["total_requests"] += 1

        # Primary search with a short timeout.
        try:
            result = self._search_with_timeout(query, timeout=10, **kwargs)
            if result and result.success:
                self._update_metrics(start_time, success=True)
                return result
        except Exception as e:
            logging.warning(f"Primary search failed: {e}")

        # Fallback 1: retry with reduced parameters (drop filters, fewer results).
        try:
            simplified_result = self._search_with_timeout(
                query,
                timeout=15,
                max_results=3,
                **{k: v for k, v in kwargs.items() if k not in ['domain_filter', 'recency']}
            )
            if simplified_result and simplified_result.success:
                self._update_metrics(start_time, success=True)
                return self._mark_as_fallback(simplified_result, "simplified_search")
        except Exception as e:
            logging.warning(f"Simplified search failed: {e}")

        # Fallback 2: cached or static sources, when configured.
        if self.fallback_sources:
            try:
                fallback_result = self._search_fallback_sources(query)
                if fallback_result:
                    self._update_metrics(start_time, success=True)
                    return fallback_result
            except Exception as e:
                logging.warning(f"Fallback sources failed: {e}")

        # Final fallback: minimal "we couldn't help" response.
        self._update_metrics(start_time, success=False)
        return self._create_minimal_response(query)

    def _search_with_timeout(self, query: str, timeout: int, **kwargs):
        """Search with a hard timeout enforced via SIGALRM.

        NOTE(review): SIGALRM exists only on Unix and only works in the main
        thread — confirm this never runs on Windows or in worker threads.
        """
        import signal

        def timeout_handler(signum, frame):
            raise TimeoutError("Search timeout")

        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(timeout)
        try:
            return self.client.search(query, **kwargs)
        except TimeoutError:
            raise TimeoutError(f"Search timed out after {timeout}s")
        finally:
            # Always cancel the alarm.  The original cancelled it only on
            # success or TimeoutError, so any other exception left a pending
            # SIGALRM that could fire later and kill the process.
            signal.alarm(0)

    def _search_fallback_sources(self, query: str):
        """Search using fallback sources (placeholder for custom logic)."""
        # Implementation would depend on your fallback sources.
        return None

    def _create_minimal_response(self, query: str):
        """Create a minimal dict response when every strategy has failed."""
        return {
            "query": query,
            "summary": f"Unable to retrieve current information about '{query}'. Please try again later or check your connection.",
            "sources": [],
            "success": False,
            "fallback_used": "minimal_response",
            "metadata": {
                "processing_time": 0,
                "sources_analyzed": 0,
                "error": "All search methods failed"
            }
        }

    def _mark_as_fallback(self, result, fallback_type: str):
        """Tag a result object with the fallback strategy that produced it."""
        if hasattr(result, '__dict__'):
            result.fallback_used = fallback_type
        return result

    def _update_metrics(self, start_time: float, success: bool):
        """Fold one request's outcome into the rolling metrics."""
        response_time = time.time() - start_time
        if success:
            self.performance_metrics["successful_requests"] += 1
        # Incremental mean over all requests (successes and failures alike).
        total = self.performance_metrics["total_requests"]
        current_avg = self.performance_metrics["avg_response_time"]
        self.performance_metrics["avg_response_time"] = (
            (current_avg * (total - 1) + response_time) / total
        )
        self.performance_metrics["success_rate"] = (
            self.performance_metrics["successful_requests"] / total
        )

    def get_performance_report(self):
        """Return the current performance-metrics dict."""
        return self.performance_metrics
# Usage — assumes `cortex_client` is an initialized Cortex client.
graceful_client = GracefulCortexClient(cortex_client)
result = graceful_client.search_with_fallback("latest AI developments")
# NOTE(review): .get() only works when the minimal-response dict fallback was
# used; a real result object has no .get — confirm the intended result shapes.
print("Search result:", result.get("summary", "No summary available"))
if hasattr(result, 'fallback_used'):
    print(f"Used fallback: {result.fallback_used}")
# Check performance
performance = graceful_client.get_performance_report()
print(f"Success rate: {performance['success_rate']:.2%}")
print(f"Avg response time: {performance['avg_response_time']:.2f}s")
💰 Cost Optimization
Usage Monitoring
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
class CostTracker:
    """Tracks API usage and estimated spend against configurable budgets."""

    def __init__(self, cortex_client, budget_limits: Optional[Dict] = None):
        self.client = cortex_client
        self.budget_limits = budget_limits or {
            "daily_calls": 100,
            "monthly_calls": 2000,
            "daily_cost_usd": 10.0
        }
        self.usage_log = []       # chronological list of request records
        self.alerts_sent = set()  # alert types already fired (each fires once)

    def track_request(self, operation: str, cost_estimate: float = 0.01):
        """Record one API request and re-check the budget limits."""
        self.usage_log.append({
            "timestamp": datetime.now().isoformat(),
            "operation": operation,
            "cost_estimate": cost_estimate
        })
        self._check_budget_limits()

    def get_usage_report(self, period: str = "today") -> Dict:
        """Summarize calls and cost for 'today', 'week', 'month', or all time."""
        now = datetime.now()
        if period == "today":
            window_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        elif period == "week":
            window_start = now - timedelta(days=7)
        elif period == "month":
            window_start = now - timedelta(days=30)
        else:
            window_start = datetime.min

        entries = [
            entry for entry in self.usage_log
            if datetime.fromisoformat(entry["timestamp"]) >= window_start
        ]
        call_count = len(entries)
        spend = sum(entry["cost_estimate"] for entry in entries)

        by_operation = {}
        for entry in entries:
            name = entry["operation"]
            by_operation[name] = by_operation.get(name, 0) + 1

        return {
            "period": period,
            "total_calls": call_count,
            "total_cost_estimate": spend,
            "operation_breakdown": by_operation,
            "average_cost_per_call": spend / call_count if call_count > 0 else 0
        }

    def _check_budget_limits(self):
        """Fire a one-time alert for each budget limit that has been reached."""
        daily = self.get_usage_report("today")
        monthly = self.get_usage_report("month")

        # Daily limits
        if daily["total_calls"] >= self.budget_limits["daily_calls"] and "daily_calls" not in self.alerts_sent:
            self._send_alert("daily_calls", daily["total_calls"])
        if daily["total_cost_estimate"] >= self.budget_limits["daily_cost_usd"] and "daily_cost" not in self.alerts_sent:
            self._send_alert("daily_cost", daily["total_cost_estimate"])
        # Monthly limits
        if monthly["total_calls"] >= self.budget_limits["monthly_calls"] and "monthly_calls" not in self.alerts_sent:
            self._send_alert("monthly_calls", monthly["total_calls"])

    def _send_alert(self, alert_type: str, current_value: float):
        """Print a budget alert and mark it as sent."""
        print(f"🚨 BUDGET ALERT: {alert_type} limit exceeded - Current: {current_value}")
        self.alerts_sent.add(alert_type)

    def export_usage_data(self, filename: str):
        """Dump the raw usage log to a JSON file."""
        with open(filename, 'w') as f:
            json.dump(self.usage_log, f, indent=2)
class CostOptimizedClient:
    """Wrapper that checks budget headroom and a cache before spending an API call."""

    def __init__(self, cortex_client, cost_tracker: CostTracker):
        self.client = cortex_client
        self.tracker = cost_tracker
        # Cost estimates per operation (adjust based on your plan)
        self.operation_costs = {
            "search": 0.01,
            "extract": 0.005,
            "validate": 0.015,
            "cache_query": 0.001
        }

    def smart_search(self, query: str, force_search: bool = False, **kwargs):
        """Cost-optimized search with budget checking.

        Refuses new searches once today's call count reaches 90% of the
        daily budget, unless `force_search` is True.  The cache (placeholder
        hooks below) is consulted before spending a real API call.
        """
        # Check if we're approaching budget limits (90% of the daily call cap).
        today_usage = self.tracker.get_usage_report("today")
        if (today_usage["total_calls"] >= self.tracker.budget_limits["daily_calls"] * 0.9
            and not force_search):
            return self._create_budget_limited_response(query)

        # Check cache first (much cheaper than a live search).
        cache_result = self._check_cache(query)
        if cache_result:
            self.tracker.track_request("cache_hit", 0.001)
            return cache_result

        # Perform search; record the spend on success, zero cost on failure.
        try:
            result = self.client.search(query, **kwargs)
            self.tracker.track_request("search", self.operation_costs["search"])
            # Cache result for future use
            self._store_in_cache(query, result)
            return result
        except Exception as e:
            self.tracker.track_request("search_failed", 0)
            raise e

    def _check_cache(self, query: str):
        """Check if query is cached (placeholder — always a miss)."""
        # Implement your caching logic here
        return None

    def _store_in_cache(self, query: str, result):
        """Store result in cache (placeholder — no-op)."""
        # Implement your caching logic here
        pass

    def _create_budget_limited_response(self, query: str):
        """Dict response returned when the daily budget is nearly exhausted."""
        return {
            "query": query,
            "summary": f"Daily API limit approaching. Unable to process new search for '{query}'. Please try again tomorrow or upgrade your plan.",
            "sources": [],
            "success": False,
            "budget_limited": True
        }
# Usage — assumes `cortex_client` is an initialized Cortex client.
cost_tracker = CostTracker(cortex_client, {
    "daily_calls": 50,
    "monthly_calls": 1000,
    "daily_cost_usd": 5.0
})
cost_optimized_client = CostOptimizedClient(cortex_client, cost_tracker)
# Use cost-optimized search
result = cost_optimized_client.smart_search("AI developments")
# Get usage report
usage_report = cost_tracker.get_usage_report("today")
print(f"Today's usage: {usage_report['total_calls']} calls, ${usage_report['total_cost_estimate']:.2f}")
Query Efficiency
class QueryOptimizer:
    """Heuristics for rewriting queries to be cheaper and more specific."""

    def __init__(self):
        # Terms indicating the user wants fresh/recent information.
        self.common_terms = {
            "latest", "recent", "current", "new", "breaking",
            "update", "development", "trend", "analysis"
        }
        # Keywords used both to detect and to inject a domain context.
        self.domain_keywords = {
            "technology": ["AI", "machine learning", "software", "tech", "digital"],
            "finance": ["stock", "market", "investment", "trading", "financial"],
            "science": ["research", "study", "discovery", "experiment", "paper"],
            "news": ["breaking", "report", "announcement", "statement", "news"]
        }

    def optimize_query(self, query: str, context: Optional[Dict] = None) -> str:
        """Optimize query for better results and lower cost.

        Note: the query is lowercased as a side effect of stop-word removal.
        """
        # Remove stop words (this also lowercases the query).
        words = query.lower().split()
        stop_words = {"the", "a", "an", "and", "or", "but", "in", "on", "at"}
        filtered_words = [word for word in words if word not in stop_words]
        optimized_query = " ".join(filtered_words)

        # Add temporal context when the query asks for fresh info but names no year.
        if any(term in optimized_query for term in ["latest", "recent", "current"]):
            if "2024" not in optimized_query and "2025" not in optimized_query:
                optimized_query = f"{optimized_query} 2024"

        # Add domain context if specified and not already implied by the query.
        if context and context.get("domain"):
            domain = context["domain"]
            if domain in self.domain_keywords:
                domain_terms = self.domain_keywords[domain]
                # Compare case-insensitively: the keyword lists contain
                # mixed-case entries such as "AI", which could never match
                # the already-lowercased query in the original comparison.
                if not any(term.lower() in optimized_query.lower() for term in domain_terms):
                    optimized_query = f"{domain} {optimized_query}"
        return optimized_query

    def estimate_query_cost(self, query: str, max_results: int = 5) -> float:
        """Estimate query cost from result count and word-count complexity."""
        base_cost = 0.01
        result_multiplier = max_results / 5                   # more results = higher cost
        complexity_multiplier = 1 + len(query.split()) / 10   # longer queries cost more
        return base_cost * result_multiplier * complexity_multiplier

    def suggest_alternatives(self, query: str) -> List[Dict]:
        """Suggest more efficient query alternatives.

        Returns a list of dicts with "query", "reason" and "cost_saving" keys.
        (The original annotation said List[str], which did not match the
        actual return value.)
        """
        alternatives = []
        words = query.split()
        # Shorter version
        if len(words) > 5:
            alternatives.append({
                "query": " ".join(words[:5]),
                "reason": "Shorter query for faster results",
                "cost_saving": "20%"
            })
        # More specific version
        if not any(year in query for year in ["2024", "2025", "2023"]):
            alternatives.append({
                "query": f"{query} 2024",
                "reason": "Added year for more relevant results",
                "cost_saving": "10%"
            })
        return alternatives
# Usage
optimizer = QueryOptimizer()
# Optimize query
original_query = "What are the latest developments in artificial intelligence and machine learning"
optimized_query = optimizer.optimize_query(original_query, {"domain": "technology"})
print(f"Original: {original_query}")
print(f"Optimized: {optimized_query}")
# Estimate cost
cost_estimate = optimizer.estimate_query_cost(optimized_query, max_results=8)
print(f"Estimated cost: ${cost_estimate:.3f}")
# Get alternatives — each alternative is a dict with query/reason/cost_saving
alternatives = optimizer.suggest_alternatives(original_query)
for alt in alternatives:
    print(f"Alternative: {alt['query']} (Saves: {alt['cost_saving']})")
🔒 Security Best Practices
API Key Management
import os
import json
import keyring
from cryptography.fernet import Fernet
from pathlib import Path
class SecureCredentialManager:
    """Stores the Cortex API key in the system keyring, falling back to a
    Fernet-encrypted file under ~/.<app_name>/."""

    def __init__(self, app_name: str = "cortex-app"):
        self.app_name = app_name
        self.config_dir = Path.home() / f".{app_name}"
        self.config_dir.mkdir(exist_ok=True)

    def store_api_key(self, api_key: str, use_system_keyring: bool = True):
        """Securely store API key (keyring first, encrypted file as fallback)."""
        if use_system_keyring:
            try:
                keyring.set_password(self.app_name, "cortex_api_key", api_key)
                print("API key stored in system keyring")
                return
            except Exception as e:
                print(f"Failed to use system keyring: {e}")
        # Fallback: encrypted file storage
        self._store_encrypted_key(api_key)

    def get_api_key(self, use_system_keyring: bool = True) -> str:
        """Retrieve API key securely; raises ValueError when none is stored."""
        if use_system_keyring:
            try:
                key = keyring.get_password(self.app_name, "cortex_api_key")
                if key:
                    return key
            except Exception as e:
                print(f"Failed to retrieve from keyring: {e}")
        # Fallback: encrypted file
        return self._get_encrypted_key()

    def _store_encrypted_key(self, api_key: str):
        """Store API key in an encrypted file next to its encryption key.

        NOTE(review): the Fernet key lives on the same disk as the ciphertext
        (key.bin beside config.enc), so this protects against casual reads,
        not against an attacker with file access.
        """
        # Generate or load encryption key
        key_file = self.config_dir / "key.bin"
        if key_file.exists():
            with open(key_file, "rb") as f:
                encryption_key = f.read()
        else:
            encryption_key = Fernet.generate_key()
            with open(key_file, "wb") as f:
                f.write(encryption_key)
            os.chmod(key_file, 0o600)  # owner read/write only
        # Encrypt and store API key
        fernet = Fernet(encryption_key)
        encrypted_key = fernet.encrypt(api_key.encode())
        config_file = self.config_dir / "config.enc"
        with open(config_file, "wb") as f:
            f.write(encrypted_key)
        os.chmod(config_file, 0o600)

    def _get_encrypted_key(self) -> str:
        """Retrieve API key from the encrypted file; ValueError when missing."""
        key_file = self.config_dir / "key.bin"
        config_file = self.config_dir / "config.enc"
        if not key_file.exists() or not config_file.exists():
            raise ValueError("No stored API key found")
        with open(key_file, "rb") as f:
            encryption_key = f.read()
        with open(config_file, "rb") as f:
            encrypted_key = f.read()
        fernet = Fernet(encryption_key)
        return fernet.decrypt(encrypted_key).decode()
class SecureCortexClient:
    """Cortex client that loads its API key via SecureCredentialManager and
    validates/sanitizes queries before sending them."""

    def __init__(self, app_name: str = "my-cortex-app"):
        self.credential_manager = SecureCredentialManager(app_name)
        self.client = None
        self._initialize_client()

    def _initialize_client(self):
        """Initialize Cortex client with securely stored credentials."""
        try:
            api_key = self.credential_manager.get_api_key()
            self.client = CortexClient(api_key=api_key)
        except ValueError:
            print("No API key found. Please set your API key first.")
            self._prompt_for_api_key()

    def _prompt_for_api_key(self):
        """Prompt user for API key (hidden input) and store it securely."""
        import getpass
        api_key = getpass.getpass("Please enter your Cortex API key: ")
        # Keys are expected to carry the "cortex_sk_" prefix.
        if api_key.startswith("cortex_sk_"):
            self.credential_manager.store_api_key(api_key)
            self.client = CortexClient(api_key=api_key)
            print("API key stored securely")
        else:
            raise ValueError("Invalid API key format")

    def search(self, query: str, **kwargs):
        """Secure search with input validation (non-empty, at most 500 chars)."""
        if not self.client:
            raise ValueError("Client not initialized")
        # Input validation
        if not query or len(query.strip()) == 0:
            raise ValueError("Query cannot be empty")
        if len(query) > 500:
            raise ValueError("Query too long (max 500 characters)")
        # Sanitize query (remove potential injection attempts)
        sanitized_query = self._sanitize_input(query)
        return self.client.search(sanitized_query, **kwargs)

    def _sanitize_input(self, input_string: str) -> str:
        """Strip a denylist of script-injection markers from the input.

        NOTE(review): single-pass substring removal is bypassable (e.g.
        "<scr<scriptipt" yields "<script" after one pass) — treat this as
        defense in depth, not a complete sanitizer.
        """
        # Remove potential script injections
        dangerous_patterns = [
            "<script", "</script>", "javascript:", "vbscript:",
            "onload=", "onerror=", "onclick=", "data:"
        ]
        sanitized = input_string
        for pattern in dangerous_patterns:
            sanitized = sanitized.replace(pattern, "")
        return sanitized.strip()
# Usage
secure_client = SecureCortexClient("my-app")
# First run will prompt for (and securely store) the API key
result = secure_client.search("AI developments")
Input Validation & Sanitization
import re
import html
from typing import Dict, Any, Optional
class InputValidator:
    """Validates and sanitizes queries, URLs and API parameters."""

    def __init__(self):
        # Hard caps on input sizes.
        self.max_lengths = {
            "query": 500,
            "url": 2000,
            "claim": 1000
        }
        # Patterns for potentially malicious input (matched case-insensitively).
        self.dangerous_patterns = [
            r'<script[^>]*>.*?</script>',
            r'javascript:',
            r'vbscript:',
            r'data:text\/html',
            r'onload\s*=',
            r'onerror\s*=',
            r'onclick\s*=',
            r'<iframe[^>]*>',
            r'<object[^>]*>',
            r'<embed[^>]*>'
        ]

    def validate_query(self, query: str) -> str:
        """Validate and sanitize a search query; raises ValueError when invalid."""
        if not query:
            raise ValueError("Query cannot be empty")
        if not isinstance(query, str):
            raise ValueError("Query must be a string")
        # Check length
        if len(query) > self.max_lengths["query"]:
            raise ValueError(f"Query too long (max {self.max_lengths['query']} characters)")
        # Sanitize
        sanitized = self._sanitize_string(query)
        # Final validation: sanitization may have removed everything.
        if len(sanitized.strip()) == 0:
            raise ValueError("Query cannot be empty after sanitization")
        return sanitized

    def validate_url(self, url: str) -> str:
        """Validate and sanitize a URL; raises ValueError when invalid.

        NOTE(review): `_sanitize_string` HTML-escapes its input, so a valid
        URL containing '&' comes back with '&amp;' — confirm downstream
        consumers expect escaped URLs.
        """
        if not url:
            raise ValueError("URL cannot be empty")
        if len(url) > self.max_lengths["url"]:
            raise ValueError(f"URL too long (max {self.max_lengths['url']} characters)")
        # Basic URL format validation
        url_pattern = re.compile(
            r'^https?://'  # http:// or https://
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'  # domain...
            r'localhost|'  # localhost...
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
            r'(?::\d+)?'  # optional port
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
        if not url_pattern.match(url):
            raise ValueError("Invalid URL format")
        # Sanitize
        return self._sanitize_string(url)

    def validate_parameters(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Validate API parameters, returning a sanitized copy."""
        validated = {}
        for key, value in params.items():
            if key == "max_results":
                if not isinstance(value, int) or value < 1 or value > 20:
                    raise ValueError("max_results must be integer between 1 and 20")
                validated[key] = value
            elif key == "language":
                if not isinstance(value, str) or len(value) > 10:
                    raise ValueError("Invalid language parameter")
                validated[key] = self._sanitize_string(value)
            elif key == "country":
                if not isinstance(value, str) or len(value) > 10:
                    raise ValueError("Invalid country parameter")
                validated[key] = self._sanitize_string(value)
            elif key == "recency":
                valid_recency = ["day", "week", "month", "year", "auto"]
                if value not in valid_recency:
                    raise ValueError(f"recency must be one of: {valid_recency}")
                validated[key] = value
            elif key == "format":
                valid_formats = ["json", "markdown", "text", "html"]
                if value not in valid_formats:
                    raise ValueError(f"format must be one of: {valid_formats}")
                validated[key] = value
            else:
                # Unknown parameters: pass through with basic sanitization.
                if isinstance(value, str):
                    validated[key] = self._sanitize_string(value)
                else:
                    validated[key] = value
        return validated

    def _sanitize_string(self, input_string: str) -> str:
        """Sanitize string input: strip attack patterns, then HTML-escape.

        Order matters.  The original escaped FIRST, which turned '<script>'
        into '&lt;script&gt;' before the tag-based dangerous patterns ran,
        so they could never match and the payload survived (escaped) in the
        output.  Patterns must be removed from the raw text.
        """
        sanitized = input_string
        # Remove dangerous patterns from the raw text.
        for pattern in self.dangerous_patterns:
            sanitized = re.sub(pattern, '', sanitized, flags=re.IGNORECASE)
        # HTML-escape whatever remains.
        sanitized = html.escape(sanitized)
        # Remove null bytes and control characters (keep tab/newline/CR).
        sanitized = ''.join(char for char in sanitized if ord(char) >= 32 or char in '\t\n\r')
        return sanitized.strip()
class ValidatedCortexClient:
    """Thin wrapper that validates and sanitizes every input before
    delegating to the underlying Cortex client."""

    def __init__(self, cortex_client):
        self.client = cortex_client
        self.validator = InputValidator()

    def search(self, query: str, **kwargs):
        """Validate the query and parameters, then delegate to client.search."""
        clean_query = self.validator.validate_query(query)
        clean_params = self.validator.validate_parameters(kwargs)
        return self.client.search(clean_query, **clean_params)

    def extract(self, url: str, **kwargs):
        """Validate the URL and parameters, then delegate to client.extract."""
        clean_url = self.validator.validate_url(url)
        clean_params = self.validator.validate_parameters(kwargs)
        return self.client.extract(clean_url, **clean_params)
# Usage — assumes `cortex_client` is an initialized Cortex client.
validator = InputValidator()
validated_client = ValidatedCortexClient(cortex_client)
try:
    # This will validate input and sanitize it
    result = validated_client.search("What are the latest AI developments?", max_results=8)
    print("Search successful")
except ValueError as e:
    print(f"Validation error: {e}")
📊 Monitoring & Observability
Performance Monitoring
import time
import logging
import statistics
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime, timedelta
@dataclass
class RequestMetrics:
timestamp: datetime
operation: str
duration: float
success: bool
error_type: Optional[str] = None
response_size: int = 0
cache_hit: bool = False
class PerformanceMonitor:
def __init__(self, retention_days: int = 7):
self.metrics: List[RequestMetrics] = []
self.retention_days = retention_days
self.logger = self._setup_logger()
def _setup_logger(self):
logger = logging.getLogger("cortex_performance")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def record_request(self, operation: str, duration: float, success: bool,
error_type: str = None, response_size: int = 0, cache_hit: bool = False):
"""Record request metrics"""
metric = RequestMetrics(
timestamp=datetime.now(),
operation=operation,
duration=duration,
success=success,
error_type=error_type,
response_size=response_size,
cache_hit=cache_hit
)
self.metrics.append(metric)
# Log significant events
if not success:
self.logger.error(f"Request failed: {operation} - {error_type}")
elif duration > 10: # Slow request threshold
self.logger.warning(f"Slow request: {operation} took {duration:.2f}s")
# Clean old metrics
self._cleanup_old_metrics()
def _cleanup_old_metrics(self):
"""Remove metrics older than retention period"""
cutoff = datetime.now() - timedelta(days=self.retention_days)
self.metrics = [m for m in self.metrics if m.timestamp > cutoff]
def get_performance_summary(self, hours: int = 24) -> Dict:
"""Get performance summary for the last N hours"""
cutoff = datetime.now() - timedelta(hours=hours)
recent_metrics = [m for m in self.metrics if m.timestamp > cutoff]
if not recent_metrics:
return {"error": "No metrics available"}
total_requests = len(recent_metrics)
successful_requests = sum(1 for m in recent_metrics if m.success)
cache_hits = sum(1 for m in recent_metrics if m.cache_hit)
durations = [m.duration for m in recent_metrics]
response_sizes = [m.response_size for m in recent_metrics if m.response_size > 0]
# Error breakdown
errors = {}
for m in recent_metrics:
if not m.success and m.error_type:
errors[m.error_type] = errors.get(m.error_type, 0) + 1
# Operation breakdown
operations = {}
for m in recent_metrics:
operations[m.operation] = operations.get(m.operation, 0) + 1
return {
"period_hours": hours,
"total_requests": total_requests,
"success_rate": successful_requests / total_requests,
"cache_hit_rate": cache_hits / total_requests,
"avg_response_time": statistics.mean(durations),
"median_response_time": statistics.median(durations),
"p95_response_time": statistics.quantiles(durations, n=20)[18] if len(durations) > 20 else max(durations),
"avg_response_size": statistics.mean(response_sizes) if response_sizes else 0,
"error_breakdown": errors,
"operation_breakdown": operations
}
def get_alerts(self) -> List[Dict]:
    """Evaluate the last hour of metrics against alert thresholds.

    Returns a list of alert dicts, each carrying "type", "message"
    and "severity" keys; the list is empty when all thresholds pass.
    """
    triggered = []
    last_hour = self.get_performance_summary(hours=1)

    if last_hour.get("success_rate", 1.0) < 0.95:
        triggered.append({
            "type": "high_error_rate",
            "message": f"Success rate dropped to {last_hour['success_rate']:.1%}",
            "severity": "high",
        })

    if last_hour.get("avg_response_time", 0) > 5:
        triggered.append({
            "type": "slow_response",
            "message": f"Average response time is {last_hour['avg_response_time']:.2f}s",
            "severity": "medium",
        })

    if last_hour.get("cache_hit_rate", 1.0) < 0.3:
        triggered.append({
            "type": "low_cache_hit_rate",
            "message": f"Cache hit rate is only {last_hour['cache_hit_rate']:.1%}",
            "severity": "low",
        })

    return triggered
class MonitoredCortexClient:
    """Wrapper around a Cortex client that records per-request metrics.

    Delegates search calls to the wrapped client while feeding timing,
    success/failure, and payload-size data into a PerformanceMonitor.
    """

    def __init__(self, cortex_client):
        self.client = cortex_client
        self.monitor = PerformanceMonitor()

    def search(self, query: str, **kwargs):
        """Search with performance monitoring.

        Records duration, success flag and response size for every call;
        failed calls are recorded with their exception type and re-raised.
        """
        start_time = time.time()
        operation = "search"
        try:
            result = self.client.search(query, **kwargs)
            duration = time.time() - start_time
            # Results without a `success` attribute are assumed successful.
            success = result.success if hasattr(result, 'success') else True
            response_size = len(str(result)) if result else 0
            self.monitor.record_request(
                operation=operation,
                duration=duration,
                success=success,
                response_size=response_size
            )
            return result
        except Exception as e:
            duration = time.time() - start_time
            self.monitor.record_request(
                operation=operation,
                duration=duration,
                success=False,
                error_type=type(e).__name__
            )
            # Bug fix: bare `raise` preserves the original traceback,
            # whereas `raise e` re-raises from this frame.
            raise

    def get_performance_report(self):
        """Return a dict with summary stats, active alerts and advice."""
        summary = self.monitor.get_performance_summary()
        alerts = self.monitor.get_alerts()
        return {
            "performance_summary": summary,
            "active_alerts": alerts,
            "recommendations": self._get_recommendations(summary, alerts)
        }

    def _get_recommendations(self, summary: Dict, alerts: List[Dict]) -> List[str]:
        """Generate performance recommendations from summary stats and alerts."""
        recommendations = []
        if summary.get("cache_hit_rate", 1.0) < 0.5:
            recommendations.append("Consider implementing better caching strategies")
        if summary.get("avg_response_time", 0) > 3:
            recommendations.append("Consider reducing max_results or using domain filtering")
        if summary.get("success_rate", 1.0) < 0.9:
            recommendations.append("Implement retry logic and fallback mechanisms")
        if any(alert["type"] == "high_error_rate" for alert in alerts):
            recommendations.append("Check API key validity and rate limits")
        return recommendations
# Usage
monitored_client = MonitoredCortexClient(cortex_client)

# Use monitored client
result = monitored_client.search("AI developments")

# Get performance report
report = monitored_client.get_performance_report()

print("Performance Summary:")
for name, stat in report["performance_summary"].items():
    print(f" {name}: {stat}")

alerts = report["active_alerts"]
if alerts:
    print("\nActive Alerts:")
    for alert in alerts:
        print(f" 🚨 {alert['message']} (Severity: {alert['severity']})")

recommendations = report["recommendations"]
if recommendations:
    print("\nRecommendations:")
    for rec in recommendations:
        print(f" 💡 {rec}")
🚀 Production Deployment
Health Checks
from typing import Dict, List
import requests
import time
class HealthChecker:
    """Active health checks for the Cortex API and its status endpoints."""

    def __init__(self, cortex_client):
        self.client = cortex_client
        # Public endpoints probed in addition to the live search check.
        self.health_endpoints = [
            "https://api.usecortex.co/v1/health",
            "https://status.usecortex.co"
        ]

    def check_api_health(self) -> Dict:
        """Comprehensive API health check.

        Runs a minimal live search plus an HTTP probe of each status
        endpoint; returns a dict of per-check results plus an aggregated
        "overall_status" of healthy / degraded / unhealthy.
        """
        health_results = {
            "overall_status": "healthy",
            "checks": {},
            "timestamp": time.time()
        }
        # Test basic connectivity with the cheapest possible search;
        # only the round-trip time matters, the result is discarded.
        try:
            start_time = time.time()
            self.client.search("test query", max_results=1)
            response_time = time.time() - start_time
            health_results["checks"]["api_connectivity"] = {
                "status": "healthy" if response_time < 10 else "degraded",
                "response_time": response_time,
                "details": "API responding normally"
            }
        except Exception as e:
            health_results["checks"]["api_connectivity"] = {
                "status": "unhealthy",
                "error": str(e),
                "details": "API connectivity failed"
            }
            health_results["overall_status"] = "unhealthy"
        # Check service status endpoints
        for endpoint in self.health_endpoints:
            try:
                response = requests.get(endpoint, timeout=5)
                health_results["checks"][f"endpoint_{endpoint}"] = {
                    "status": "healthy" if response.status_code == 200 else "degraded",
                    "status_code": response.status_code,
                    "response_time": response.elapsed.total_seconds()
                }
            except Exception as e:
                health_results["checks"][f"endpoint_{endpoint}"] = {
                    "status": "unhealthy",
                    "error": str(e)
                }
        # Aggregate: any unhealthy check wins, then any degraded one.
        checks = health_results["checks"].values()
        if any(check["status"] == "unhealthy" for check in checks):
            health_results["overall_status"] = "unhealthy"
        elif any(check["status"] == "degraded" for check in checks):
            health_results["overall_status"] = "degraded"
        return health_results

    def continuous_health_monitoring(self, interval: int = 60, callback=None):
        """Continuous health monitoring loop (blocks; run in a thread).

        Calls *callback* with every health snapshot when provided and
        prints a warning whenever overall status is not healthy.
        """
        while True:
            health_status = self.check_api_health()
            if callback:
                callback(health_status)
            if health_status["overall_status"] != "healthy":
                print(f"⚠️ Health check failed: {health_status['overall_status']}")
                for check_name, check_result in health_status["checks"].items():
                    if check_result["status"] != "healthy":
                        print(f" - {check_name}: {check_result}")
            time.sleep(interval)
# Usage
health_checker = HealthChecker(cortex_client)

# One-time health check
health_status = health_checker.check_api_health()
print(f"Overall status: {health_status['overall_status']}")

# Continuous monitoring (run in background thread)
import threading

def health_alert_callback(health_status):
    # Send alert (email, Slack, etc.)
    if health_status["overall_status"] != "healthy":
        print(f"🚨 HEALTH ALERT: System status is {health_status['overall_status']}")

monitor_thread = threading.Thread(
    target=health_checker.continuous_health_monitoring,
    args=(60, health_alert_callback),
    daemon=True,
)
monitor_thread.start()
📋 Testing Strategies
Unit Testing
import unittest
from unittest.mock import Mock, patch
import pytest
class TestCortexIntegration(unittest.TestCase):
    """Unit tests for Cortex client interactions, fully mocked."""

    def setUp(self):
        # Fresh mocks for every test: a canned success result and a client.
        self.mock_result = Mock()
        self.mock_result.success = True
        self.mock_result.summary = "Test summary"
        self.mock_result.sources = []
        self.mock_client = Mock()

    def test_successful_search(self):
        """A plain search returns the canned result exactly once."""
        self.mock_client.search.return_value = self.mock_result
        outcome = self.mock_client.search("test query")
        self.assertEqual(outcome.summary, "Test summary")
        self.assertTrue(outcome.success)
        self.mock_client.search.assert_called_once_with("test query")

    def test_search_with_parameters(self):
        """Keyword arguments are forwarded to the client unchanged."""
        self.mock_client.search.return_value = self.mock_result
        outcome = self.mock_client.search(
            "test query",
            max_results=10,
            recency="week"
        )
        self.assertTrue(outcome.success)
        self.mock_client.search.assert_called_once_with(
            "test query",
            max_results=10,
            recency="week"
        )

    def test_search_failure(self):
        """An exception from the client propagates with its message intact."""
        self.mock_client.search.side_effect = Exception("API Error")
        with self.assertRaises(Exception) as context:
            self.mock_client.search("test query")
        self.assertIn("API Error", str(context.exception))

    @patch('time.sleep')  # Mock sleep to speed up tests
    def test_retry_logic(self, mock_sleep):
        """Two failures followed by a success: the retry loop recovers."""
        self.mock_client.search.side_effect = [
            Exception("Temporary error"),
            Exception("Another error"),
            self.mock_result  # Success on third try
        ]
        outcome = None
        failures = 0
        while outcome is None:
            try:
                outcome = self.mock_client.search("test query")
            except Exception:
                failures += 1
                if failures >= 3:
                    raise
        self.assertIsNotNone(outcome)
        self.assertEqual(self.mock_client.search.call_count, 3)
# Pytest version
@pytest.fixture
def mock_cortex_client():
    """Fixture: a mock Cortex client whose search returns a canned result."""
    fake_result = Mock()
    fake_result.success = True
    fake_result.summary = "Test summary"
    fake_result.sources = []
    fake_client = Mock()
    fake_client.search.return_value = fake_result
    return fake_client
def test_search_integration(mock_cortex_client):
    """Verify a mocked search returns the canned result exactly once."""
    outcome = mock_cortex_client.search("test query")
    assert outcome.summary == "Test summary"
    assert outcome.success
    mock_cortex_client.search.assert_called_once_with("test query")
def test_error_handling(mock_cortex_client):
    """The client's exception should propagate with its message intact."""
    mock_cortex_client.search.side_effect = Exception("API Error")
    # pytest.raises also verifies the message via its regex `match`.
    with pytest.raises(Exception, match="API Error"):
        mock_cortex_client.search("test query")
# Integration tests
class TestCortexLiveAPI(unittest.TestCase):
    """Integration tests with real API (use carefully)"""

    @classmethod
    def setUpClass(cls):
        # Only run if API key is available
        cls.api_key = os.getenv("CORTEX_API_KEY_TEST")
        if cls.api_key:
            cls.client = CortexClient(api_key=cls.api_key)
        else:
            # Bug fix: skipTest() is an instance method; calling it on the
            # class object raises TypeError. Raising unittest.SkipTest is
            # the documented way to skip from setUpClass.
            raise unittest.SkipTest("No test API key available")

    def test_basic_search(self):
        """Test basic search functionality"""
        if not self.api_key:
            self.skipTest("No API key")
        result = self.client.search("test search query", max_results=3)
        self.assertIsNotNone(result)
        # Add appropriate assertions based on your API response structure

    @unittest.skip("Expensive test - run manually")
    def test_large_batch(self):
        """Test batch processing (expensive - skip in normal runs)"""
        queries = [f"test query {i}" for i in range(10)]
        results = []
        for query in queries:
            try:
                result = self.client.search(query, max_results=1)
                results.append(result)
                time.sleep(1)  # Rate limiting
            except Exception:
                # Record the failure as a placeholder so counts line up.
                results.append(None)
        success_count = sum(1 for r in results if r and hasattr(r, 'success') and r.success)
        self.assertGreater(success_count, 7)  # Require more than 7 of 10 successes
# Allow running this module directly as a unittest test script.
if __name__ == "__main__":
    unittest.main()
This comprehensive Best Practices guide covers the essential aspects of production-ready Cortex integration. Next, let's move on to the Use Cases section to show practical applications.
Next: AI Agents → Building intelligent agents with real-time knowledge