Overview
RunAgent implements rate limiting to ensure fair usage and platform stability. Rate limits apply per API key and vary by subscription tier.
Rate Limit Tiers
| Tier | Requests/Hour | Requests/Minute | Concurrent Requests |
|---|---|---|---|
| Free | 100 | 10 | 2 |
| Pro | 1,000 | 100 | 10 |
| Team | 5,000 | 500 | 25 |
| Enterprise | Custom | Custom | Custom |
Every API response includes rate limit information:
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 999
X-RateLimit-Reset: 1640995200
X-RateLimit-Reset-After: 3600
| Header | Description |
|---|---|
| X-RateLimit-Limit | Maximum requests allowed in the current window |
| X-RateLimit-Remaining | Requests remaining in the current window |
| X-RateLimit-Reset | Unix timestamp when the limit resets |
| X-RateLimit-Reset-After | Seconds until the limit resets |
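As an illustration of reading these headers on the client, here is a minimal sketch. `RateLimitStatus` and `parse_rate_limit_headers` are hypothetical names introduced for this example, not part of any RunAgent SDK; the header names and sample values are the ones listed above.

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass
class RateLimitStatus:
    limit: int        # X-RateLimit-Limit
    remaining: int    # X-RateLimit-Remaining
    reset_at: int     # X-RateLimit-Reset (Unix timestamp)
    reset_after: int  # X-RateLimit-Reset-After (seconds)


def parse_rate_limit_headers(headers: Mapping[str, str]) -> RateLimitStatus:
    """Hypothetical helper: turn the rate limit headers into typed values."""
    # HTTP header names are case-insensitive, so normalize before lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    return RateLimitStatus(
        limit=int(lowered["x-ratelimit-limit"]),
        remaining=int(lowered["x-ratelimit-remaining"]),
        reset_at=int(lowered["x-ratelimit-reset"]),
        reset_after=int(lowered["x-ratelimit-reset-after"]),
    )


# Sample values copied from the header example above.
status = parse_rate_limit_headers({
    "X-RateLimit-Limit": "1000",
    "X-RateLimit-Remaining": "999",
    "X-RateLimit-Reset": "1640995200",
    "X-RateLimit-Reset-After": "3600",
})
print(status)
```

A client could check `status.remaining` before sending a burst of requests and pause for `status.reset_after` seconds when it reaches zero.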
Rate Limit Response
When rate limited, you’ll receive a 429 response:
{
  "error": {
    "code": "RATE_LIMIT_EXCEEDED",
    "message": "API rate limit exceeded",
    "details": {
      "limit": 100,
      "remaining": 0,
      "reset_at": "2024-01-01T13:00:00Z",
      "retry_after": 3600
    },
    "status": 429
  }
}
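The error body carries the same wait information as the headers, so a client can work out how long to pause from the JSON alone. The sketch below assumes the body has exactly the shape shown above; `seconds_to_wait` is a hypothetical helper name, and falling back to `reset_at` when `retry_after` is missing is an assumption made for illustration.

```python
import json
from datetime import datetime, timezone


def seconds_to_wait(body_text, now=None):
    """Hypothetical helper: how long to pause after a 429 response body."""
    details = json.loads(body_text)["error"]["details"]
    if "retry_after" in details:
        return max(0, int(details["retry_after"]))
    # Assumed fallback: derive the wait from the ISO 8601 reset_at timestamp.
    reset_at = datetime.fromisoformat(details["reset_at"].replace("Z", "+00:00"))
    now = now or datetime.now(timezone.utc)
    return max(0, int((reset_at - now).total_seconds()))


# Sample body copied from the 429 response above.
SAMPLE_BODY = """
{
  "error": {
    "code": "RATE_LIMIT_EXCEEDED",
    "message": "API rate limit exceeded",
    "details": {
      "limit": 100,
      "remaining": 0,
      "reset_at": "2024-01-01T13:00:00Z",
      "retry_after": 3600
    },
    "status": 429
  }
}
"""

print(seconds_to_wait(SAMPLE_BODY))
```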
Handling Rate Limits
Exponential Backoff
import time
import requests

def call_with_backoff(url, headers=None, max_retries=5):
    """GET url, waiting and retrying whenever the API answers 429."""
    for attempt in range(max_retries):
        response = requests.get(url, headers=headers)
        if response.status_code == 429:
            # Prefer the server's hint; otherwise back off exponentially (1, 2, 4, ... seconds)
            retry_after = int(
                response.headers.get('Retry-After')
                or response.headers.get('X-RateLimit-Reset-After')
                or 2 ** attempt
            )
            print(f"Rate limited. Waiting {retry_after} seconds...")
            time.sleep(retry_after)
            continue
        return response
    raise Exception("Max retries exceeded")
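When many clients hit the limit at the same moment, a fixed `2 ** attempt` schedule makes them all retry together. A common refinement is to add random jitter and cap the delay. The sketch below shows one such variant ("full jitter"); it is a general technique rather than anything this API prescribes, and `backoff_delay`, `base`, and `cap` are names chosen for this example.

```python
import random


def backoff_delay(attempt, base=1.0, cap=60.0, rng=random):
    """Return a random delay in [0, min(cap, base * 2**attempt)] seconds."""
    ceiling = min(cap, base * (2 ** attempt))
    return rng.uniform(0, ceiling)


# Print one possible schedule; the values differ on every run.
for attempt in range(5):
    print(f"attempt {attempt}: wait up to {min(60.0, 2 ** attempt):.0f}s, "
          f"chose {backoff_delay(attempt):.2f}s")
```

The result can stand in for `2 ** attempt` as the fallback delay whenever the server does not say how long to wait.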
Rate Limit Aware Client
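import time
import requests

class RateLimitedClient:
    """Client-side sliding-window limiter that stays under requests_per_hour."""

    def __init__(self, api_key, requests_per_hour=1000):
        self.api_key = api_key
        self.requests_per_hour = requests_per_hour
        self.request_times = []  # timestamps of requests sent in the last hour

    def _wait_if_needed(self):
        now = time.time()
        hour_ago = now - 3600
        # Remove requests older than one hour
        self.request_times = [t for t in self.request_times if t > hour_ago]
        # If at the limit, sleep until the oldest request leaves the window
        if len(self.request_times) >= self.requests_per_hour:
            sleep_time = self.request_times[0] + 3600 - now
            if sleep_time > 0:
                time.sleep(sleep_time)

    def request(self, method, url, **kwargs):
        self._wait_if_needed()
        self.request_times.append(time.time())
        # Send the API key as a Bearer token unless the caller set its own header
        headers = dict(kwargs.pop("headers", None) or {})
        headers.setdefault("Authorization", f"Bearer {self.api_key}")
        return requests.request(method, url, headers=headers, **kwargs)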
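The tiers define both per-minute and per-hour limits, so a client that tracks only the hourly window can still exceed the per-minute one. The following sketch tracks several windows at once. `SlidingWindowLimiter` and `combined_wait` are hypothetical names, and the injectable `clock` parameter exists only to make the logic easy to test without sleeping.

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Tracks request timestamps for one window (e.g. 10 requests per 60 s)."""

    def __init__(self, limit, window_seconds, clock=time.monotonic):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock
        self.sent = deque()  # timestamps of recorded requests, oldest first

    def wait_time(self):
        """Seconds to wait before the next request fits in this window."""
        now = self.clock()
        # Drop timestamps that have left the window.
        while self.sent and self.sent[0] <= now - self.window:
            self.sent.popleft()
        if len(self.sent) < self.limit:
            return 0.0
        return self.sent[0] + self.window - now

    def record(self):
        """Call once for every request actually sent."""
        self.sent.append(self.clock())


def combined_wait(limiters):
    """A request must satisfy every window, so wait for the slowest one."""
    return max(limiter.wait_time() for limiter in limiters)


# Free tier numbers from the table above: 10 per minute, 100 per hour.
limiters = [SlidingWindowLimiter(10, 60), SlidingWindowLimiter(100, 3600)]
for _ in range(10):
    for limiter in limiters:
        limiter.record()
print(f"Next request should wait about {combined_wait(limiters):.0f} seconds")
```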
Quota Types
API Request Quotas
Standard rate limits for API calls:
- Invoke endpoint: Standard rate limits apply
- Stream endpoint: Each stream counts as a single request
- Health endpoint: Not rate limited
Token Quotas
Monthly token usage limits:
| Tier | Tokens/Month |
|---|---|
| Free | 100,000 |
| Pro | 2,000,000 |
| Team | 10,000,000 |
| Enterprise | Custom |
Concurrent Request Limits
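Each tier caps the number of simultaneous requests. A semaphore keeps the client under that cap:
import asyncio
from asyncio import Semaphore

class ConcurrentLimitedClient:
    def __init__(self, max_concurrent=10):
        # At most max_concurrent requests hold the semaphore at once
        self.semaphore = Semaphore(max_concurrent)

    async def request(self, url):
        async with self.semaphore:
            # make_async_request is a placeholder for your own async HTTP call
            return await make_async_request(url)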
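To see the semaphore pattern in action without a network call, the sketch below replaces the HTTP request with a short `asyncio.sleep` and records how many requests are in flight at once. `fake_request`, `fetch_all`, and the `/agents/...` paths are made up for this demonstration.

```python
import asyncio


async def fake_request(url, tracker):
    """Stand-in for a real HTTP call; tracks how many calls overlap."""
    tracker["active"] += 1
    tracker["peak"] = max(tracker["peak"], tracker["active"])
    await asyncio.sleep(0.01)  # simulate network latency
    tracker["active"] -= 1
    return f"ok:{url}"


async def fetch_all(urls, max_concurrent=2):
    semaphore = asyncio.Semaphore(max_concurrent)
    tracker = {"active": 0, "peak": 0}

    async def limited(url):
        # Only max_concurrent coroutines can be inside this block at a time.
        async with semaphore:
            return await fake_request(url, tracker)

    results = await asyncio.gather(*(limited(url) for url in urls))
    return results, tracker["peak"]


# max_concurrent=2 matches the Free tier's concurrent request limit.
results, peak = asyncio.run(fetch_all([f"/agents/{i}" for i in range(6)]))
print(f"{len(results)} responses, peak concurrency {peak}")
```

`asyncio.gather` returns results in the order the URLs were passed in, even though the requests complete in batches.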
Monitoring Usage
Check Current Usage
GET https://api.run-agent.ai/v1/usage
Authorization: Bearer YOUR_API_KEY
Response:
{
  "period": "2024-01-01T00:00:00Z",
  "requests": {
    "used": 523,
    "limit": 1000,
    "remaining": 477
  },
  "tokens": {
    "used": 45230,
    "limit": 2000000,
    "remaining": 1954770
  }
}
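A client can turn this response into a simple "how close am I to the limit" check. The sketch below assumes the response shape shown above and treats the threshold as a percentage of the quota, which is an interpretation made for this example; `usage_percent` and `quotas_over_threshold` are hypothetical helper names.

```python
def usage_percent(section):
    """Percentage of a quota consumed, from a {'used', 'limit'} mapping."""
    return 100.0 * section["used"] / section["limit"]


def quotas_over_threshold(usage, threshold=80):
    """Names of quotas whose usage is at or above threshold percent."""
    return [
        name
        for name in ("requests", "tokens")
        if name in usage and usage_percent(usage[name]) >= threshold
    ]


# Sample values copied from the usage response above.
SAMPLE_USAGE = {
    "period": "2024-01-01T00:00:00Z",
    "requests": {"used": 523, "limit": 1000, "remaining": 477},
    "tokens": {"used": 45230, "limit": 2000000, "remaining": 1954770},
}

for name in ("requests", "tokens"):
    print(f"{name}: {usage_percent(SAMPLE_USAGE[name]):.1f}% used")
print("Over 80%:", quotas_over_threshold(SAMPLE_USAGE))
```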
Usage Alerts
Set up alerts when approaching limits:
POST https://api.run-agent.ai/v1/alerts
{
  "type": "rate_limit",
  "threshold": 80,
  "webhook_url": "https://your-app.com/alerts"
}
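For reference, here is one way to build that request with Python's standard library. The sketch only constructs the request and does not send it. It assumes the alerts endpoint accepts the same `Authorization: Bearer` header as the usage endpoint and a JSON content type; neither detail is confirmed here. `build_alert_request` is a hypothetical helper name.

```python
import json
import urllib.request

ALERTS_URL = "https://api.run-agent.ai/v1/alerts"


def build_alert_request(api_key, threshold, webhook_url, alert_type="rate_limit"):
    """Build (but do not send) the POST request that creates a usage alert."""
    body = json.dumps({
        "type": alert_type,
        "threshold": threshold,
        "webhook_url": webhook_url,
    }).encode("utf-8")
    return urllib.request.Request(
        ALERTS_URL,
        data=body,
        method="POST",
        headers={
            # Assumption: same Bearer scheme as the usage endpoint.
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
    )


request = build_alert_request("YOUR_API_KEY", 80, "https://your-app.com/alerts")
print(request.get_method(), request.full_url)
# To send it: urllib.request.urlopen(request)
```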
Best Practices
1. Implement Retry Logic
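async function requestWithRetry(url, options, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      const response = await fetch(url, options);
      if (response.status === 429) {
        // Retry-After arrives as a string of seconds; fall back to 60 if it is missing
        const retryAfter = Number(response.headers.get('Retry-After')) || 60;
        await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
        continue;
      }
      return response;
    } catch (error) {
      // Network errors are retried until the last attempt
      if (i === maxRetries - 1) throw error;
    }
  }
  // Every attempt was rate limited
  throw new Error('Max retries exceeded');
}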
2. Batch Requests
# Instead of individual requests
for item in items:
    api.process(item)  # 100 requests

# Batch them
api.process_batch(items)  # 1 request
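If a batch call accepts only a limited number of items, the list can be split into fixed-size chunks first. No maximum batch size is stated here, so the size of 2 below is purely illustrative, and `chunked` is a hypothetical helper.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of items, each with at most size elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


# Each chunk would become one batch call instead of one call per item.
for batch in chunked(["a", "b", "c", "d", "e"], size=2):
    print(batch)
```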
3. Cache Responses
import functools
import time

def timed_cache(seconds=3600):
    def decorator(func):
        cache = {}

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = str(args) + str(kwargs)
            if key in cache:
                result, timestamp = cache[key]
                # Serve the cached value while it is still fresh
                if time.time() - timestamp < seconds:
                    return result
            result = func(*args, **kwargs)
            cache[key] = (result, time.time())
            return result
        return wrapper
    return decorator

@timed_cache(seconds=300)
def get_agent_info(agent_id):
    return api.get_agent(agent_id)
4. Use Webhooks
Instead of polling:
# Don't do this
while True:
    status = api.check_status()  # Uses rate limit
    if status == "complete":
        break
    time.sleep(1)

# Do this
api.set_webhook("https://your-app.com/webhook")
# Receive notification when complete
Rate Limit Increases
Need higher limits? Options:
- Upgrade Plan: Move to a higher tier
- Request Increase: Contact support for custom limits
- Enterprise Plan: Fully customizable limits