
Audit Logs

Cortex provides comprehensive audit logging to help you monitor API usage, track security events, and maintain compliance. All requests and responses are logged with detailed metadata for analysis and troubleshooting.

Overview

Audit logs capture every interaction with the Cortex API, including:

  • API Requests: Query details, parameters, and metadata
  • Responses: Results, sources, confidence scores
  • Security Events: Authentication failures, rate limits, suspicious activity
  • System Events: Key rotations, configuration changes

Log Retention by Plan

Plan       | Retention Period       | Search History       | Export Options
Free       | 7 days                 | Last 100 requests    | JSON download
Starter    | 30 days                | Last 1,000 requests  | JSON, CSV
Pro        | 90 days                | Last 10,000 requests | JSON, CSV, API
Enterprise | Custom (up to 7 years) | Unlimited            | All formats + SIEM
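
As a rough illustration of how the fixed retention windows above translate into a cutoff date, the sketch below checks whether a log entry would still be inside a plan's window. `RETENTION_DAYS` and `is_retained` are hypothetical names, not part of the Cortex SDK, and treating the boundary as inclusive is an assumption. Enterprise is left out because its retention is custom.

```python
from datetime import datetime, timedelta, timezone

# Retention windows copied from the table above (Enterprise is custom, so omitted).
RETENTION_DAYS = {"free": 7, "starter": 30, "pro": 90}

def is_retained(plan: str, logged_at: datetime, now: datetime) -> bool:
    """Return True if a log written at `logged_at` is still inside the plan's window."""
    cutoff = now - timedelta(days=RETENTION_DAYS[plan])
    return logged_at >= cutoff  # assumption: the cutoff instant itself is still retained

now = datetime(2025, 1, 31, tzinfo=timezone.utc)
logged_at = datetime(2025, 1, 15, tzinfo=timezone.utc)
print(is_retained("free", logged_at, now))     # False: 16 days old, outside 7 days
print(is_retained("starter", logged_at, now))  # True: inside 30 days
```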

Accessing Audit Logs

Dashboard

  1. Log in to your Cortex Dashboard
  2. Navigate to Audit Logs
  3. Use filters to find specific events
  4. Export logs for external analysis

API Access (Pro+)

import cortex

client = cortex.Client(api_key="your_api_key")

# Get recent audit logs
logs = client.audit.get_logs(
    start_date="2025-01-01",
    end_date="2025-01-31",
    event_types=["api_request", "security_event"],
    limit=1000
)

for log in logs:
    print(f"Event: {log.event_type}")
    print(f"Timestamp: {log.timestamp}")
    print(f"Details: {log.details}")

Real-time Streaming (Enterprise)

# Stream audit logs in real time.
# send_alert and store_in_siem are placeholders for your own handlers.
def handle_audit_event(event):
    if event.type == "security_event":
        send_alert(event)

    store_in_siem(event)

# Subscribe to audit stream
client.audit.stream_logs(callback=handle_audit_event)

Log Structure

Standard Request Log

{
  "id": "log_1234567890abcdef",
  "timestamp": "2025-01-15T10:30:45.123Z",
  "event_type": "api_request",
  "api_key_id": "key_abcdef123456",
  "endpoint": "/v1/search",
  "method": "POST",
  "request": {
    "query": "latest AI developments",
    "max_results": 5,
    "profile": "tech_news",
    "user_agent": "cortex-python-sdk/1.0.0"
  },
  "response": {
    "status_code": 200,
    "response_time_ms": 245,
    "sources_found": 8,
    "sources_used": 5,
    "confidence_score": 0.94,
    "tokens_used": 1247
  },
  "metadata": {
    "ip_address": "192.168.1.100",
    "user_id": "user_123",
    "session_id": "sess_456",
    "trace_id": "trace_789"
  }
}
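
An exported entry with this shape can be read with the standard library alone. The sketch below pulls a few fields into a small dataclass. `RequestLog` and `parse_request_log` are hypothetical helpers, not SDK classes, and the sample string is a trimmed copy of the entry above.

```python
import json
from dataclasses import dataclass
from datetime import datetime

@dataclass
class RequestLog:
    id: str
    timestamp: datetime
    endpoint: str
    status_code: int
    response_time_ms: int

def parse_request_log(raw: str) -> RequestLog:
    """Parse one JSON request log entry into a RequestLog."""
    entry = json.loads(raw)
    # Swap the trailing "Z" for an explicit offset so fromisoformat
    # also accepts the timestamp on Python versions before 3.11.
    timestamp = datetime.fromisoformat(entry["timestamp"].replace("Z", "+00:00"))
    return RequestLog(
        id=entry["id"],
        timestamp=timestamp,
        endpoint=entry["endpoint"],
        status_code=entry["response"]["status_code"],
        response_time_ms=entry["response"]["response_time_ms"],
    )

raw = """
{
  "id": "log_1234567890abcdef",
  "timestamp": "2025-01-15T10:30:45.123Z",
  "event_type": "api_request",
  "endpoint": "/v1/search",
  "response": {"status_code": 200, "response_time_ms": 245}
}
"""
log = parse_request_log(raw)
print(log.endpoint, log.status_code, log.response_time_ms)
```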

Security Event Log

{
  "id": "log_9876543210fedcba",
  "timestamp": "2025-01-15T10:31:02.456Z",
  "event_type": "security_event",
  "severity": "medium",
  "category": "authentication",
  "event": "invalid_api_key",
  "details": {
    "api_key_prefix": "ctx_invalid...",
    "ip_address": "203.0.113.42",
    "user_agent": "suspicious-bot/1.0",
    "attempted_endpoint": "/v1/search"
  },
  "response": {
    "status_code": 401,
    "error_message": "Invalid API key provided"
  }
}

Event Types

API Events

  • api_request - Successful API calls
  • api_error - Failed API requests
  • rate_limit_hit - Rate limit exceeded
  • quota_exceeded - Monthly quota reached

Security Events

  • authentication_failed - Invalid credentials
  • suspicious_activity - Unusual usage patterns
  • prompt_injection_detected - Malicious input blocked
  • content_filtered - Harmful content detected

Account Events

  • api_key_created - New API key generated
  • api_key_rotated - Key rotation performed
  • api_key_revoked - Key manually revoked
  • settings_changed - Account configuration updated

System Events

  • service_error - Internal system errors
  • maintenance_mode - Planned maintenance
  • performance_alert - Slow response times
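
When post-processing exported logs, it can help to map each event type back to the group it is listed under. The following is a minimal sketch: the event names are taken from the lists above, while `EVENT_CATEGORIES`, `category_of`, and `count_by_category` are hypothetical helpers rather than SDK functions.

```python
from collections import Counter

# Event type names copied from the lists above, keyed by their group.
EVENT_CATEGORIES = {
    "api": {"api_request", "api_error", "rate_limit_hit", "quota_exceeded"},
    "security": {
        "authentication_failed",
        "suspicious_activity",
        "prompt_injection_detected",
        "content_filtered",
    },
    "account": {"api_key_created", "api_key_rotated", "api_key_revoked", "settings_changed"},
    "system": {"service_error", "maintenance_mode", "performance_alert"},
}

def category_of(event_type: str) -> str:
    """Return the group an event type belongs to, or "unknown"."""
    for category, names in EVENT_CATEGORIES.items():
        if event_type in names:
            return category
    return "unknown"

def count_by_category(event_types):
    """Count how many events fall into each group."""
    return Counter(category_of(t) for t in event_types)

sample = ["api_request", "api_error", "api_key_rotated", "service_error", "api_request"]
print(count_by_category(sample))
```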

Time-based Filtering

from datetime import datetime, timedelta

# Get logs from the last 24 hours
now = datetime.now()
logs = client.audit.get_logs(
    start_date=now - timedelta(days=1),
    end_date=now
)

# Get logs for a specific date range
logs = client.audit.get_logs(
    start_date="2025-01-01T00:00:00Z",
    end_date="2025-01-31T23:59:59Z"
)

Event Type Filtering

# Only security events
security_logs = client.audit.get_logs(
    event_types=["security_event"]
)

# Multiple event types
important_logs = client.audit.get_logs(
    event_types=["security_event", "api_error", "rate_limit_hit"]
)

Advanced Filtering

# Complex query with multiple filters
logs = client.audit.get_logs(
    start_date="2025-01-01",
    end_date="2025-01-31",
    event_types=["api_request"],
    filters={
        "endpoint": "/v1/search",
        "status_code": [200, 201],
        "response_time_ms": {"gt": 1000},  # Slow requests
        "ip_address": "192.168.1.100"
    },
    sort_by="timestamp",
    sort_order="desc",
    limit=500
)
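
The `filters` argument above mixes three kinds of conditions: an exact value, a list of allowed values, and a comparison such as `{"gt": 1000}`. How the service evaluates them is not documented here, so the sketch below is only one plausible reading, applied locally to flattened records. `matches` is a hypothetical helper, and every operator name other than `gt` is an assumption.

```python
import operator

# Only "gt" appears in the documentation; the other operator names are assumptions.
OPERATORS = {
    "gt": operator.gt,
    "gte": operator.ge,
    "lt": operator.lt,
    "lte": operator.le,
}

def matches(record: dict, filters: dict) -> bool:
    """Return True if a flat record satisfies every condition in filters."""
    for field, expected in filters.items():
        if field not in record:
            return False
        actual = record[field]
        if isinstance(expected, dict):
            # Comparison form, e.g. {"gt": 1000}: all listed bounds must hold.
            if not all(OPERATORS[op](actual, bound) for op, bound in expected.items()):
                return False
        elif isinstance(expected, list):
            # List form: the value must be one of the allowed options.
            if actual not in expected:
                return False
        elif actual != expected:
            # Scalar form: exact match.
            return False
    return True

records = [
    {"endpoint": "/v1/search", "status_code": 200, "response_time_ms": 1500},
    {"endpoint": "/v1/search", "status_code": 200, "response_time_ms": 245},
    {"endpoint": "/v1/search", "status_code": 500, "response_time_ms": 2000},
]
filters = {
    "endpoint": "/v1/search",
    "status_code": [200, 201],
    "response_time_ms": {"gt": 1000},
}
slow = [r for r in records if matches(r, filters)]
print(slow)
```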

Analytics and Insights

Usage Analytics

# Get usage statistics
stats = client.audit.get_usage_stats(
    start_date="2025-01-01",
    end_date="2025-01-31",
    group_by="day"
)

print(f"Total Requests: {stats.total_requests}")
print(f"Average Response Time: {stats.avg_response_time_ms}ms")
print(f"Success Rate: {stats.success_rate}%")
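
For plans without API access, similar figures can be derived from an exported file. This is a minimal sketch over flattened records. `summarize` is a hypothetical helper, and counting any status code below 400 as a success is an assumption, not necessarily how Cortex computes `success_rate`.

```python
def summarize(records):
    """Compute request count, mean latency, and success percentage."""
    total = len(records)
    if total == 0:
        # Avoid dividing by zero when the export is empty.
        return {"total_requests": 0, "avg_response_time_ms": 0.0, "success_rate": 0.0}
    # Assumption: a request counts as successful when its status code is below 400.
    successes = sum(1 for r in records if r["status_code"] < 400)
    avg_ms = sum(r["response_time_ms"] for r in records) / total
    return {
        "total_requests": total,
        "avg_response_time_ms": avg_ms,
        "success_rate": 100.0 * successes / total,
    }

records = [
    {"status_code": 200, "response_time_ms": 100},
    {"status_code": 200, "response_time_ms": 300},
    {"status_code": 500, "response_time_ms": 200},
    {"status_code": 401, "response_time_ms": 200},
]
stats = summarize(records)
print(stats)
```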

Error Analysis

# Analyze error patterns
error_analysis = client.audit.analyze_errors(
    start_date="2025-01-01",
    end_date="2025-01-31"
)

for error in error_analysis.top_errors:
    print(f"Error: {error.type}")
    print(f"Count: {error.count}")
    print(f"Percentage: {error.percentage}%")

Security Insights

# Security event summary
security_summary = client.audit.get_security_summary(
    start_date="2025-01-01",
    end_date="2025-01-31"
)

print(f"Total Security Events: {security_summary.total}")
print(f"High Severity: {security_summary.high_severity}")
print(f"Blocked IPs: {len(security_summary.blocked_ips)}")

Exporting Logs

Dashboard Export

  1. Navigate to Audit Logs in the dashboard
  2. Apply the desired filters
  3. Click the Export button
  4. Choose a format (JSON, CSV, Excel)
  5. Download the file

API Export

import json

import pandas as pd

# Export logs to file
logs = client.audit.get_logs(
    start_date="2025-01-01",
    end_date="2025-01-31"
)

# Convert once so the records can be reused (logs may be a one-shot iterator)
records = [log.to_dict() for log in logs]

# Save as JSON
with open('audit_logs.json', 'w') as f:
    json.dump(records, f, indent=2)

# Save as CSV
df = pd.DataFrame(records)
df.to_csv('audit_logs.csv', index=False)

Automated Export (Enterprise)

# Schedule daily exports
client.audit.schedule_export(
    schedule="daily",
    time="02:00",
    format="json",
    destination="s3://your-bucket/audit-logs/",
    filters={
        "event_types": ["api_request", "security_event"]
    }
)

Integration with SIEM

Splunk Integration

# Forward logs to Splunk
splunk_config = {
    "host": "splunk.company.com",
    "port": 8088,
    "token": "your-hec-token",
    "index": "cortex_audit"
}

client.audit.configure_forwarding("splunk", splunk_config)

ELK Stack Integration

# Send logs to Elasticsearch
elk_config = {
    "host": "elasticsearch.company.com",
    "port": 9200,
    "index": "cortex-audit-logs",
    "username": "elastic",
    "password": "your-password"
}

client.audit.configure_forwarding("elasticsearch", elk_config)

Custom Webhook

# Forward to custom endpoint
webhook_config = {
    "url": "https://your-siem.company.com/webhook",
    "headers": {
        "Authorization": "Bearer your-token",
        "Content-Type": "application/json"
    },
    "batch_size": 100,
    "retry_count": 3
}

client.audit.configure_forwarding("webhook", webhook_config)
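
The exact delivery behavior behind `batch_size` is not described here. One plausible reading is that events are grouped into fixed-size chunks, with a smaller final chunk, before each POST. The sketch below shows that grouping only; `chunk_events` is a hypothetical helper and no network calls are made.

```python
from typing import Iterator, List

def chunk_events(events: List[dict], batch_size: int) -> Iterator[List[dict]]:
    """Yield consecutive slices of at most batch_size events."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(events), batch_size):
        yield events[start:start + batch_size]

# 250 placeholder events split with the batch_size from the config above.
events = [{"id": f"log_{i}"} for i in range(250)]
batch_sizes = [len(batch) for batch in chunk_events(events, batch_size=100)]
print(batch_sizes)  # [100, 100, 50]
```

A receiving endpoint written against this assumption should accept a JSON array of any length up to the configured batch size.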

Real-time Monitoring

Setting Up Alerts

# Create alert rules
alert_rule = client.audit.create_alert(
    name="High Error Rate",
    condition={
        "event_type": "api_error",
        "count": {"gt": 10},
        "window": "5m"
    },
    actions=[
        {
            "type": "email",
            "recipients": ["admin@company.com"]
        },
        {
            "type": "slack",
            "webhook": "https://hooks.slack.com/your-webhook"
        }
    ]
)
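
To make the rule's condition concrete: it reads as "more than 10 `api_error` events within a 5-minute window". The sketch below evaluates that condition locally over a list of event timestamps. `rule_fires` is a hypothetical helper, and the trailing-window interpretation is an assumption about how the service counts.

```python
from datetime import datetime, timedelta

def rule_fires(timestamps, now, window=timedelta(minutes=5), threshold=10):
    """Return True if more than `threshold` events fall in the trailing window."""
    window_start = now - window
    recent = [t for t in timestamps if window_start < t <= now]
    return len(recent) > threshold

now = datetime(2025, 1, 15, 10, 35, 0)
# Eleven errors spaced 20 seconds apart, all within the last 5 minutes.
errors = [now - timedelta(seconds=20 * i) for i in range(11)]
print(rule_fires(errors, now))       # True: 11 > 10
print(rule_fires(errors[:10], now))  # False: exactly 10 does not exceed the threshold
```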

Custom Monitoring

import time
from datetime import datetime, timedelta

import schedule  # third-party: pip install schedule

# Monitor specific patterns.
# send_security_alert is a placeholder for your own alerting function.
def monitor_unusual_activity():
    recent_logs = client.audit.get_logs(
        start_date=datetime.now() - timedelta(minutes=5),
        event_types=["api_request"]
    )

    # Count requests per source IP
    ip_counts = {}
    for log in recent_logs:
        ip = log.metadata.ip_address
        ip_counts[ip] = ip_counts.get(ip, 0) + 1

    # Alert on suspicious activity
    for ip, count in ip_counts.items():
        if count > 100:  # More than 100 requests in 5 minutes
            send_security_alert(f"Suspicious activity from {ip}: {count} requests")

# Run monitoring every 5 minutes
schedule.every(5).minutes.do(monitor_unusual_activity)

# The schedule library only runs jobs when run_pending() is called
while True:
    schedule.run_pending()
    time.sleep(1)

Compliance Features

GDPR Compliance

# Request user data deletion
client.audit.delete_user_data(
    user_id="user_123",
    reason="GDPR deletion request"
)

# Export user's audit history
user_logs = client.audit.get_user_logs(
    user_id="user_123",
    include_deleted=False
)

SOX Compliance (Enterprise)

  • Immutable audit trails
  • Digital signatures on log entries
  • Segregation of duties tracking
  • Financial data access monitoring

HIPAA Compliance (Enterprise)

  • PHI access logging
  • Minimum necessary principle tracking
  • Breach notification automation
  • Audit trail integrity verification
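
This page does not say how Cortex implements immutable trails or integrity verification. A hash chain is one common technique for making tampering detectable, and the sketch below illustrates that general idea only. `entry_hash`, `build_chain`, and `verify_chain` are hypothetical helpers, and the all-zero genesis value is an arbitrary choice.

```python
import hashlib
import json

GENESIS = "0" * 64  # placeholder "previous hash" for the first entry

def entry_hash(entry: dict, prev_hash: str) -> str:
    """Hash an entry together with the hash of the entry before it."""
    payload = json.dumps(entry, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + payload).encode("utf-8")).hexdigest()

def build_chain(entries):
    """Link entries so each record commits to everything before it."""
    chain = []
    prev = GENESIS
    for entry in entries:
        digest = entry_hash(entry, prev)
        chain.append({"entry": entry, "prev_hash": prev, "hash": digest})
        prev = digest
    return chain

def verify_chain(chain) -> bool:
    """Recompute every hash; any edited or reordered entry breaks the chain."""
    prev = GENESIS
    for link in chain:
        if link["prev_hash"] != prev:
            return False
        if link["hash"] != entry_hash(link["entry"], prev):
            return False
        prev = link["hash"]
    return True

chain = build_chain([
    {"id": "log_1", "event": "api_key_created"},
    {"id": "log_2", "event": "settings_changed"},
])
print(verify_chain(chain))  # True

chain[0]["entry"]["event"] = "api_key_revoked"  # simulate tampering
print(verify_chain(chain))  # False
```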

Best Practices

Regular Review

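from datetime import datetime, timedelta

import schedule  # third-party: pip install schedule

# Weekly security review.
# analyze_security_patterns, generate_security_report, and send_report
# are placeholders for your own functions.
def weekly_security_review():
    # Get security events from last week
    security_events = client.audit.get_logs(
        start_date=datetime.now() - timedelta(days=7),
        event_types=["security_event"]
    )

    # Analyze patterns
    analysis = analyze_security_patterns(security_events)

    # Generate report
    report = generate_security_report(analysis)

    # Send to security team
    send_report(report, recipients=["security@company.com"])

# Jobs run only while a loop calls schedule.run_pending()
schedule.every().monday.at("09:00").do(weekly_security_review)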

Log Retention Strategy

# Configure appropriate retention periods
retention_config = {
    "api_requests": "90d",     # Standard business records
    "security_events": "2y",   # Security incidents
    "authentication": "1y",    # Access records
    "configuration": "7y"      # Compliance requirements
}

client.audit.configure_retention(retention_config)
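
The retention values above use a compact duration syntax with a number followed by `d` or `y`. To sanity-check a configuration before sending it, the strings can be converted to `timedelta` values. `parse_retention` is a hypothetical helper, and treating one year as 365 days is an assumption; the service may count calendar years differently.

```python
import re
from datetime import timedelta

# Assumption: "y" means 365 days; only the "d" and "y" suffixes appear above.
UNIT_DAYS = {"d": 1, "y": 365}

def parse_retention(value: str) -> timedelta:
    """Convert a string such as "90d" or "2y" into a timedelta."""
    match = re.fullmatch(r"(\d+)([dy])", value)
    if match is None:
        raise ValueError(f"Unrecognized retention period: {value!r}")
    amount, unit = match.groups()
    return timedelta(days=int(amount) * UNIT_DAYS[unit])

retention_config = {
    "api_requests": "90d",
    "security_events": "2y",
    "authentication": "1y",
    "configuration": "7y",
}
for name, period in retention_config.items():
    print(f"{name}: {parse_retention(period).days} days")
```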

Performance Monitoring

from datetime import datetime, timedelta

# Monitor API performance trends.
# send_performance_alert is a placeholder for your own alerting function.
def analyze_performance():
    logs = client.audit.get_logs(
        start_date=datetime.now() - timedelta(days=30),
        event_types=["api_request"]
    )

    # Calculate metrics
    response_times = [log.response.response_time_ms for log in logs]
    if not response_times:
        return  # No requests in the window; avoid dividing by zero

    avg_response_time = sum(response_times) / len(response_times)

    # Alert on degradation
    if avg_response_time > 1000:  # 1 second threshold
        send_performance_alert(f"Average response time: {avg_response_time:.0f}ms")

For advanced audit log configuration, contact enterprise@usecortex.co