# Audit Logs

Cortex provides comprehensive audit logging to help you monitor API usage, track security events, and maintain compliance. All requests and responses are logged with detailed metadata for analysis and troubleshooting.
## Overview

Audit logs capture every interaction with the Cortex API, including:

- API Requests: Query details, parameters, and metadata
- Responses: Results, sources, and confidence scores
- Security Events: Authentication failures, rate limits, suspicious activity
- System Events: Key rotations, configuration changes
## Log Retention by Plan

| Plan | Retention Period | Search History | Export Options |
|---|---|---|---|
| Free | 7 days | Last 100 requests | JSON download |
| Starter | 30 days | Last 1,000 requests | JSON, CSV |
| Pro | 90 days | Last 10,000 requests | JSON, CSV, API |
| Enterprise | Custom (up to 7 years) | Unlimited | All formats + SIEM |
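The retention windows above can be encoded as a quick client-side check, for example to decide whether an event you need is still queryable before issuing a search. The day counts come from the table; the `RETENTION_DAYS` mapping and `is_retained` helper are illustrative, not part of the SDK, and Enterprise is approximated by its 7-year maximum since the actual value is configurable:

```python
from datetime import datetime, timedelta, timezone

# Retention periods from the table above (Enterprise is configurable,
# so its 7-year maximum is used as a placeholder).
RETENTION_DAYS = {"free": 7, "starter": 30, "pro": 90, "enterprise": 365 * 7}

def is_retained(plan: str, event_time: datetime) -> bool:
    """Return True if an event timestamp still falls inside the plan's retention window."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS[plan])
    return event_time >= cutoff

# A 60-day-old event is gone on Starter but still searchable on Pro.
old_event = datetime.now(timezone.utc) - timedelta(days=60)
print(is_retained("starter", old_event))  # False
print(is_retained("pro", old_event))      # True
```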
## Accessing Audit Logs

### Dashboard

- Log in to your Cortex Dashboard
- Navigate to Audit Logs
- Use filters to find specific events
- Export logs for external analysis
### API Access (Pro+)

```python
import cortex

client = cortex.Client(api_key="your_api_key")

# Get recent audit logs
logs = client.audit.get_logs(
    start_date="2025-01-01",
    end_date="2025-01-31",
    event_types=["api_request", "security_event"],
    limit=1000
)

for log in logs:
    print(f"Event: {log.event_type}")
    print(f"Timestamp: {log.timestamp}")
    print(f"Details: {log.details}")
```
### Real-time Streaming (Enterprise)

```python
# Stream audit logs in real-time
def handle_audit_event(event):
    if event.type == "security_event":
        send_alert(event)
    store_in_siem(event)

# Subscribe to the audit stream
client.audit.stream_logs(callback=handle_audit_event)
```
## Log Structure

### Standard Request Log

```json
{
  "id": "log_1234567890abcdef",
  "timestamp": "2025-01-15T10:30:45.123Z",
  "event_type": "api_request",
  "api_key_id": "key_abcdef123456",
  "endpoint": "/v1/search",
  "method": "POST",
  "request": {
    "query": "latest AI developments",
    "max_results": 5,
    "profile": "tech_news",
    "user_agent": "cortex-python-sdk/1.0.0"
  },
  "response": {
    "status_code": 200,
    "response_time_ms": 245,
    "sources_found": 8,
    "sources_used": 5,
    "confidence_score": 0.94,
    "tokens_used": 1247
  },
  "metadata": {
    "ip_address": "192.168.1.100",
    "user_id": "user_123",
    "session_id": "sess_456",
    "trace_id": "trace_789"
  }
}
```
### Security Event Log

```json
{
  "id": "log_9876543210fedcba",
  "timestamp": "2025-01-15T10:31:02.456Z",
  "event_type": "security_event",
  "severity": "medium",
  "category": "authentication",
  "event": "invalid_api_key",
  "details": {
    "api_key_prefix": "ctx_invalid...",
    "ip_address": "203.0.113.42",
    "user_agent": "suspicious-bot/1.0",
    "attempted_endpoint": "/v1/search"
  },
  "response": {
    "status_code": 401,
    "error_message": "Invalid API key provided"
  }
}
```
## Event Types

### API Events

- `api_request` - Successful API calls
- `api_error` - Failed API requests
- `rate_limit_hit` - Rate limit exceeded
- `quota_exceeded` - Monthly quota reached

### Security Events

- `authentication_failed` - Invalid credentials
- `suspicious_activity` - Unusual usage patterns
- `prompt_injection_detected` - Malicious input blocked
- `content_filtered` - Harmful content detected

### Account Events

- `api_key_created` - New API key generated
- `api_key_rotated` - Key rotation performed
- `api_key_revoked` - Key manually revoked
- `settings_changed` - Account configuration updated

### System Events

- `service_error` - Internal system errors
- `maintenance_mode` - Planned maintenance
- `performance_alert` - Slow response times
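When reviewing exported logs, a quick tally by event type is often the first step in spotting anomalies. This sketch works over plain dicts shaped like the log entries above using only the standard library; the sample records are invented:

```python
from collections import Counter

# A few records shaped like the audit-log entries above (invented sample data).
records = [
    {"event_type": "api_request"},
    {"event_type": "api_request"},
    {"event_type": "rate_limit_hit"},
    {"event_type": "security_event"},
]

# Tally occurrences of each event type, most frequent first.
counts = Counter(r["event_type"] for r in records)
print(counts.most_common())  # [('api_request', 2), ('rate_limit_hit', 1), ('security_event', 1)]
```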
## Filtering and Search

### Time-based Filtering

```python
from datetime import datetime, timedelta

# Get logs from the last 24 hours
logs = client.audit.get_logs(
    start_date=datetime.now() - timedelta(days=1),
    end_date=datetime.now()
)

# Get logs for a specific date range
logs = client.audit.get_logs(
    start_date="2025-01-01T00:00:00Z",
    end_date="2025-01-31T23:59:59Z"
)
```
### Event Type Filtering

```python
# Only security events
security_logs = client.audit.get_logs(
    event_types=["security_event"]
)

# Multiple event types
important_logs = client.audit.get_logs(
    event_types=["security_event", "api_error", "rate_limit_hit"]
)
```
### Advanced Filtering

```python
# Complex query with multiple filters
logs = client.audit.get_logs(
    start_date="2025-01-01",
    end_date="2025-01-31",
    event_types=["api_request"],
    filters={
        "endpoint": "/v1/search",
        "status_code": [200, 201],
        "response_time_ms": {"gt": 1000},  # Slow requests
        "ip_address": "192.168.1.100"
    },
    sort_by="timestamp",
    sort_order="desc",
    limit=500
)
```
## Analytics and Insights

### Usage Analytics

```python
# Get usage statistics
stats = client.audit.get_usage_stats(
    start_date="2025-01-01",
    end_date="2025-01-31",
    group_by="day"
)

print(f"Total Requests: {stats.total_requests}")
print(f"Average Response Time: {stats.avg_response_time_ms}ms")
print(f"Success Rate: {stats.success_rate}%")
```
### Error Analysis

```python
# Analyze error patterns
error_analysis = client.audit.analyze_errors(
    start_date="2025-01-01",
    end_date="2025-01-31"
)

for error in error_analysis.top_errors:
    print(f"Error: {error.type}")
    print(f"Count: {error.count}")
    print(f"Percentage: {error.percentage}%")
```
### Security Insights

```python
# Security event summary
security_summary = client.audit.get_security_summary(
    start_date="2025-01-01",
    end_date="2025-01-31"
)

print(f"Total Security Events: {security_summary.total}")
print(f"High Severity: {security_summary.high_severity}")
print(f"Blocked IPs: {len(security_summary.blocked_ips)}")
```
## Exporting Logs

### Dashboard Export

- Navigate to Audit Logs in the dashboard
- Apply the desired filters
- Click the Export button
- Choose a format (JSON, CSV, Excel)
- Download the file
### API Export

```python
import json

import pandas as pd

# Export logs to a file
logs = client.audit.get_logs(
    start_date="2025-01-01",
    end_date="2025-01-31"
)

# Save as JSON
with open('audit_logs.json', 'w') as f:
    json.dump([log.to_dict() for log in logs], f, indent=2)

# Save as CSV
df = pd.DataFrame([log.to_dict() for log in logs])
df.to_csv('audit_logs.csv', index=False)
```
### Automated Export (Enterprise)

```python
# Schedule daily exports
client.audit.schedule_export(
    schedule="daily",
    time="02:00",
    format="json",
    destination="s3://your-bucket/audit-logs/",
    filters={
        "event_types": ["api_request", "security_event"]
    }
)
```
## Integration with SIEM

### Splunk Integration

```python
# Forward logs to Splunk
splunk_config = {
    "host": "splunk.company.com",
    "port": 8088,
    "token": "your-hec-token",
    "index": "cortex_audit"
}

client.audit.configure_forwarding("splunk", splunk_config)
```
### ELK Stack Integration

```python
# Send logs to Elasticsearch
elk_config = {
    "host": "elasticsearch.company.com",
    "port": 9200,
    "index": "cortex-audit-logs",
    "username": "elastic",
    "password": "your-password"
}

client.audit.configure_forwarding("elasticsearch", elk_config)
```
### Custom Webhook

```python
# Forward to a custom endpoint
webhook_config = {
    "url": "https://your-siem.company.com/webhook",
    "headers": {
        "Authorization": "Bearer your-token",
        "Content-Type": "application/json"
    },
    "batch_size": 100,
    "retry_count": 3
}

client.audit.configure_forwarding("webhook", webhook_config)
```
## Real-time Monitoring

### Setting Up Alerts

```python
# Create an alert rule
alert_rule = client.audit.create_alert(
    name="High Error Rate",
    condition={
        "event_type": "api_error",
        "count": {"gt": 10},
        "window": "5m"
    },
    actions=[
        {
            "type": "email",
            "recipients": ["admin@company.com"]
        },
        {
            "type": "slack",
            "webhook": "https://hooks.slack.com/your-webhook"
        }
    ]
)
```
### Custom Monitoring

```python
from datetime import datetime, timedelta

import schedule

# Monitor for unusual request patterns
def monitor_unusual_activity():
    recent_logs = client.audit.get_logs(
        start_date=datetime.now() - timedelta(minutes=5),
        event_types=["api_request"]
    )

    # Count requests per source IP
    ip_counts = {}
    for log in recent_logs:
        ip = log.metadata.ip_address
        ip_counts[ip] = ip_counts.get(ip, 0) + 1

    # Alert on suspicious activity
    for ip, count in ip_counts.items():
        if count > 100:  # More than 100 requests in 5 minutes
            send_security_alert(f"Suspicious activity from {ip}: {count} requests")

# Run the check every 5 minutes
schedule.every(5).minutes.do(monitor_unusual_activity)
```
## Compliance Features

### GDPR Compliance

```python
# Request user data deletion
client.audit.delete_user_data(
    user_id="user_123",
    reason="GDPR deletion request"
)

# Export a user's audit history
user_logs = client.audit.get_user_logs(
    user_id="user_123",
    include_deleted=False
)
```
### SOX Compliance (Enterprise)

- Immutable audit trails
- Digital signatures on log entries
- Segregation of duties tracking
- Financial data access monitoring
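Cortex does not publicly document its signature scheme, but the idea behind digitally signed log entries can be sketched with a standard-library HMAC over a canonicalized entry: any party holding the key can detect tampering. `sign_entry` and `verify_entry` are illustrative helpers, not SDK calls:

```python
import hashlib
import hmac
import json

def sign_entry(entry: dict, key: bytes) -> str:
    # Canonicalize the entry so the signature is stable regardless of key order.
    payload = json.dumps(entry, sort_keys=True, separators=(",", ":")).encode()
    return hmac.new(key, payload, hashlib.sha256).hexdigest()

def verify_entry(entry: dict, signature: str, key: bytes) -> bool:
    # Constant-time comparison avoids leaking signature bytes via timing.
    return hmac.compare_digest(sign_entry(entry, key), signature)

key = b"shared-audit-key"
entry = {"id": "log_1", "event_type": "api_request", "timestamp": "2025-01-15T10:30:45Z"}
sig = sign_entry(entry, key)

print(verify_entry(entry, sig, key))    # True
entry["event_type"] = "security_event"  # any tampering breaks the signature
print(verify_entry(entry, sig, key))    # False
```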
### HIPAA Compliance (Enterprise)

- PHI access logging
- Minimum necessary principle tracking
- Breach notification automation
- Audit trail integrity verification
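A common technique behind audit-trail integrity verification is a hash chain: each entry commits to the hash of its predecessor, so editing or deleting any record breaks every hash after it. This is a generic sketch, not Cortex's actual mechanism; the sample entries are invented:

```python
import hashlib
import json

def chain_logs(entries):
    """Link each entry to its predecessor by hash, making the trail tamper-evident."""
    prev = "0" * 64  # genesis value for the first entry
    chained = []
    for entry in entries:
        record = dict(entry, prev_hash=prev)
        payload = json.dumps(record, sort_keys=True).encode()
        record["hash"] = hashlib.sha256(payload).hexdigest()
        chained.append(record)
        prev = record["hash"]
    return chained

def verify_chain(chained) -> bool:
    """Recompute every link; any edit, insertion, or deletion returns False."""
    prev = "0" * 64
    for record in chained:
        body = {k: v for k, v in record.items() if k != "hash"}
        if body.get("prev_hash") != prev:
            return False
        payload = json.dumps(body, sort_keys=True).encode()
        if hashlib.sha256(payload).hexdigest() != record["hash"]:
            return False
        prev = record["hash"]
    return True

logs = [{"id": "log_1", "event": "phi_access"}, {"id": "log_2", "event": "phi_access"}]
chained = chain_logs(logs)
print(verify_chain(chained))   # True
chained[0]["event"] = "edited" # tampering with any record invalidates the chain
print(verify_chain(chained))   # False
```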
## Best Practices

### Regular Review

```python
from datetime import datetime, timedelta

import schedule

# Weekly security review
def weekly_security_review():
    # Get security events from the last week
    security_events = client.audit.get_logs(
        start_date=datetime.now() - timedelta(days=7),
        event_types=["security_event"]
    )

    # Analyze patterns
    analysis = analyze_security_patterns(security_events)

    # Generate a report
    report = generate_security_report(analysis)

    # Send it to the security team
    send_report(report, recipients=["security@company.com"])

schedule.every().monday.at("09:00").do(weekly_security_review)
```
### Log Retention Strategy

```python
# Configure appropriate retention periods per event category
retention_config = {
    "api_requests": "90d",    # Standard business records
    "security_events": "2y",  # Security incidents
    "authentication": "1y",   # Access records
    "configuration": "7y"     # Compliance requirements
}

client.audit.configure_retention(retention_config)
```
### Performance Monitoring

```python
from datetime import datetime, timedelta

# Monitor API performance trends
def analyze_performance():
    logs = client.audit.get_logs(
        start_date=datetime.now() - timedelta(days=30),
        event_types=["api_request"]
    )
    if not logs:
        return

    # Calculate the average response time
    response_times = [log.response.response_time_ms for log in logs]
    avg_response_time = sum(response_times) / len(response_times)

    # Alert on degradation
    if avg_response_time > 1000:  # 1-second threshold
        send_performance_alert(f"Average response time: {avg_response_time}ms")
```
For advanced audit log configuration, contact enterprise@usecortex.co.