Prerequisites: Understanding of Core Concepts and Architecture Overview
Overview
Deploying RunAgent agents to production requires careful consideration of performance, reliability, security, and scalability. This guide covers the essential aspects of production deployment.

Performance Optimization
1. Agent Performance
Efficient Entrypoints
Copy
# Good: Efficient, focused function
def process_data(data: List[Dict]) -> Dict[str, Any]:
    """Map each item's 'id' to twice its 'value'.

    Args:
        data: items, each a dict with 'id' and 'value' keys.

    Returns:
        Dict mapping item id -> item['value'] * 2.
    """
    return {item['id']: item['value'] * 2 for item in data}
# Bad: Inefficient, does too much
def process_data_slow(data: List[Dict]) -> Dict[str, Any]:
    """Inefficient processing — intentionally bad counter-example.

    Sleeps per item and makes a throwaway copy; kept as-is so readers
    can contrast it with process_data above.
    """
    result = {}
    for item in data:
        # Unnecessary sleep
        time.sleep(0.1)
        # Redundant operations
        temp = item.copy()
        temp['processed'] = True
        result[item['id']] = temp['value'] * 2
    return result
Caching Strategies
Copy
from functools import lru_cache
import hashlib
@lru_cache(maxsize=1000)
def expensive_computation(data_hash: str) -> str:
    """Cache expensive computations keyed by the input's hash.

    The lru_cache bound (1000 entries) keeps memory use capped even for
    an unbounded stream of distinct inputs.
    """
    # Expensive computation here
    return f"result_for_{data_hash}"


def process_with_cache(data: str) -> str:
    """Process data with caching.

    Hashes the payload so the cache key stays small regardless of input
    size. NOTE(review): md5 is fine as a cache key but must not be used
    for anything security-sensitive.
    """
    data_hash = hashlib.md5(data.encode()).hexdigest()
    return expensive_computation(data_hash)
Memory Management
Copy
import gc
from typing import Iterator
def process_large_dataset(data: Iterator[Dict]) -> Iterator[Dict]:
    """Process a large dataset as a stream instead of materializing a list.

    Relies on process_item() being defined elsewhere on this page.
    Yields one processed item at a time so memory stays bounded.
    """
    import random  # used below; the snippet's import block omitted it
    for item in data:
        # Process item
        result = process_item(item)
        yield result
        # Periodically force a GC pass to bound memory growth
        # (~10% of items, chosen at random)
        if random.random() < 0.1:
            gc.collect()
2. Network Performance
Connection Pooling
Copy
from runagent import RunAgentClient
import asyncio
class OptimizedAgentClient:
    """Pool of RunAgentClient connections to amortize connection setup."""

    def __init__(self, agent_id: str, pool_size: int = 10):
        self.pool = asyncio.Queue(maxsize=pool_size)
        self.agent_id = agent_id
        self._initialized = False
        # NOTE(review): the original called the async _initialize_pool()
        # here WITHOUT awaiting it, so the coroutine never ran, the pool
        # stayed empty, and get_client() blocked forever. Initialization
        # is now performed lazily on first use.

    async def _initialize_pool(self):
        """Fill the pool with ready clients (idempotent)."""
        if self._initialized:
            return
        self._initialized = True
        for _ in range(self.pool.maxsize):
            client = RunAgentClient(
                agent_id=self.agent_id,
                entrypoint_tag="main",
                local=False
            )
            await self.pool.put(client)

    async def get_client(self):
        """Borrow a client from the pool, initializing it on first use."""
        await self._initialize_pool()
        return await self.pool.get()

    async def return_client(self, client):
        """Return a borrowed client to the pool."""
        await self.pool.put(client)
Request Batching
Copy
async def batch_requests(requests: List[Dict]) -> List[Dict]:
    """Send many requests as one batched call to the agent.

    Args:
        requests: individual request payloads to combine.

    Returns:
        The list of per-request responses from the 'batch' entrypoint.
    """
    client = RunAgentClient(agent_id="your_agent", entrypoint_tag="batch")
    # One round trip for the whole batch instead of N separate calls
    result = await client.run(requests=requests)
    return result['responses']
