Skip to main content
Prerequisites: Familiarity with Core Concepts and the Architecture Overview

Overview

Deploying RunAgent agents to production requires careful consideration of performance, reliability, security, and scalability. This guide covers the essential aspects of production deployment.

Performance Optimization

1. Agent Performance

Efficient Entrypoints

# Good: Efficient, focused function
def process_data(data: List[Dict]) -> Dict[str, Any]:
    """Map each record's ``id`` to twice its ``value``.

    Records are read in order, so when two records share an ``id`` the
    later one overwrites the earlier one.
    """
    return {record['id']: record['value'] * 2 for record in data}

# Bad: Inefficient, does too much
def process_data_slow(data: List[Dict]) -> Dict[str, Any]:
    """Inefficient processing

    Deliberate counter-example: it builds a mapping of each item's ``id`` to
    ``value * 2``, but pauses on every item and creates a throwaway copy of
    each record on the way.
    """
    result = {}
    for item in data:
        # Unnecessary sleep: blocks 0.1 s per item and adds nothing to the result
        time.sleep(0.1)
        # Redundant operations: the copy and its 'processed' flag are discarded;
        # only temp['value'] (equal to item['value']) is ever read
        temp = item.copy()
        temp['processed'] = True
        result[item['id']] = temp['value'] * 2
    return result

Caching Strategies

from functools import lru_cache
import hashlib

@lru_cache(maxsize=1000)
def expensive_computation(data_hash: str) -> str:
    """Return the result string for ``data_hash``, memoised by ``lru_cache``.

    Up to 1000 distinct arguments are cached; a repeated call with a cached
    argument returns the stored value without running the body again.
    """
    # Stand-in for the real expensive computation.
    outcome = f"result_for_{data_hash}"
    return outcome

def process_with_cache(data: str) -> str:
    """Run ``expensive_computation`` on the MD5 hex digest of ``data``.

    The digest, not the raw text, is what gets passed on, so identical
    inputs always reach ``expensive_computation`` with the same argument.
    """
    encoded = data.encode()
    digest = hashlib.md5(encoded).hexdigest()
    return expensive_computation(digest)

Memory Management

import gc
from typing import Iterator

def process_large_dataset(data: Iterator[Dict], gc_interval: int = 10) -> Iterator[Dict]:
    """Lazily process a stream of items, yielding one result per input item.

    Items are pulled from ``data`` one at a time and passed to
    ``process_item``, so the whole dataset is never held in memory by this
    function.

    Args:
        data: Iterator of items to process.
        gc_interval: Run ``gc.collect()`` after every ``gc_interval``-th item.
            A value of 0 or less disables the explicit collection.

    Yields:
        The value returned by ``process_item`` for each input item, in order.
    """
    for processed, item in enumerate(data, start=1):
        yield process_item(item)

        # Collect on a fixed cadence rather than at random: the cost is
        # predictable and no extra module is needed for the decision.
        if gc_interval > 0 and processed % gc_interval == 0:
            gc.collect()

2. Network Performance

Connection Pooling

from runagent import RunAgentClient
import asyncio

class OptimizedAgentClient:
    """Fixed-size pool of ``RunAgentClient`` objects held in an ``asyncio.Queue``.

    The pool is filled lazily on the first ``get_client`` call. ``__init__``
    cannot await, so calling the coroutine ``_initialize_pool`` from it would
    only create a coroutine object that never runs, leaving the pool empty.
    """

    def __init__(self, agent_id: str, pool_size: int = 10):
        """Create an empty pool for ``agent_id`` with room for ``pool_size`` clients."""
        self.pool = asyncio.Queue(maxsize=pool_size)
        self.agent_id = agent_id
        # Set once _initialize_pool has started, so the pool is filled only once.
        self._initialized = False

    async def _initialize_pool(self):
        """Initialize connection pool (idempotent: later calls are no-ops)."""
        if self._initialized:
            return
        # Flip the flag before the first await so that a concurrent caller
        # does not start a second fill of an already-full queue.
        self._initialized = True
        for _ in range(self.pool.maxsize):
            client = RunAgentClient(
                agent_id=self.agent_id,
                entrypoint_tag="main",
                local=False
            )
            await self.pool.put(client)

    async def get_client(self):
        """Get client from pool, filling the pool first if that has not happened yet.

        Waits until a client is available when all of them are checked out.
        """
        await self._initialize_pool()
        return await self.pool.get()

    async def return_client(self, client):
        """Return client to pool so another caller can reuse it."""
        await self.pool.put(client)

