Python SDK
Asynchronous Client
Using the RunAgent Python SDK asynchronously
Overview
The asynchronous client enables non-blocking operations, perfect for high-performance applications, concurrent requests, and modern async Python applications.
Initialization
Copy
from runagent import AsyncRunAgentClient
import asyncio
# Basic initialization
client = AsyncRunAgentClient(agent_id="your-agent-id")
# With all options
client = AsyncRunAgentClient(
agent_id="your-agent-id",
api_key="your-api-key",
base_url="https://api.run-agent.ai",
timeout=30,
max_retries=3
)
Basic Usage
Simple Async Request
Copy
async def main():
client = AsyncRunAgentClient(agent_id="your-agent-id")
result = await client.run_generic({
"query": "What's the weather like?",
"location": "Tokyo"
})
print(result)
# Run the async function
asyncio.run(main())
Multiple Concurrent Requests
Copy
async def process_multiple():
client = AsyncRunAgentClient(agent_id="your-agent-id")
# Create multiple tasks
tasks = [
client.run_generic({"query": "Question 1"}),
client.run_generic({"query": "Question 2"}),
client.run_generic({"query": "Question 3"})
]
# Run concurrently
results = await asyncio.gather(*tasks)
for i, result in enumerate(results):
print(f"Result {i+1}: {result}")
asyncio.run(process_multiple())
Async Context Manager
Copy
async def main():
async with AsyncRunAgentClient(agent_id="your-agent-id") as client:
result = await client.run_generic({"query": "Hello"})
print(result)
# Client automatically cleaned up
asyncio.run(main())
Error Handling
Copy
async def safe_run():
client = AsyncRunAgentClient(agent_id="your-agent-id")
try:
result = await client.run_generic({"query": "Test"})
return result
except asyncio.TimeoutError:
print("Request timed out")
except RunAgentError as e:
print(f"Agent error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
asyncio.run(safe_run())
Advanced Patterns
Rate Limited Concurrent Processing
Copy
async def process_with_limit(items, max_concurrent=5):
client = AsyncRunAgentClient(agent_id="your-agent-id")
semaphore = asyncio.Semaphore(max_concurrent)
async def process_item(item):
async with semaphore:
return await client.run_generic(item)
tasks = [process_item(item) for item in items]
return await asyncio.gather(*tasks)
# Process 100 items, max 5 at a time
items = [{"query": f"Item {i}"} for i in range(100)]
results = asyncio.run(process_with_limit(items))
Async Queue Processing
Copy
async def queue_processor():
client = AsyncRunAgentClient(agent_id="your-agent-id")
queue = asyncio.Queue()
# Producer
async def producer():
for i in range(10):
await queue.put({"query": f"Task {i}"})
await asyncio.sleep(0.1)
# Consumer
async def consumer():
while True:
try:
item = await asyncio.wait_for(queue.get(), timeout=1.0)
result = await client.run_generic(item)
print(f"Processed: {result}")
except asyncio.TimeoutError:
break
# Run producer and consumer concurrently
await asyncio.gather(producer(), consumer())
asyncio.run(queue_processor())
Timeout Handling
Copy
async def with_timeout():
client = AsyncRunAgentClient(agent_id="your-agent-id")
try:
# Set a 5-second timeout
result = await asyncio.wait_for(
client.run_generic({"query": "Complex task"}),
timeout=5.0
)
print(result)
except asyncio.TimeoutError:
print("Operation timed out after 5 seconds")
asyncio.run(with_timeout())
Integration with Web Frameworks
FastAPI Example
Copy
from fastapi import FastAPI, HTTPException
from runagent import AsyncRunAgentClient
app = FastAPI()
client = AsyncRunAgentClient(agent_id="your-agent-id")
@app.post("/process")
async def process_request(data: dict):
try:
result = await client.run_generic(data)
return {"status": "success", "result": result}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
aiohttp Example
Copy
from aiohttp import web
from runagent import AsyncRunAgentClient
client = AsyncRunAgentClient(agent_id="your-agent-id")
async def handle_request(request):
data = await request.json()
result = await client.run_generic(data)
return web.json_response(result)
app = web.Application()
app.router.add_post('/process', handle_request)
Performance Tips
-
Reuse Client Instance
Copy# Good - single client for all requests client = AsyncRunAgentClient(agent_id="...") async def process_many(items): tasks = [client.run_generic(item) for item in items] return await asyncio.gather(*tasks)
-
Use Connection Pooling
Copyclient = AsyncRunAgentClient( agent_id="...", connection_pool_size=20 # Adjust based on load )
-
Batch When Possible
Copy# Instead of many small requests # results = [await client.run_generic(item) for item in items] # Batch them result = await client.run_generic({ "batch": items, "process": "parallel" })
See Also
- Sync Client - For simpler use cases
- Streaming - For real-time responses
- API Reference - Complete method documentation
On this page
Assistant
Responses are generated using AI and may contain mistakes.