Overview

RunAgent implements rate limiting to ensure fair usage and platform stability. Rate limits apply per API key and vary by subscription tier.

Rate Limit Tiers

| Tier | Requests/Hour | Requests/Minute | Concurrent Requests |
|------------|---------|--------|--------|
| Free | 100 | 10 | 2 |
| Pro | 1,000 | 100 | 10 |
| Team | 5,000 | 500 | 25 |
| Enterprise | Custom | Custom | Custom |

Rate Limit Headers

Every API response includes rate limit information:
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 999
X-RateLimit-Reset: 1640995200
X-RateLimit-Reset-After: 3600
| Header | Description |
|--------|-------------|
| X-RateLimit-Limit | Maximum requests allowed in the current window |
| X-RateLimit-Remaining | Requests remaining in the current window |
| X-RateLimit-Reset | Unix timestamp when the limit resets |
| X-RateLimit-Reset-After | Seconds until the limit resets |
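As a quick illustration, the headers above can be read into integers before deciding whether to pause; the helper name here is hypothetical, not part of the SDK:

```python
def parse_rate_limit_headers(headers):
    """Convert the X-RateLimit-* header strings into plain integers."""
    return {
        "limit": int(headers["X-RateLimit-Limit"]),
        "remaining": int(headers["X-RateLimit-Remaining"]),
        "reset": int(headers["X-RateLimit-Reset"]),
        "reset_after": int(headers["X-RateLimit-Reset-After"]),
    }

# Using the example header values shown above
state = parse_rate_limit_headers({
    "X-RateLimit-Limit": "1000",
    "X-RateLimit-Remaining": "999",
    "X-RateLimit-Reset": "1640995200",
    "X-RateLimit-Reset-After": "3600",
})
```

A client can pause proactively whenever `state["remaining"]` drops to zero, rather than waiting for a 429.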

Rate Limit Response

When rate limited, you’ll receive a 429 response:
{
  "error": {
    "code": "RATE_LIMIT_EXCEEDED",
    "message": "API rate limit exceeded",
    "details": {
      "limit": 100,
      "remaining": 0,
      "reset_at": "2024-01-01T13:00:00Z",
      "retry_after": 3600
    },
    "status": 429
  }
}
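The error body carries everything needed to schedule a retry. A minimal sketch (assuming the JSON shape above) that pulls out the suggested wait time:

```python
import json

def retry_delay_from_error(body):
    """Return the suggested wait time in seconds from a 429 error body."""
    return body["error"]["details"]["retry_after"]

# The example 429 body shown above
body = json.loads('''{
  "error": {
    "code": "RATE_LIMIT_EXCEEDED",
    "message": "API rate limit exceeded",
    "details": {"limit": 100, "remaining": 0,
                "reset_at": "2024-01-01T13:00:00Z", "retry_after": 3600},
    "status": 429
  }
}''')
```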

Handling Rate Limits

Exponential Backoff

import time
import requests

def call_with_backoff(url, headers, max_retries=5):
    """GET with retry on 429, honoring Retry-After when the server sends it."""
    for attempt in range(max_retries):
        response = requests.get(url, headers=headers)

        if response.status_code == 429:
            # Prefer the server's Retry-After header; fall back to exponential backoff
            retry_after = int(response.headers.get('Retry-After', 2 ** attempt))
            print(f"Rate limited. Waiting {retry_after} seconds...")
            time.sleep(retry_after)
            continue

        return response

    raise Exception("Max retries exceeded")

Rate Limit Aware Client

import time
import requests

class RateLimitedClient:
    def __init__(self, api_key, requests_per_hour=1000):
        self.api_key = api_key
        self.requests_per_hour = requests_per_hour
        self.request_times = []

    def _wait_if_needed(self):
        now = time.time()
        hour_ago = now - 3600

        # Drop timestamps that have aged out of the one-hour window
        self.request_times = [t for t in self.request_times if t > hour_ago]

        # If at the limit, sleep until the oldest request leaves the window
        if len(self.request_times) >= self.requests_per_hour:
            sleep_time = self.request_times[0] + 3600 - now
            if sleep_time > 0:
                time.sleep(sleep_time)

    def request(self, method, url, **kwargs):
        self._wait_if_needed()
        self.request_times.append(time.time())

        return requests.request(method, url, **kwargs)

Quota Types

API Request Quotas

Standard rate limits for API calls:
  • Invoke endpoint: Standard rate limits apply
  • Stream endpoint: Each stream counts as a single request
  • Health endpoint: Not rate limited

Token Quotas

Monthly token usage limits:
| Tier | Tokens/Month |
|------------|------------|
| Free | 100,000 |
| Pro | 2,000,000 |
| Team | 10,000,000 |
| Enterprise | Custom |
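To stay within a monthly budget, a client can track cumulative token usage against the tier quota. A minimal sketch, with the quota table above hard-coded (the `TOKEN_QUOTAS` name is illustrative):

```python
# Quotas mirroring the Tokens/Month table above
TOKEN_QUOTAS = {"free": 100_000, "pro": 2_000_000, "team": 10_000_000}

def tokens_remaining(tier, used):
    """Tokens left in the current month for a given tier (never negative)."""
    return max(TOKEN_QUOTAS[tier] - used, 0)
```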

Concurrent Request Limits

Maximum simultaneous requests:
import asyncio
from asyncio import Semaphore

class ConcurrentLimitedClient:
    def __init__(self, max_concurrent=10):
        # Cap in-flight requests at the tier's concurrency limit
        self.semaphore = Semaphore(max_concurrent)

    async def request(self, url):
        async with self.semaphore:
            # make_async_request is a placeholder for your async HTTP call
            return await make_async_request(url)

Monitoring Usage

Check Current Usage

GET https://api.run-agent.ai/v1/usage
Authorization: Bearer YOUR_API_KEY
Response:
{
  "period": "2024-01-01T00:00:00Z",
  "requests": {
    "used": 523,
    "limit": 1000,
    "remaining": 477
  },
  "tokens": {
    "used": 45230,
    "limit": 2000000,
    "remaining": 1954770
  }
}
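The `requests` and `tokens` sections of the usage response share one shape, so a single helper can report utilization for either; this is a sketch against the example payload above, not an SDK function:

```python
def usage_fraction(section):
    """Fraction of a quota consumed, given one section of the usage response."""
    return section["used"] / section["limit"]

# The example usage response shown above
usage = {
    "requests": {"used": 523, "limit": 1000, "remaining": 477},
    "tokens": {"used": 45230, "limit": 2000000, "remaining": 1954770},
}
```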

Usage Alerts

Set up alerts when approaching limits:
POST https://api.run-agent.ai/v1/alerts
{
  "type": "rate_limit",
  "threshold": 80,
  "webhook_url": "https://your-app.com/alerts"
}
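The `threshold` in the alert payload is a percentage of the limit. The same check can run client-side as a fallback; a minimal sketch of that logic:

```python
def should_alert(used, limit, threshold_percent=80):
    """True once usage reaches the alert threshold, matching the payload above."""
    return used / limit * 100 >= threshold_percent
```

With the usage figures from the previous section (523 of 1,000 requests), no alert fires yet; at 850 requests the 80% threshold would trip.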

Best Practices

1. Implement Retry Logic

async function requestWithRetry(url, options, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      const response = await fetch(url, options);

      if (response.status === 429) {
        // Retry-After arrives as a string; default to 60 seconds if absent
        const retryAfter = parseInt(response.headers.get('Retry-After') ?? '60', 10);
        await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
        continue;
      }

      return response;
    } catch (error) {
      if (i === maxRetries - 1) throw error;
    }
  }
  throw new Error('Max retries exceeded');
}

2. Batch Requests

# Instead of individual requests
for item in items:
    api.process(item)  # 100 requests

# Batch them
api.process_batch(items)  # 1 request
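Assuming the batch endpoint caps how many items one call may carry, a generic chunking helper keeps each call under that cap (the helper and cap are illustrative, not part of the SDK):

```python
def chunked(items, batch_size):
    """Split items into batches, so N items cost ceil(N / batch_size) requests."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

# 100 items with a batch size of 25 -> 4 requests instead of 100
batches = chunked(list(range(100)), 25)
```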

3. Cache Responses

import functools
import time

def timed_cache(seconds=3600):
    def decorator(func):
        cache = {}
        
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = str(args) + str(kwargs)
            
            if key in cache:
                result, timestamp = cache[key]
                if time.time() - timestamp < seconds:
                    return result
            
            result = func(*args, **kwargs)
            cache[key] = (result, time.time())
            return result
        
        return wrapper
    return decorator

@timed_cache(seconds=300)
def get_agent_info(agent_id):
    return api.get_agent(agent_id)

4. Use Webhooks

Instead of polling:
# Don't do this
while True:
    status = api.check_status()  # Uses rate limit
    if status == "complete":
        break
    time.sleep(1)

# Do this
api.set_webhook("https://your-app.com/webhook")
# Receive notification when complete

Rate Limit Increases

Need higher limits? Options:
  1. Upgrade Plan: Move to higher tier
  2. Request Increase: Contact support for custom limits
  3. Enterprise Plan: Fully customizable limits

See Also