Request Batching

async def batch_requests(requests: List[Dict]) -> List[Dict]:
    """Send all ``requests`` to the agent in a single call.

    A client is created for the ``"batch"`` entrypoint, the whole list is
    passed as the ``requests`` keyword argument, and the ``'responses'``
    entry of the reply is returned.
    """
    batch_client = RunAgentClient(agent_id="your_agent", entrypoint_tag="batch")

    # One round trip carries every request.
    reply = await batch_client.run(requests=requests)
    return reply['responses']

3. Database Optimization

Connection Management

import asyncpg
from contextlib import asynccontextmanager

class DatabaseManager:
    """Owns an asyncpg connection pool and hands out connections from it.

    ``initialize`` must be awaited once before ``get_connection`` is used.
    """

    def __init__(self, connection_string: str, pool_size: int = 20):
        """Store the settings; no connection is opened until ``initialize``."""
        self.connection_string = connection_string
        self.pool_size = pool_size
        self.pool = None

    async def initialize(self):
        """Initialize connection pool"""
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            # asyncpg rejects min_size > max_size, so clamp the default
            # minimum of 5 for pools configured smaller than that.
            min_size=min(5, self.pool_size),
            max_size=self.pool_size
        )

    @asynccontextmanager
    async def get_connection(self):
        """Get database connection

        Yields a connection acquired from the pool; it is released when the
        ``async with`` block exits.

        Raises:
            RuntimeError: If ``initialize`` has not been awaited yet.
        """
        if self.pool is None:
            raise RuntimeError(
                "DatabaseManager.initialize() must be awaited before get_connection()"
            )
        async with self.pool.acquire() as connection:
            yield connection

Reliability and Fault Tolerance

1. Error Handling

Comprehensive Error Handling

import logging
from typing import Dict, Any, Optional

logger = logging.getLogger(__name__)

def robust_agent(data: Dict[str, Any]) -> Dict[str, Any]:
    """Agent with comprehensive error handling

    Failures are reported in the returned dict instead of being raised.

    Args:
        data: Request payload; must contain a non-empty ``'message'``.

    Returns:
        ``{'result': ..., 'status': 'success'}`` on success, otherwise
        ``{'error': <description>, 'status': 'error'}``.
    """
    try:
        # Validate input
        if not data.get('message'):
            return {
                'error': 'Missing required field: message',
                'status': 'error'
            }

        # Process data
        result = process_data(data)

        return {
            'result': result,
            'status': 'success'
        }

    except ValueError as e:
        logger.error("Validation error: %s", e)
        return {
            'error': f'Validation error: {str(e)}',
            'status': 'error'
        }
    except Exception as e:
        # logger.exception records the traceback, which is the only trace of
        # the failure left once the caller gets the generic message below.
        logger.exception("Unexpected error: %s", e)
        return {
            'error': 'Internal server error',
            'status': 'error'
        }

Retry Logic

import asyncio
from typing import Callable, Any