3. Database Optimization
Connection Management
Copy
import asyncpg
from contextlib import asynccontextmanager
class DatabaseManager:
    """Owns an asyncpg connection pool and hands out pooled connections."""

    def __init__(self, connection_string: str, pool_size: int = 20):
        self.connection_string = connection_string
        self.pool_size = pool_size  # becomes the pool's max_size
        self.pool = None            # created lazily in initialize()

    async def initialize(self):
        """Create the connection pool; await once before get_connection()."""
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=5,
            max_size=self.pool_size
        )

    @asynccontextmanager
    async def get_connection(self):
        """Async context manager yielding a pooled connection.

        NOTE(review): assumes initialize() has been awaited first;
        otherwise self.pool is None and acquire() will fail.
        """
        async with self.pool.acquire() as connection:
            yield connection
Reliability and Fault Tolerance
1. Error Handling
Comprehensive Error Handling
Copy
import logging
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
def robust_agent(data: Dict[str, Any]) -> Dict[str, Any]:
    """Agent entrypoint with comprehensive error handling.

    Returns:
        A dict with 'status' ('success' or 'error') and either 'result'
        or 'error'. Never raises to the caller.
    """
    try:
        # Validate input: a non-empty 'message' field is required
        if not data.get('message'):
            return {
                'error': 'Missing required field: message',
                'status': 'error'
            }
        # process_data is defined elsewhere on this page
        result = process_data(data)
        return {
            'result': result,
            'status': 'success'
        }
    except ValueError as e:
        # Expected validation failures: safe to echo the detail back
        logger.error(f"Validation error: {e}")
        return {
            'error': f'Validation error: {str(e)}',
            'status': 'error'
        }
    except Exception as e:
        # Catch-all boundary: log the detail, return a generic message
        # so internals are not leaked to the caller
        logger.error(f"Unexpected error: {e}")
        return {
            'error': 'Internal server error',
            'status': 'error'
        }
Retry Logic
Copy
import asyncio
import logging
from typing import Callable, Any

# The original snippet referenced `logger` without ever defining it
logger = logging.getLogger(__name__)


async def retry_with_backoff(
    func: Callable,
    max_retries: int = 3,
    base_delay: float = 1.0
) -> Any:
    """Retry an async callable with exponential backoff.

    Args:
        func: zero-argument callable returning an awaitable.
        max_retries: total number of attempts before giving up.
        base_delay: delay before the first retry; doubles each attempt.

    Returns:
        The first successful result of func().

    Raises:
        Whatever func() raised on the final attempt.
    """
    for attempt in range(max_retries):
        try:
            return await func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)  # 1x, 2x, 4x, ...
            logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s")
            await asyncio.sleep(delay)
2. Health Checks
Agent Health Monitoring
Copy
import time
from typing import Dict, Any
class HealthMonitor:
    """Tracks request/error counts and derives a health verdict."""

    def __init__(self):
        self.start_time = time.time()  # for uptime reporting
        self.request_count = 0
        self.error_count = 0

    def health_check(self) -> Dict[str, Any]:
        """Snapshot health: 'healthy' while the error rate stays below 10%."""
        uptime = time.time() - self.start_time
        # max(..., 1) avoids division by zero before any requests arrive
        error_rate = self.error_count / max(self.request_count, 1)
        return {
            'status': 'healthy' if error_rate < 0.1 else 'unhealthy',
            'uptime': uptime,
            'request_count': self.request_count,
            'error_count': self.error_count,
            'error_rate': error_rate,
            'timestamp': time.time()
        }

    def record_request(self, success: bool):
        """Count a completed request (and the error, if it failed)."""
        self.request_count += 1
        if not success:
            self.error_count += 1
3. Circuit Breaker Pattern
Copy
import asyncio
import time  # used by CircuitBreaker; missing from the original snippet
from enum import Enum
from typing import Callable, Any


