Overview

RunAgent implements rate limiting to ensure fair usage and platform stability. Rate limits apply per API key and vary by subscription tier.

Rate Limit Tiers

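| Tier | Requests/Hour | Requests/Minute | Concurrent Requests |
|------|---------------|-----------------|---------------------|
| Free | 100 | 10 | 2 |
| Pro | 1,000 | 100 | 10 |
| Team | 5,000 | 500 | 25 |
| Enterprise | Custom | Custom | Custom |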
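The hourly and per-minute limits interact: at the full per-minute rate, any of the listed tiers would use up its hourly allowance in about ten minutes. The sketch below works through that arithmetic with the numbers from the tier table. The `TIERS` dict and both helper functions are illustrative names, not part of any RunAgent SDK.

```python
# Tier limits copied from the tier table: (requests/hour, requests/minute).
TIERS = {
    "free": (100, 10),
    "pro": (1000, 100),
    "team": (5000, 500),
}

def min_spacing_seconds(tier: str) -> float:
    """Smallest even gap between requests that respects both limits."""
    per_hour, per_minute = TIERS[tier]
    return max(3600 / per_hour, 60 / per_minute)

def minutes_to_exhaust_hourly(tier: str) -> float:
    """Minutes until the hourly allowance is gone at the full per-minute rate."""
    per_hour, per_minute = TIERS[tier]
    return per_hour / per_minute

for name in TIERS:
    print(name, min_spacing_seconds(name), minutes_to_exhaust_hourly(name))
```

For sustained traffic the hourly limit is the binding one, so spacing requests by `min_spacing_seconds` keeps a long-running job under both limits.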

Rate Limit Headers

Every API response includes rate limit information:

X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 999
X-RateLimit-Reset: 1640995200
X-RateLimit-Reset-After: 3600

| Header | Description |
|--------|-------------|
| X-RateLimit-Limit | Maximum requests allowed in the current window |
| X-RateLimit-Remaining | Requests remaining in the current window |
| X-RateLimit-Reset | Unix timestamp when the limit resets |
| X-RateLimit-Reset-After | Seconds until the limit resets |
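One way to use these headers is to parse them into a small structure after each response. The following is a minimal sketch; `RateLimitInfo` and `parse_rate_limit_headers` are hypothetical names, and the input is a plain dict standing in for a real response's headers (a plain dict is case-sensitive, unlike the header mappings most HTTP libraries return).

```python
from dataclasses import dataclass
from typing import Mapping

@dataclass
class RateLimitInfo:
    limit: int        # maximum requests allowed
    remaining: int    # requests left in the window
    reset_epoch: int  # Unix timestamp when the limit resets
    reset_after: int  # seconds until the limit resets

def parse_rate_limit_headers(headers: Mapping[str, str]) -> RateLimitInfo:
    """Convert the four X-RateLimit-* header strings to integers."""
    return RateLimitInfo(
        limit=int(headers["X-RateLimit-Limit"]),
        remaining=int(headers["X-RateLimit-Remaining"]),
        reset_epoch=int(headers["X-RateLimit-Reset"]),
        reset_after=int(headers["X-RateLimit-Reset-After"]),
    )

# Header values taken from the example response above.
example_headers = {
    "X-RateLimit-Limit": "1000",
    "X-RateLimit-Remaining": "999",
    "X-RateLimit-Reset": "1640995200",
    "X-RateLimit-Reset-After": "3600",
}
info = parse_rate_limit_headers(example_headers)
print(f"{info.remaining}/{info.limit} requests left, resets in {info.reset_after}s")
```

A client could check `info.remaining` before sending a burst and pause for `info.reset_after` seconds when it reaches zero.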

Rate Limit Response

When you exceed a rate limit, the API returns an HTTP 429 (Too Many Requests) response:

{
  "error": {
    "code": "RATE_LIMIT_EXCEEDED",
    "message": "API rate limit exceeded",
    "details": {
      "limit": 100,
      "remaining": 0,
      "reset_at": "2024-01-01T13:00:00Z",
      "retry_after": 3600
    },
    "status": 429
  }
}
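The `details` object carries enough information to decide how long to wait. Below is a minimal sketch that prefers `retry_after` and falls back to `reset_at`. The function name `seconds_to_wait` is hypothetical, and treating `retry_after` as optional is an assumption made for illustration.

```python
import json
from datetime import datetime, timezone

# The example 429 body shown above.
body = """
{
  "error": {
    "code": "RATE_LIMIT_EXCEEDED",
    "message": "API rate limit exceeded",
    "details": {
      "limit": 100,
      "remaining": 0,
      "reset_at": "2024-01-01T13:00:00Z",
      "retry_after": 3600
    },
    "status": 429
  }
}
"""

def seconds_to_wait(error_body: dict, now: datetime) -> float:
    """Return how long to pause before retrying, based on the error details."""
    details = error_body["error"]["details"]
    retry_after = details.get("retry_after")
    if retry_after is not None:
        return float(retry_after)
    # Fall back to the reset timestamp; "Z" is rewritten for older Python versions.
    reset_at = datetime.fromisoformat(details["reset_at"].replace("Z", "+00:00"))
    return max(0.0, (reset_at - now).total_seconds())

payload = json.loads(body)
now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
print(seconds_to_wait(payload, now))
```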

Handling Rate Limits

Exponential Backoff

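import time
import requests

def call_with_backoff(url, headers=None, max_retries=5):
    """GET `url`, retrying on HTTP 429 up to `max_retries` times."""
    for attempt in range(max_retries):
        response = requests.get(url, headers=headers)

        if response.status_code != 429:
            return response

        # Prefer the server's hint (Retry-After, then X-RateLimit-Reset-After);
        # otherwise fall back to exponential backoff: 1, 2, 4, ... seconds
        hint = (response.headers.get("Retry-After")
                or response.headers.get("X-RateLimit-Reset-After"))
        try:
            delay = float(hint)
        except (TypeError, ValueError):
            delay = 2 ** attempt

        print(f"Rate limited. Waiting {delay} seconds...")
        time.sleep(delay)

    raise RuntimeError("Max retries exceeded")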
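When many clients are rate limited at the same moment, fixed delays of 1, 2, 4, ... seconds can make them all retry in lockstep. A common refinement is to add random jitter to the delay. The sketch below shows the "full jitter" variant; `backoff_delay`, its `base` and `cap` defaults, and the injectable `rng` parameter are illustrative choices, not values documented by RunAgent.

```python
import random
from typing import Callable

def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Callable[[], float] = random.random,
) -> float:
    """Full-jitter delay: a random value between 0 and min(cap, base * 2**attempt)."""
    ceiling = min(cap, base * (2 ** attempt))
    return rng() * ceiling

# Upper bounds grow exponentially until they hit the cap.
for attempt in range(8):
    print(attempt, round(backoff_delay(attempt), 2))
```

In a retry loop, `backoff_delay(attempt)` can replace the `2 ** attempt` fallback whenever the server sends no retry hint.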

Rate Limit Aware Client

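import time
import requests

class RateLimitedClient:
    """Client-side limiter: at most `requests_per_hour` calls per rolling hour."""

    def __init__(self, api_key, requests_per_hour=1000):
        self.api_key = api_key
        self.requests_per_hour = requests_per_hour
        self.request_times = []  # timestamps of requests sent in the last hour

    def _wait_if_needed(self):
        now = time.time()
        hour_ago = now - 3600

        # Drop timestamps that have left the one-hour window
        self.request_times = [t for t in self.request_times if t > hour_ago]

        # At the limit: sleep until the oldest request leaves the window
        if len(self.request_times) >= self.requests_per_hour:
            sleep_time = self.request_times[0] + 3600 - now
            if sleep_time > 0:
                time.sleep(sleep_time)

    def request(self, method, url, **kwargs):
        self._wait_if_needed()
        self.request_times.append(time.time())

        # Send the API key as a Bearer token unless the caller set the header
        headers = dict(kwargs.pop("headers", None) or {})
        headers.setdefault("Authorization", f"Bearer {self.api_key}")
        return requests.request(method, url, headers=headers, **kwargs)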

Quota Types

API Request Quotas

Standard rate limits for API calls:

  • Invoke endpoint: Standard rate limits apply
  • Stream endpoint: Counts as a single request
  • Health endpoint: Not rate limited

Token Quotas

Monthly token usage limits:

| Tier | Tokens/Month |
|------|--------------|
| Free | 100,000 |
| Pro | 2,000,000 |
| Team | 10,000,000 |
| Enterprise | Custom |
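To get a feel for what these quotas allow, divide the monthly quota by an estimate of tokens per request. The sketch below does this for each listed tier. The figure of 1,500 tokens per request is a made-up planning number, and `MONTHLY_TOKENS` and `requests_per_month` are illustrative names.

```python
# Monthly token quotas copied from the table above.
MONTHLY_TOKENS = {
    "free": 100_000,
    "pro": 2_000_000,
    "team": 10_000_000,
}

def requests_per_month(tier: str, avg_tokens_per_request: int) -> int:
    """Whole requests that fit in the monthly quota at a given average size."""
    if avg_tokens_per_request < 1:
        raise ValueError("avg_tokens_per_request must be at least 1")
    return MONTHLY_TOKENS[tier] // avg_tokens_per_request

# Hypothetical workload: about 1,500 tokens per request.
for tier in MONTHLY_TOKENS:
    print(tier, requests_per_month(tier, 1500))
```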

Concurrent Request Limits

Maximum simultaneous requests:

import asyncio

class ConcurrentLimitedClient:
    def __init__(self, max_concurrent=10):
        # The semaphore caps how many requests are in flight at once
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def request(self, url):
        async with self.semaphore:
            # make_async_request is a placeholder for your async HTTP call
            return await make_async_request(url)
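The semaphore pattern can be checked without a network by swapping in a fake request that records how many calls overlap. This is a self-contained sketch: `fake_request` and `fetch_all` are stand-ins invented for the demonstration, and the limit of 2 matches the Free tier's concurrent request limit.

```python
import asyncio

async def fake_request(url: str, state: dict) -> str:
    """Stand-in for a real async HTTP call; tracks how many calls overlap."""
    state["active"] += 1
    state["peak"] = max(state["peak"], state["active"])
    await asyncio.sleep(0.01)  # simulate network latency
    state["active"] -= 1
    return f"ok:{url}"

async def fetch_all(urls, max_concurrent: int):
    semaphore = asyncio.Semaphore(max_concurrent)
    state = {"active": 0, "peak": 0}

    async def limited(url):
        # At most max_concurrent coroutines get past this point at once
        async with semaphore:
            return await fake_request(url, state)

    results = await asyncio.gather(*(limited(u) for u in urls))
    return results, state["peak"]

urls = [f"/item/{i}" for i in range(20)]
results, peak = asyncio.run(fetch_all(urls, max_concurrent=2))
print(f"{len(results)} responses, peak concurrency {peak}")
```

All 20 tasks are started together, but the recorded peak never exceeds the semaphore's limit.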

Monitoring Usage

Check Current Usage

GET https://api.run-agent.ai/v1/usage
Authorization: Bearer YOUR_API_KEY

Response:

{
  "period": "2024-01-01T00:00:00Z",
  "requests": {
    "used": 523,
    "limit": 1000,
    "remaining": 477
  },
  "tokens": {
    "used": 45230,
    "limit": 2000000,
    "remaining": 1954770
  }
}
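A short sketch of how a client might summarize this response. The JSON is the example payload above, and `percent_used` is a hypothetical helper.

```python
import json

# The example usage response shown above.
usage_json = """
{
  "period": "2024-01-01T00:00:00Z",
  "requests": {"used": 523, "limit": 1000, "remaining": 477},
  "tokens": {"used": 45230, "limit": 2000000, "remaining": 1954770}
}
"""

def percent_used(bucket: dict) -> float:
    """Share of the limit already consumed, as a percentage."""
    return 100.0 * bucket["used"] / bucket["limit"]

usage = json.loads(usage_json)
for name in ("requests", "tokens"):
    bucket = usage[name]
    print(f"{name}: {percent_used(bucket):.2f}% used, {bucket['remaining']} remaining")
```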

Usage Alerts

Set up alerts when approaching limits:

POST https://api.run-agent.ai/v1/alerts
{
  "type": "rate_limit",
  "threshold": 80,
  "webhook_url": "https://your-app.com/alerts"
}
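The same request can be built from Python with the standard library. This sketch only constructs the request and does not send it. The `Authorization: Bearer` header is assumed to match the usage endpoint, and `build_alert_request` is a hypothetical helper.

```python
import json
import urllib.request

ALERTS_URL = "https://api.run-agent.ai/v1/alerts"

def build_alert_request(api_key: str, threshold: int, webhook_url: str) -> urllib.request.Request:
    """Build (but do not send) the POST request that registers a rate limit alert."""
    payload = {
        "type": "rate_limit",
        "threshold": threshold,
        "webhook_url": webhook_url,
    }
    return urllib.request.Request(
        ALERTS_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            # Assumed to use the same Bearer scheme as the usage endpoint
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )

req = build_alert_request("YOUR_API_KEY", 80, "https://your-app.com/alerts")
print(req.get_method(), req.full_url)
# To send it: urllib.request.urlopen(req)
```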

Best Practices

1. Implement Retry Logic

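async function requestWithRetry(url, options, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      const response = await fetch(url, options);

      if (response.status !== 429) {
        return response;
      }

      // Retry-After is in seconds; default to 60 if missing or not numeric
      const retryAfter = Number(response.headers.get('Retry-After')) || 60;
      await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
    } catch (error) {
      // Network failure: rethrow once the final attempt has failed
      if (i === maxRetries - 1) throw error;
    }
  }

  // Every attempt was rate limited
  throw new Error('Max retries exceeded');
}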

2. Batch Requests

# Instead of individual requests
for item in items:
    api.process(item)  # 100 requests

# Batch them
api.process_batch(items)  # 1 request
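If a batch call accepts only a limited number of items (an assumption; no batch size cap is stated here), the items can be split into fixed-size chunks and sent one chunk per request. The `chunked` helper is an illustrative name.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")

def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of `items`, each with at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])

items = list(range(100))
batches = list(chunked(items, 25))
print(len(batches), "requests instead of", len(items))
# Each batch would then go to the batch call, e.g. api.process_batch(batch)
```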

3. Cache Responses

import functools
import time

def timed_cache(seconds=3600):
    def decorator(func):
        cache = {}
        
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = str(args) + str(kwargs)
            
            if key in cache:
                result, timestamp = cache[key]
                if time.time() - timestamp < seconds:
                    return result
            
            result = func(*args, **kwargs)
            cache[key] = (result, time.time())
            return result
        
        return wrapper
    return decorator

@timed_cache(seconds=300)
def get_agent_info(agent_id):
    return api.get_agent(agent_id)

4. Use Webhooks

Instead of polling:

# Don't do this
while True:
    status = api.check_status()  # Uses rate limit
    if status == "complete":
        break
    time.sleep(1)

# Do this
api.set_webhook("https://your-app.com/webhook")
# Receive notification when complete

Rate Limit Increases

Need higher limits? Options:

  1. Upgrade Plan: Move to a higher tier
  2. Request Increase: Contact support for custom limits
  3. Enterprise Plan: Fully customizable limits

See Also