async def retry_with_backoff(
    func: Callable,
    max_retries: int = 3,
    base_delay: float = 1.0
) -> Any:
    """Retry function with exponential backoff

    Args:
        func: Zero-argument callable returning an awaitable.
        max_retries: Total number of attempts, including the first one.
        base_delay: Delay in seconds before the second attempt; it doubles
            after every further failure.

    Returns:
        The value produced by the first successful ``await func()``.

    Raises:
        ValueError: If ``max_retries`` is less than 1. Without this check the
            loop would not run and ``None`` would be returned without ever
            calling ``func``.
        Exception: Whatever the final attempt raised.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            return await func()
        except Exception as e:
            if attempt == max_retries - 1:
                # Out of attempts: re-raise with the original traceback.
                raise

            delay = base_delay * (2 ** attempt)
            logger.warning(
                "Attempt %s failed: %s. Retrying in %ss", attempt + 1, e, delay
            )
            await asyncio.sleep(delay)

2. Health Checks

Agent Health Monitoring

import time
from typing import Dict, Any

class HealthMonitor:
    """Counts requests and errors and derives a health status from them."""

    def __init__(self):
        self.start_time = time.time()
        self.request_count = self.error_count = 0

    def health_check(self) -> Dict[str, Any]:
        """Return a snapshot of uptime, counters, error rate and status.

        The status is ``'healthy'`` while the error rate is below 0.1 and
        ``'unhealthy'`` otherwise. With no requests recorded the rate is 0.
        """
        uptime = time.time() - self.start_time
        # Divide by at least 1 so an idle monitor does not divide by zero.
        denominator = max(self.request_count, 1)
        error_rate = self.error_count / denominator

        if error_rate < 0.1:
            status = 'healthy'
        else:
            status = 'unhealthy'

        report = {
            'status': status,
            'uptime': uptime,
            'request_count': self.request_count,
            'error_count': self.error_count,
            'error_rate': error_rate,
            'timestamp': time.time()
        }
        return report

    def record_request(self, success: bool):
        """Count one request, and one error when ``success`` is falsy."""
        self.request_count += 1
        if success:
            return
        self.error_count += 1

3. Circuit Breaker Pattern

import asyncio
from enum import Enum
from typing import Callable, Any

class CircuitState(Enum):
    """The three states a circuit breaker can be in."""
    # Calls are passed through.
    CLOSED = "closed"
    # Calls are rejected.
    OPEN = "open"
    # A trial call is passed through to decide between the other two states.
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Stops calling a failing function until a cool-down period has passed.

    The breaker opens once ``failure_threshold`` failures have accumulated
    without a success in between. While open, calls are rejected until more
    than ``timeout`` seconds have passed since the last failure.
    """

    def __init__(self, failure_threshold: int = 5, timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func: Callable) -> Any:
        """Await ``func()`` under breaker protection and return its result.

        Raises ``Exception("Circuit breaker is open")`` while the breaker is
        open and the timeout has not elapsed. Any exception from ``func`` is
        recorded as a failure and then re-raised.
        """
        if self.state == CircuitState.OPEN:
            since_last_failure = time.time() - self.last_failure_time
            if since_last_failure > self.timeout:
                # Cool-down is over: let this call through as a trial.
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is open")

        try:
            outcome = await func()
            self._on_success()
        except Exception as e:
            self._on_failure()
            raise e
        return outcome

    def _on_success(self):
        """Reset the failure counter and close the breaker."""
        self.state = CircuitState.CLOSED
        self.failure_count = 0

    def _on_failure(self):
        """Record a failure and open the breaker once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        threshold_reached = self.failure_count >= self.failure_threshold
        if threshold_reached:
            self.state = CircuitState.OPEN

Security Considerations

1. Input Validation

Comprehensive Input Validation

from typing import Dict, Any, List
import re

def validate_input(data: Dict[str, Any]) -> Dict[str, Any]:
    """Validate input data and report every problem found.

    Rules:
        * ``'message'`` is required, must be a ``str`` of at most 10000
          characters, and may contain only ASCII letters, digits, whitespace
          and ``. , ! ?``.
        * ``'user_id'`` is optional; when present it must be a ``str`` made
          only of ASCII letters, digits, ``_`` and ``-``.

    Returns:
        ``{'valid': True, 'data': data}`` when every rule passes (``data`` is
        returned unmodified), otherwise ``{'valid': False, 'errors': [...]}``
        listing one message per violated rule.
    """
    errors = []

    # Validate message: required, then type, length and character set.
    if 'message' not in data:
        errors.append("Missing required field: message")
    else:
        message = data['message']
        if not isinstance(message, str):
            errors.append("Message must be a string")
        elif len(message) > 10000:
            errors.append("Message too long")
        elif not re.fullmatch(r'[a-zA-Z0-9\s.,!?]+', message):
            errors.append("Message contains invalid characters")

    # Validate user_id
    if 'user_id' in data:
        user_id = data['user_id']
        if not isinstance(user_id, str):
            errors.append("User ID must be a string")
        # fullmatch, not match + '$': '$' also matches just before a trailing
        # newline, which would let "abc\n" through as a valid ID.
        elif not re.fullmatch(r'[a-zA-Z0-9_-]+', user_id):
            errors.append("User ID contains invalid characters")

    if errors:
        return {'valid': False, 'errors': errors}

    return {'valid': True, 'data': data}

2. Authentication and Authorization

API Key Management

import hashlib
import hmac
from typing import Optional

class APIKeyManager:
    """Issues and verifies HMAC-SHA256 signed API keys.

    A key has the form ``<user_id>:<issued_at>:<signature>``. The issue
    timestamp is carried inside the key because it is part of the signed
    message; without it the signature could not be recomputed on validation.
    """

    def __init__(self, secret_key: str):
        self.secret_key = secret_key

    def _sign(self, user_id: str, issued_at: str) -> str:
        """Return the hex HMAC-SHA256 of ``user:<user_id>:<issued_at>``."""
        message = f"user:{user_id}:{issued_at}"
        return hmac.new(
            self.secret_key.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()

    def generate_key(self, user_id: str) -> str:
        """Generate API key for user, signed with the current Unix time."""
        issued_at = str(int(time.time()))
        signature = self._sign(user_id, issued_at)
        return f"{user_id}:{issued_at}:{signature}"

    def validate_key(self, api_key: str) -> Optional[str]:
        """Validate API key and return user ID

        Returns:
            The user ID when the signature matches the one computed with this
            manager's secret, otherwise ``None`` (malformed key, wrong type,
            or signature mismatch).
        """
        try:
            # Split from the right so a user ID containing ':' stays intact.
            user_id, issued_at, signature = api_key.rsplit(':', 2)
            expected = self._sign(user_id, issued_at)
            # Constant-time comparison on bytes; compare_digest only accepts
            # ASCII text when given str, so encode both sides first.
            matches = hmac.compare_digest(expected.encode(), signature.encode())
        except (AttributeError, TypeError, ValueError):
            return None
        return user_id if matches else None

3. Data Protection

Data Encryption

from cryptography.fernet import Fernet
import base64

class DataEncryption:
    """Encrypts and decrypts text with a Fernet cipher, base64-wrapping the output."""

    def __init__(self, key: bytes):
        self.cipher = Fernet(key)

    def encrypt(self, data: str) -> str:
        """Encrypt ``data`` and return the token as base64 text."""
        token = self.cipher.encrypt(data.encode())
        wrapped = base64.b64encode(token)
        return wrapped.decode()

    def decrypt(self, encrypted_data: str) -> str:
        """Reverse ``encrypt``: base64-decode, decrypt, and return the text."""
        token = base64.b64decode(encrypted_data.encode())
        plaintext = self.cipher.decrypt(token)
        return plaintext.decode()

Monitoring and Observability

1. Logging

Structured Logging

import logging
import json
from datetime import datetime

class StructuredLogger:
    """Writes request/response events as one JSON object per log line."""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # logging.getLogger returns the same logger for the same name, so a
        # second StructuredLogger(name) must not attach another handler or
        # every event would be written twice.
        if not self.logger.handlers:
            # The JSON payload is the whole message, so emit it unadorned.
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(handler)

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same text as ``datetime.utcnow().isoformat()`` without
        calling ``utcnow``, which is deprecated since Python 3.12.
        """
        from datetime import timezone
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def _log_event(self, event: str, **fields):
        """Log one INFO-level JSON event: common header first, then ``fields``."""
        self.logger.info(json.dumps({
            'timestamp': self._utc_timestamp(),
            'level': 'INFO',
            'event': event,
            **fields
        }))

    def log_request(self, request_id: str, user_id: str, message: str):
        """Log request with structured data

        Only the length of ``message`` is logged, not its content.
        """
        self._log_event(
            'request',
            request_id=request_id,
            user_id=user_id,
            message_length=len(message)
        )

    def log_response(self, request_id: str, status: str, duration: float):
        """Log response with structured data

        ``duration`` is given in seconds and logged in milliseconds.
        """
        self._log_event(
            'response',
            request_id=request_id,
            status=status,
            duration_ms=duration * 1000
        )