class CircuitState(Enum):
    """Lifecycle states of the circuit breaker."""
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    """Stops calling a failing dependency until a cool-down elapses.

    After `failure_threshold` consecutive failures the breaker opens and
    rejects calls immediately; once `timeout` seconds have passed, one
    probe call is allowed (half-open) and a success closes the breaker.
    """

    def __init__(self, failure_threshold: int = 5, timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.timeout = timeout          # seconds the breaker stays open
        self.failure_count = 0          # consecutive failures so far
        self.last_failure_time = None   # time.time() of the last failure
        self.state = CircuitState.CLOSED

    async def call(self, func: Callable) -> Any:
        """Invoke the awaitable-returning `func` under breaker protection.

        Raises:
            Exception: "Circuit breaker is open" during cool-down, or
                whatever `func` itself raises.
        """
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is open")
        try:
            result = await func()
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        """Reset after a successful call."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Count a failure and open the breaker at the threshold."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
Security Considerations
1. Input Validation
Comprehensive Input Validation
Copy
from typing import Dict, Any, List
import re
def validate_input(data: Dict[str, Any]) -> Dict[str, Any]:
    """Validate and sanitize input data.

    Returns:
        {'valid': True, 'data': data} when everything checks out,
        otherwise {'valid': False, 'errors': [...]} listing every
        problem found (all checks run; errors accumulate).
    """
    errors = []
    # 'message' is required
    if 'message' not in data:
        errors.append("Missing required field: message")
    # Message must be a bounded, plain-text string
    if 'message' in data:
        message = data['message']
        if not isinstance(message, str):
            errors.append("Message must be a string")
        elif len(message) > 10000:
            errors.append("Message too long")
        elif not re.match(r'^[a-zA-Z0-9\s.,!?]+$', message):
            # Whitelist letters, digits, whitespace and basic punctuation;
            # note this also rejects the empty string
            errors.append("Message contains invalid characters")
    # user_id is optional but must be slug-like when present
    if 'user_id' in data:
        user_id = data['user_id']
        if not isinstance(user_id, str):
            errors.append("User ID must be a string")
        elif not re.match(r'^[a-zA-Z0-9_-]+$', user_id):
            errors.append("User ID contains invalid characters")
    if errors:
        return {'valid': False, 'errors': errors}
    return {'valid': True, 'data': data}
2. Authentication and Authorization
API Key Management
Copy
import hashlib
import hmac
from typing import Optional
class APIKeyManager:
    """Issues and verifies HMAC-signed API keys of the form 'user_id:signature'."""

    def __init__(self, secret_key: str):
        self.secret_key = secret_key

    def _sign(self, user_id: str) -> str:
        """HMAC-SHA256 signature binding user_id to our secret."""
        return hmac.new(
            self.secret_key.encode(),
            f"user:{user_id}".encode(),
            hashlib.sha256
        ).hexdigest()

    def generate_key(self, user_id: str) -> str:
        """Generate a verifiable API key for user.

        NOTE(review): the original signed a message containing the issue
        timestamp but never embedded that timestamp in the key, so
        validate_key() could not possibly recompute the signature (its
        body was a stub). The timestamp is dropped so keys are actually
        verifiable.
        """
        return f"{user_id}:{self._sign(user_id)}"

    def validate_key(self, api_key: str) -> Optional[str]:
        """Validate an API key; return the user ID, or None if invalid."""
        try:
            user_id, signature = api_key.split(':', 1)
        except ValueError:  # no ':' separator — malformed key
            return None
        # compare_digest avoids timing side channels on the signature check
        if hmac.compare_digest(signature, self._sign(user_id)):
            return user_id
        return None
3. Data Protection
Data Encryption
Copy
from cryptography.fernet import Fernet
import base64
class DataEncryption:
    """Symmetric encryption of sensitive strings using Fernet."""

    def __init__(self, key: bytes):
        # key must be a valid Fernet key (e.g. Fernet.generate_key())
        self.cipher = Fernet(key)

    def encrypt(self, data: str) -> str:
        """Encrypt data and return it as base64 text.

        NOTE(review): Fernet tokens are already urlsafe-base64 text, so
        the extra b64encode layer is redundant — kept so previously
        stored ciphertexts still round-trip through decrypt().
        """
        encrypted = self.cipher.encrypt(data.encode())
        return base64.b64encode(encrypted).decode()

    def decrypt(self, encrypted_data: str) -> str:
        """Reverse encrypt(): base64-decode, then Fernet-decrypt."""
        encrypted = base64.b64decode(encrypted_data.encode())
        return self.cipher.decrypt(encrypted).decode()
Monitoring and Observability
1. Logging
Structured Logging
Copy
import logging
import json
from datetime import datetime
class StructuredLogger:
    """Emits one JSON object per log line for machine-readable logs."""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        # Guard against duplicate handlers: the original added a new
        # StreamHandler on every instantiation of the same logger name,
        # duplicating each log line.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            # The message is already JSON, so pass it through untouched
            formatter = logging.Formatter('%(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def log_request(self, request_id: str, user_id: str, message: str):
        """Log an incoming request (records message length, not content)."""
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
        # kept to preserve the exact timestamp format emitted here.
        self.logger.info(json.dumps({
            'timestamp': datetime.utcnow().isoformat(),
            'level': 'INFO',
            'event': 'request',
            'request_id': request_id,
            'user_id': user_id,
            'message_length': len(message)
        }))

    def log_response(self, request_id: str, status: str, duration: float):
        """Log a completed response; duration is seconds, logged as ms."""
        self.logger.info(json.dumps({
            'timestamp': datetime.utcnow().isoformat(),
            'level': 'INFO',
            'event': 'response',
            'request_id': request_id,
            'status': status,
            'duration_ms': duration * 1000
        }))
2. Metrics Collection
Custom Metrics
Copy
import time
from typing import Dict, Any
class MetricsCollector:
    """Accumulates request counters and derives rates/averages on demand."""

    def __init__(self):
        self.metrics = {
            'request_count': 0,
            'error_count': 0,
            'total_duration': 0.0,       # seconds, summed across requests
            'last_request_time': None    # time.time() of the latest request
        }

    def record_request(self, duration: float, success: bool):
        """Record one finished request and its wall-clock duration."""
        self.metrics['request_count'] += 1
        self.metrics['total_duration'] += duration
        self.metrics['last_request_time'] = time.time()
        if not success:
            self.metrics['error_count'] += 1

    def get_metrics(self) -> Dict[str, Any]:
        """Return raw counters plus derived error_rate and average_duration.

        max(count, 1) guards both divisions before any request is recorded.
        """
        avg_duration = (
            self.metrics['total_duration'] /
            max(self.metrics['request_count'], 1)
        )
        return {
            'request_count': self.metrics['request_count'],
            'error_count': self.metrics['error_count'],
            'error_rate': (
                self.metrics['error_count'] /
                max(self.metrics['request_count'], 1)
            ),
            'average_duration': avg_duration,
            'last_request_time': self.metrics['last_request_time']
        }
3. Alerting
Alert Configuration
Copy
class AlertManager:
    """Fires threshold-based alerts, de-duplicated in 5-minute buckets."""

    def __init__(self, alert_thresholds: Dict[str, float]):
        # Expected threshold keys: 'error_rate', 'response_time'
        self.thresholds = alert_thresholds
        self.alerts_sent = set()  # bucketed alert keys already fired
        self.last_alert = None    # (alert_type, metrics) of the latest alert

    def check_metrics(self, metrics: Dict[str, Any]):
        """Compare metrics against thresholds and alert on each breach."""
        # Check error rate
        if metrics['error_rate'] > self.thresholds['error_rate']:
            self._send_alert('high_error_rate', metrics)
        # Check response time
        if metrics['average_duration'] > self.thresholds['response_time']:
            self._send_alert('slow_response', metrics)

    def _send_alert(self, alert_type: str, metrics: Dict[str, Any]):
        """Send at most one alert of each type per 5-minute window."""
        import time  # the original snippet used time without importing it
        alert_key = f"{alert_type}_{int(time.time() / 300)}"  # 5-minute buckets
        if alert_key not in self.alerts_sent:
            self._send_notification(alert_type, metrics)
            self.alerts_sent.add(alert_key)

    def _send_notification(self, alert_type: str, metrics: Dict[str, Any]):
        """Delivery hook (email, Slack, ...) — override with a real
        integration. NOTE(review): the original called this method but
        never defined it; this default just records the alert."""
        self.last_alert = (alert_type, metrics)
Scalability
1. Horizontal Scaling
Load Balancing
Copy
import random
from typing import List
class LoadBalancer:
    """Distributes work across agent instances (round-robin or random)."""

    def __init__(self, agent_instances: List[str]):
        self.instances = agent_instances
        self.current_index = 0  # next round-robin position

    def get_instance(self) -> str:
        """Round-robin: return the next instance, wrapping at the end."""
        instance = self.instances[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.instances)
        return instance

    def get_random_instance(self) -> str:
        """Return a uniformly random instance."""
        return random.choice(self.instances)
2. Auto-scaling
Scaling Logic
Copy
class AutoScaler:
    """Decides when to add or remove agent instances based on load metrics."""

    def __init__(self, min_instances: int = 1, max_instances: int = 10):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.current_instances = min_instances  # start at the floor

    def should_scale_up(self, metrics: Dict[str, Any]) -> bool:
        """Scale up only when busy AND slow, and below the instance cap."""
        return (
            metrics['request_count'] > 1000 and
            metrics['average_duration'] > 2.0 and
            self.current_instances < self.max_instances
        )

    def should_scale_down(self, metrics: Dict[str, Any]) -> bool:
        """Scale down only when quiet AND fast, and above the floor."""
        return (
            metrics['request_count'] < 100 and
            metrics['average_duration'] < 0.5 and
            self.current_instances > self.min_instances
        )
Deployment Best Practices
1. Environment Configuration
Environment-Specific Settings
Copy
import os
from typing import Dict, Any
class EnvironmentConfig:
    """Selects settings based on the ENVIRONMENT environment variable."""

    def __init__(self):
        # Defaults to 'development' when ENVIRONMENT is unset
        self.env = os.getenv('ENVIRONMENT', 'development')
        self.config = self._load_config()

    def _load_config(self) -> Dict[str, Any]:
        """Return the settings dict for self.env.

        Unknown environment names fall back to development settings.
        """
        configs = {
            'development': {
                'debug': True,
                'log_level': 'DEBUG',
                'max_instances': 1,
                'timeout': 30
            },
            'staging': {
                'debug': False,
                'log_level': 'INFO',
                'max_instances': 3,
                'timeout': 60
            },
            'production': {
                'debug': False,
                'log_level': 'WARNING',
                'max_instances': 10,
                'timeout': 120
            }
        }
        return configs.get(self.env, configs['development'])
2. Health Checks
Comprehensive Health Checks
Copy
async def health_check() -> Dict[str, Any]:
    """Aggregate subsystem checks into one overall health report.

    Relies on check_database / check_cache / check_external_apis /
    check_disk_space / check_memory being defined elsewhere, each
    returning a truthy value when healthy — and assumes `datetime` is
    imported; the snippet omits both. TODO confirm against the full page.
    """
    checks = {
        'database': await check_database(),
        'cache': await check_cache(),
        'external_apis': await check_external_apis(),
        'disk_space': check_disk_space(),
        'memory': check_memory()
    }
    # Healthy only if every individual check passed
    overall_status = 'healthy' if all(checks.values()) else 'unhealthy'
    return {
        'status': overall_status,
        'checks': checks,
        'timestamp': datetime.utcnow().isoformat()
    }
Next Steps
Security
Learn about RunAgent’s security model
Cloud Deployment
Deploy your agents to the cloud
Advanced Tasks
Learn advanced production patterns
Monitoring
Set up comprehensive monitoring
🎉 Great work! You now understand the essential considerations for deploying RunAgent agents to production. These practices will help you build reliable, scalable, and secure AI agent systems!
Still have a question?
- Join our Discord Community
- Email us: [email protected]
- Follow us on X
- New here? Sign up