Prerequisites: Understanding of Core Concepts and Architecture Overview
Overview
RunAgent is designed with security as a fundamental principle. This guide covers RunAgent’s security model, best practices, and how to secure your agents and data.Security Architecture
1. Multi-Layer Security Model
2. Security Boundaries
- Network Boundary: API Gateway with authentication and rate limiting
- Process Boundary: Firecracker microVMs with complete isolation
- Data Boundary: Encrypted data transmission and storage
- Code Boundary: Sandboxed execution environment
Authentication and Authorization
1. API Key Management
API Key Structure
import hashlib
import hmac
import time
from typing import Optional
class APIKeyManager:
def __init__(self, secret_key: str):
self.secret_key = secret_key
def generate_key(self, user_id: str, permissions: List[str]) -> str:
"""Generate API key with permissions"""
timestamp = int(time.time())
message = f"{user_id}:{':'.join(permissions)}:{timestamp}"
signature = hmac.new(
self.secret_key.encode(),
message.encode(),
hashlib.sha256
).hexdigest()
return f"{user_id}:{signature}:{timestamp}"
def validate_key(self, api_key: str) -> Optional[Dict[str, Any]]:
"""Validate API key and return user info"""
try:
user_id, signature, timestamp = api_key.split(':')
# Check if key is expired (24 hours)
if time.time() - int(timestamp) > 86400:
return None
# Validate signature
expected_message = f"{user_id}:{timestamp}"
expected_signature = hmac.new(
self.secret_key.encode(),
expected_message.encode(),
hashlib.sha256
).hexdigest()
if hmac.compare_digest(signature, expected_signature):
return {
'user_id': user_id,
'permissions': ['read', 'write'], # Default permissions
'expires_at': int(timestamp) + 86400
}
return None
except:
return None
Role-Based Access Control
from enum import Enum
from typing import List, Dict, Any
class Permission(Enum):
READ = "read"
WRITE = "write"
ADMIN = "admin"
STREAM = "stream"
class Role:
def __init__(self, name: str, permissions: List[Permission]):
self.name = name
self.permissions = permissions
def has_permission(self, permission: Permission) -> bool:
return permission in self.permissions
class User:
def __init__(self, user_id: str, role: Role):
self.user_id = user_id
self.role = role
def can_access(self, permission: Permission) -> bool:
return self.role.has_permission(permission)
# Define roles
ROLES = {
'viewer': Role('viewer', [Permission.READ]),
'user': Role('user', [Permission.READ, Permission.WRITE]),
'admin': Role('admin', [Permission.READ, Permission.WRITE, Permission.ADMIN]),
'streamer': Role('streamer', [Permission.READ, Permission.STREAM])
}
2. OAuth 2.0 Integration
OAuth Provider Integration
import requests
from typing import Optional, Dict, Any
class OAuthProvider:
def __init__(self, client_id: str, client_secret: str, provider_url: str):
self.client_id = client_id
self.client_secret = client_secret
self.provider_url = provider_url
def validate_token(self, access_token: str) -> Optional[Dict[str, Any]]:
"""Validate OAuth access token"""
try:
response = requests.get(
f"{self.provider_url}/userinfo",
headers={'Authorization': f'Bearer {access_token}'}
)
if response.status_code == 200:
user_info = response.json()
return {
'user_id': user_info.get('sub'),
'email': user_info.get('email'),
'name': user_info.get('name'),
'permissions': self._get_permissions(user_info)
}
return None
except:
return None
def _get_permissions(self, user_info: Dict[str, Any]) -> List[str]:
"""Extract permissions from user info"""
# Implementation depends on OAuth provider
return ['read', 'write']
Input Validation and Sanitization
1. Comprehensive Input Validation
import re
import json
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
@dataclass
class ValidationRule:
field: str
required: bool = False
max_length: Optional[int] = None
pattern: Optional[str] = None
data_type: type = str
class InputValidator:
def __init__(self, rules: List[ValidationRule]):
self.rules = {rule.field: rule for rule in rules}
def validate(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Validate input data against rules"""
errors = []
sanitized_data = {}
for field, rule in self.rules.items():
value = data.get(field)
# Check required fields
if rule.required and value is None:
errors.append(f"Required field '{field}' is missing")
continue
if value is None:
continue
# Check data type
if not isinstance(value, rule.data_type):
errors.append(f"Field '{field}' must be of type {rule.data_type.__name__}")
continue
# Check max length
if rule.max_length and len(str(value)) > rule.max_length:
errors.append(f"Field '{field}' exceeds maximum length of {rule.max_length}")
continue
# Check pattern
if rule.pattern and not re.match(rule.pattern, str(value)):
errors.append(f"Field '{field}' does not match required pattern")
continue
# Sanitize value
sanitized_value = self._sanitize_value(value, rule.data_type)
sanitized_data[field] = sanitized_value
if errors:
return {
'valid': False,
'errors': errors,
'data': None
}
return {
'valid': True,
'errors': [],
'data': sanitized_data
}
def _sanitize_value(self, value: Any, data_type: type) -> Any:
"""Sanitize value based on data type"""
if data_type == str:
# Remove potentially dangerous characters
return re.sub(r'[<>"\']', '', str(value))
elif data_type == int:
return int(value)
elif data_type == float:
return float(value)
elif data_type == bool:
return bool(value)
else:
return value
# Define validation rules
VALIDATION_RULES = [
ValidationRule('message', required=True, max_length=10000),
ValidationRule('user_id', required=True, pattern=r'^[a-zA-Z0-9_-]+$'),
ValidationRule('context', required=False, data_type=dict),
ValidationRule('stream', required=False, data_type=bool)
]
2. SQL Injection Prevention
import sqlite3
from typing import List, Dict, Any
class SecureDatabase:
def __init__(self, db_path: str):
self.db_path = db_path
def execute_query(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
"""Execute query with parameterized statements"""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
def get_user_data(self, user_id: str) -> List[Dict[str, Any]]:
"""Get user data safely"""
# Use parameterized query to prevent SQL injection
query = "SELECT * FROM users WHERE user_id = ?"
return self.execute_query(query, (user_id,))
3. XSS Prevention
import html
import re
from typing import str
class XSSPrevention:
@staticmethod
def sanitize_html(text: str) -> str:
"""Sanitize HTML to prevent XSS"""
# Escape HTML characters
sanitized = html.escape(text)
# Remove script tags
sanitized = re.sub(r'<script.*?</script>', '', sanitized, flags=re.DOTALL)
# Remove javascript: URLs
sanitized = re.sub(r'javascript:', '', sanitized, flags=re.IGNORECASE)
return sanitized
@staticmethod
def validate_url(url: str) -> bool:
"""Validate URL to prevent malicious redirects"""
# Only allow http and https protocols
if not re.match(r'^https?://', url):
return False
# Check for suspicious patterns
suspicious_patterns = [
r'javascript:',
r'data:',
r'vbscript:',
r'file:'
]
for pattern in suspicious_patterns:
if re.search(pattern, url, re.IGNORECASE):
return False
return True
Data Encryption
1. Data in Transit
TLS/SSL Configuration
import ssl
from typing import Dict, Any
class TLSConfig:
def __init__(self):
self.ssl_context = ssl.create_default_context()
self.ssl_context.check_hostname = True
self.ssl_context.verify_mode = ssl.CERT_REQUIRED
def get_secure_context(self) -> ssl.SSLContext:
"""Get secure SSL context"""
return self.ssl_context
def configure_ciphers(self, ciphers: List[str]):
"""Configure allowed ciphers"""
self.ssl_context.set_ciphers(':'.join(ciphers))
def set_certificate(self, cert_file: str, key_file: str):
"""Set certificate and key files"""
self.ssl_context.load_cert_chain(cert_file, key_file)
2. Data at Rest
Encryption for Sensitive Data
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os
class DataEncryption:
def __init__(self, password: str):
self.key = self._derive_key(password)
self.cipher = Fernet(self.key)
def _derive_key(self, password: str) -> bytes:
"""Derive encryption key from password"""
salt = os.urandom(16)
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
return key
def encrypt(self, data: str) -> str:
"""Encrypt sensitive data"""
encrypted = self.cipher.encrypt(data.encode())
return base64.b64encode(encrypted).decode()
def decrypt(self, encrypted_data: str) -> str:
"""Decrypt sensitive data"""
encrypted = base64.b64decode(encrypted_data.encode())
return self.cipher.decrypt(encrypted).decode()
Sandboxing and Isolation
1. Firecracker Security Model
Resource Limits
from typing import Dict, Any
class ResourceLimits:
def __init__(self):
self.limits = {
'cpu_quota': 100000, # CPU quota in microseconds
'memory_limit': 512 * 1024 * 1024, # 512MB memory limit
'disk_limit': 1024 * 1024 * 1024, # 1GB disk limit
'network_bandwidth': 10 * 1024 * 1024, # 10MB/s network limit
'max_processes': 100,
'max_file_descriptors': 1024
}
def get_limits(self) -> Dict[str, Any]:
"""Get resource limits for agent"""
return self.limits
def apply_limits(self, agent_id: str):
"""Apply resource limits to agent"""
# Implementation would apply limits to Firecracker VM
pass
Network Isolation
class NetworkIsolation:
def __init__(self):
self.allowed_hosts = [
'api.openai.com',
'api.anthropic.com',
'api.cohere.ai'
]
self.blocked_ports = [22, 23, 25, 53, 80, 443, 993, 995]
def is_allowed_host(self, host: str) -> bool:
"""Check if host is allowed"""
return host in self.allowed_hosts
def is_allowed_port(self, port: int) -> bool:
"""Check if port is allowed"""
return port not in self.blocked_ports
def create_network_policy(self, agent_id: str) -> Dict[str, Any]:
"""Create network policy for agent"""
return {
'agent_id': agent_id,
'allowed_hosts': self.allowed_hosts,
'blocked_ports': self.blocked_ports,
'max_connections': 10
}
2. Process Isolation
Sandbox Configuration
import subprocess
from typing import List, Dict, Any
class SandboxConfig:
def __init__(self):
self.restrictions = {
'no_network': False,
'no_file_system': False,
'no_process_creation': True,
'no_system_calls': True,
'read_only_root': True
}
def create_sandbox(self, agent_id: str) -> Dict[str, Any]:
"""Create sandbox configuration for agent"""
return {
'agent_id': agent_id,
'restrictions': self.restrictions,
'working_directory': f'/tmp/agent_{agent_id}',
'environment_variables': {
'PATH': '/usr/local/bin:/usr/bin:/bin',
'HOME': f'/tmp/agent_{agent_id}'
}
}
def execute_in_sandbox(self, command: List[str], sandbox_config: Dict[str, Any]) -> subprocess.CompletedProcess:
"""Execute command in sandbox"""
# Implementation would use Firecracker or similar
pass
Rate Limiting and DDoS Protection
1. Rate Limiting Implementation
import time
from typing import Dict, Any
from collections import defaultdict, deque
class RateLimiter:
def __init__(self, max_requests: int = 100, window_seconds: int = 60):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = defaultdict(deque)
def is_allowed(self, client_id: str) -> bool:
"""Check if client is within rate limit"""
now = time.time()
client_requests = self.requests[client_id]
# Remove old requests outside window
while client_requests and client_requests[0] <= now - self.window_seconds:
client_requests.popleft()
# Check if under limit
if len(client_requests) < self.max_requests:
client_requests.append(now)
return True
return False
def get_remaining_requests(self, client_id: str) -> int:
"""Get remaining requests for client"""
now = time.time()
client_requests = self.requests[client_id]
# Remove old requests
while client_requests and client_requests[0] <= now - self.window_seconds:
client_requests.popleft()
return max(0, self.max_requests - len(client_requests))
2. DDoS Protection
class DDoSProtection:
def __init__(self):
self.suspicious_ips = set()
self.blocked_ips = set()
self.request_counts = defaultdict(int)
self.last_reset = time.time()
def check_request(self, ip: str, user_agent: str) -> bool:
"""Check if request should be allowed"""
now = time.time()
# Reset counters every minute
if now - self.last_reset > 60:
self.request_counts.clear()
self.last_reset = now
# Check if IP is blocked
if ip in self.blocked_ips:
return False
# Count requests from this IP
self.request_counts[ip] += 1
# Check for suspicious patterns
if self._is_suspicious(ip, user_agent):
self.suspicious_ips.add(ip)
return False
# Block IP if too many requests
if self.request_counts[ip] > 1000: # 1000 requests per minute
self.blocked_ips.add(ip)
return False
return True
def _is_suspicious(self, ip: str, user_agent: str) -> bool:
"""Check for suspicious request patterns"""
# Check for common bot patterns
bot_patterns = [
'python-requests',
'curl',
'wget',
'bot',
'crawler'
]
for pattern in bot_patterns:
if pattern.lower() in user_agent.lower():
return True
return False
Security Monitoring
1. Security Event Logging
import logging
import json
from datetime import datetime
from typing import Dict, Any
class SecurityLogger:
def __init__(self):
self.logger = logging.getLogger('security')
self.logger.setLevel(logging.INFO)
# Configure security log handler
handler = logging.FileHandler('security.log')
formatter = logging.Formatter('%(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_auth_failure(self, ip: str, user_id: str, reason: str):
"""Log authentication failure"""
self.logger.warning(json.dumps({
'timestamp': datetime.utcnow().isoformat(),
'event': 'auth_failure',
'ip': ip,
'user_id': user_id,
'reason': reason
}))
def log_rate_limit_exceeded(self, ip: str, user_id: str, limit: int):
"""Log rate limit exceeded"""
self.logger.warning(json.dumps({
'timestamp': datetime.utcnow().isoformat(),
'event': 'rate_limit_exceeded',
'ip': ip,
'user_id': user_id,
'limit': limit
}))
def log_suspicious_activity(self, ip: str, activity: str, details: Dict[str, Any]):
"""Log suspicious activity"""
self.logger.error(json.dumps({
'timestamp': datetime.utcnow().isoformat(),
'event': 'suspicious_activity',
'ip': ip,
'activity': activity,
'details': details
}))
2. Security Alerts
class SecurityAlertManager:
def __init__(self):
self.alert_thresholds = {
'auth_failures': 10, # 10 failures per minute
'rate_limit_exceeded': 5, # 5 rate limit violations per minute
'suspicious_activity': 3 # 3 suspicious activities per minute
}
self.alert_counts = defaultdict(int)
self.last_reset = time.time()
def check_alerts(self, event_type: str, ip: str):
"""Check if alert should be triggered"""
now = time.time()
# Reset counters every minute
if now - self.last_reset > 60:
self.alert_counts.clear()
self.last_reset = now
# Count events
key = f"{event_type}:{ip}"
self.alert_counts[key] += 1
# Check threshold
if self.alert_counts[key] >= self.alert_thresholds.get(event_type, 0):
self._send_alert(event_type, ip, self.alert_counts[key])
def _send_alert(self, event_type: str, ip: str, count: int):
"""Send security alert"""
# Implementation would send alert via email, Slack, etc.
print(f"SECURITY ALERT: {event_type} from {ip} ({count} occurrences)")
Compliance and Auditing
1. Audit Trail
class AuditTrail:
def __init__(self):
self.audit_log = []
def log_action(self, user_id: str, action: str, resource: str, details: Dict[str, Any]):
"""Log user action for auditing"""
audit_entry = {
'timestamp': datetime.utcnow().isoformat(),
'user_id': user_id,
'action': action,
'resource': resource,
'details': details,
'ip_address': self._get_client_ip(),
'user_agent': self._get_user_agent()
}
self.audit_log.append(audit_entry)
def get_audit_log(self, user_id: str = None, start_date: str = None, end_date: str = None) -> List[Dict[str, Any]]:
"""Get audit log with optional filters"""
filtered_log = self.audit_log
if user_id:
filtered_log = [entry for entry in filtered_log if entry['user_id'] == user_id]
if start_date:
filtered_log = [entry for entry in filtered_log if entry['timestamp'] >= start_date]
if end_date:
filtered_log = [entry for entry in filtered_log if entry['timestamp'] <= end_date]
return filtered_log
2. Data Retention Policy
class DataRetentionManager:
def __init__(self, retention_days: int = 90):
self.retention_days = retention_days
def should_retain(self, timestamp: str) -> bool:
"""Check if data should be retained"""
data_time = datetime.fromisoformat(timestamp)
cutoff_time = datetime.utcnow() - timedelta(days=self.retention_days)
return data_time > cutoff_time
def cleanup_old_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Remove old data based on retention policy"""
return [entry for entry in data if self.should_retain(entry['timestamp'])]
Security Best Practices
1. Agent Development
# Secure agent development practices
def secure_agent(data: Dict[str, Any]) -> Dict[str, Any]:
"""Example of secure agent implementation"""
# 1. Validate input
validator = InputValidator(VALIDATION_RULES)
validation_result = validator.validate(data)
if not validation_result['valid']:
return {
'error': 'Invalid input',
'details': validation_result['errors']
}
# 2. Sanitize data
sanitized_data = validation_result['data']
# 3. Process data securely
try:
result = process_data_securely(sanitized_data)
# 4. Log action
audit_trail.log_action(
user_id=sanitized_data['user_id'],
action='agent_execution',
resource='data_processing',
details={'input_size': len(sanitized_data['message'])}
)
return {
'result': result,
'status': 'success'
}
except Exception as e:
# 5. Log error securely
security_logger.log_suspicious_activity(
ip=get_client_ip(),
activity='agent_error',
details={'error': str(e)}
)
return {
'error': 'Processing failed',
'status': 'error'
}
2. Configuration Security
# Secure configuration management
class SecureConfig:
def __init__(self):
self.config = self._load_secure_config()
def _load_secure_config(self) -> Dict[str, Any]:
"""Load configuration securely"""
return {
'api_key': os.getenv('RUNAGENT_API_KEY'),
'database_url': os.getenv('DATABASE_URL'),
'encryption_key': os.getenv('ENCRYPTION_KEY'),
'jwt_secret': os.getenv('JWT_SECRET')
}
def get_config(self, key: str) -> str:
"""Get configuration value securely"""
if key not in self.config:
raise ValueError(f"Configuration key '{key}' not found")
return self.config[key]
Next Steps
Production Considerations
Learn about production security requirements
Architecture Overview
Understand RunAgent’s security architecture
Advanced Tasks
Learn advanced security patterns
Compliance
Understand compliance requirements
🎉 Excellent! You now understand RunAgent’s security model and best practices. These security measures help protect your agents, data, and users from various threats!
Still have a question?
- Join our Discord Community
- Email us: [email protected]
- Follow us on X
- New here? Sign up