2. Metrics Collection

Custom Metrics

import time
from typing import Dict, Any

class MetricsCollector:
    """Accumulates request counts, error counts and total duration in memory."""

    def __init__(self):
        self.metrics = {
            'request_count': 0,
            'error_count': 0,
            'total_duration': 0.0,
            'last_request_time': None
        }

    def record_request(self, duration: float, success: bool):
        """Add one request of ``duration`` to the totals; count an error if it failed."""
        stats = self.metrics
        stats['request_count'] += 1
        stats['total_duration'] += duration
        stats['last_request_time'] = time.time()

        if not success:
            stats['error_count'] += 1

    def get_metrics(self) -> Dict[str, Any]:
        """Return the counters plus the derived error rate and average duration.

        Both derived values are 0 when no request has been recorded.
        """
        stats = self.metrics
        # Divide by at least 1 so an empty collector does not divide by zero.
        average_duration = stats['total_duration'] / max(stats['request_count'], 1)
        error_rate = stats['error_count'] / max(stats['request_count'], 1)

        return {
            'request_count': stats['request_count'],
            'error_count': stats['error_count'],
            'error_rate': error_rate,
            'average_duration': average_duration,
            'last_request_time': stats['last_request_time']
        }

3. Alerting

Alert Configuration

class AlertManager:
    """Compares metrics with thresholds and sends each alert type at most once per 5 minutes.

    ``alert_thresholds`` must provide ``'error_rate'`` and ``'response_time'``.
    Override ``_send_notification`` to deliver alerts by email, Slack, etc.
    """

    def __init__(self, alert_thresholds: Dict[str, float]):
        self.thresholds = alert_thresholds
        self.alerts_sent = set()

    def check_metrics(self, metrics: Dict[str, Any]):
        """Check metrics against thresholds

        Reads ``metrics['error_rate']`` and ``metrics['average_duration']``
        and raises an alert for each one that exceeds its threshold.
        """
        # Check error rate
        if metrics['error_rate'] > self.thresholds['error_rate']:
            self._send_alert('high_error_rate', metrics)

        # Check response time
        if metrics['average_duration'] > self.thresholds['response_time']:
            self._send_alert('slow_response', metrics)

    def _send_alert(self, alert_type: str, metrics: Dict[str, Any]):
        """Send alert if not already sent"""
        bucket = int(time.time() / 300)  # 5-minute buckets
        alert_key = f"{alert_type}_{bucket}"

        if alert_key in self.alerts_sent:
            return

        # Send alert (email, Slack, etc.)
        self._send_notification(alert_type, metrics)

        # Keys from earlier buckets can never match again; drop them so the
        # set does not grow for as long as the process runs.
        current_suffix = f"_{bucket}"
        stale = {key for key in self.alerts_sent if not key.endswith(current_suffix)}
        self.alerts_sent -= stale
        self.alerts_sent.add(alert_key)

    def _send_notification(self, alert_type: str, metrics: Dict[str, Any]):
        """Deliver one alert; the default implementation logs a warning.

        Defined so that the first alert does not fail on a missing method.
        Subclasses override this to use a real delivery channel.
        """
        import logging
        logging.getLogger(__name__).warning("Alert %s: %s", alert_type, metrics)

Scalability

1. Horizontal Scaling

Load Balancing

import random
from typing import List

class LoadBalancer:
    """Picks an agent instance from a list, either in rotation or at random."""

    def __init__(self, agent_instances: List[str]):
        self.instances = agent_instances
        self.current_index = 0

    def get_instance(self) -> str:
        """Return the instance at the current position and advance the rotation.

        The position wraps back to the start after the last instance.
        """
        position = self.current_index
        chosen = self.instances[position]
        self.current_index = (position + 1) % len(self.instances)
        return chosen

    def get_random_instance(self) -> str:
        """Return an instance chosen by ``random.choice``; the rotation is untouched."""
        candidates = self.instances
        return random.choice(candidates)

2. Auto-scaling

Scaling Logic

class AutoScaler:
    """Decides from request metrics whether the instance count should change."""

    def __init__(self, min_instances: int = 1, max_instances: int = 10):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.current_instances = min_instances

    def should_scale_up(self, metrics: Dict[str, Any]) -> bool:
        """Return True when load is high, responses are slow, and there is headroom.

        Requires more than 1000 requests, an average duration above 2.0, and
        fewer instances than ``max_instances``.
        """
        heavy_load = metrics['request_count'] > 1000
        if not heavy_load:
            return False
        slow = metrics['average_duration'] > 2.0
        if not slow:
            return False
        return self.current_instances < self.max_instances

    def should_scale_down(self, metrics: Dict[str, Any]) -> bool:
        """Return True when load is low, responses are fast, and instances can be removed.

        Requires fewer than 100 requests, an average duration below 0.5, and
        more instances than ``min_instances``.
        """
        light_load = metrics['request_count'] < 100
        if not light_load:
            return False
        fast = metrics['average_duration'] < 0.5
        if not fast:
            return False
        return self.current_instances > self.min_instances

Deployment Best Practices

1. Environment Configuration

Environment-Specific Settings

import os
from typing import Dict, Any

class EnvironmentConfig:
    """Selects a settings dict based on the ``ENVIRONMENT`` environment variable."""

    def __init__(self):
        self.env = os.getenv('ENVIRONMENT', 'development')
        self.config = self._load_config()

    def _load_config(self) -> Dict[str, Any]:
        """Return the settings for ``self.env``.

        Unrecognised environment names fall back to the development settings.
        """
        development = {
            'debug': True,
            'log_level': 'DEBUG',
            'max_instances': 1,
            'timeout': 30
        }
        staging = {
            'debug': False,
            'log_level': 'INFO',
            'max_instances': 3,
            'timeout': 60
        }
        production = {
            'debug': False,
            'log_level': 'WARNING',
            'max_instances': 10,
            'timeout': 120
        }
        by_environment = {
            'development': development,
            'staging': staging,
            'production': production
        }

        if self.env in by_environment:
            return by_environment[self.env]
        return development

2. Health Checks

Comprehensive Health Checks

async def health_check() -> Dict[str, Any]:
    """Run every dependency check and summarise the outcome.

    The checks run one after another. The overall status is ``'healthy'``
    only when every individual result is truthy.
    """
    checks = {}
    checks['database'] = await check_database()
    checks['cache'] = await check_cache()
    checks['external_apis'] = await check_external_apis()
    checks['disk_space'] = check_disk_space()
    checks['memory'] = check_memory()

    overall_status = 'unhealthy'
    if all(checks.values()):
        overall_status = 'healthy'

    return {
        'status': overall_status,
        'checks': checks,
        'timestamp': datetime.utcnow().isoformat()
    }

Next Steps

🎉 Great work! You now understand the essential considerations for deploying RunAgent agents to production. These practices will help you build reliable, scalable, and secure AI agent systems!