Overview

The asynchronous client enables non-blocking operations, perfect for high-performance applications, concurrent requests, and modern async Python applications.

Initialization

from runagent import AsyncRunAgentClient
import asyncio

# Basic initialization
client = AsyncRunAgentClient(agent_id="your-agent-id")

# With all options
client = AsyncRunAgentClient(
    agent_id="your-agent-id",
    api_key="your-api-key",
    base_url="https://api.run-agent.ai",
    timeout=30,
    max_retries=3
)

Basic Usage

Simple Async Request

async def main():
    client = AsyncRunAgentClient(agent_id="your-agent-id")
    
    result = await client.run_generic({
        "query": "What's the weather like?",
        "location": "Tokyo"
    })
    
    print(result)

# Run the async function
asyncio.run(main())

Multiple Concurrent Requests

async def process_multiple():
    client = AsyncRunAgentClient(agent_id="your-agent-id")
    
    # Create multiple tasks
    tasks = [
        client.run_generic({"query": "Question 1"}),
        client.run_generic({"query": "Question 2"}),
        client.run_generic({"query": "Question 3"})
    ]
    
    # Run concurrently
    results = await asyncio.gather(*tasks)
    
    for i, result in enumerate(results):
        print(f"Result {i+1}: {result}")

asyncio.run(process_multiple())

Async Context Manager

async def main():
    async with AsyncRunAgentClient(agent_id="your-agent-id") as client:
        result = await client.run_generic({"query": "Hello"})
        print(result)
    # Client automatically cleaned up

asyncio.run(main())

Error Handling

async def safe_run():
    client = AsyncRunAgentClient(agent_id="your-agent-id")
    
    try:
        result = await client.run_generic({"query": "Test"})
        return result
    except asyncio.TimeoutError:
        print("Request timed out")
    except RunAgentError as e:
        print(f"Agent error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

asyncio.run(safe_run())

Advanced Patterns

Rate Limited Concurrent Processing

async def process_with_limit(items, max_concurrent=5):
    client = AsyncRunAgentClient(agent_id="your-agent-id")
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def process_item(item):
        async with semaphore:
            return await client.run_generic(item)
    
    tasks = [process_item(item) for item in items]
    return await asyncio.gather(*tasks)

# Process 100 items, max 5 at a time
items = [{"query": f"Item {i}"} for i in range(100)]
results = asyncio.run(process_with_limit(items))

Async Queue Processing

async def queue_processor():
    client = AsyncRunAgentClient(agent_id="your-agent-id")
    queue = asyncio.Queue()
    
    # Producer
    async def producer():
        for i in range(10):
            await queue.put({"query": f"Task {i}"})
            await asyncio.sleep(0.1)
    
    # Consumer
    async def consumer():
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), timeout=1.0)
                result = await client.run_generic(item)
                print(f"Processed: {result}")
            except asyncio.TimeoutError:
                break
    
    # Run producer and consumer concurrently
    await asyncio.gather(producer(), consumer())

asyncio.run(queue_processor())

Timeout Handling

async def with_timeout():
    client = AsyncRunAgentClient(agent_id="your-agent-id")
    
    try:
        # Set a 5-second timeout
        result = await asyncio.wait_for(
            client.run_generic({"query": "Complex task"}),
            timeout=5.0
        )
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out after 5 seconds")

asyncio.run(with_timeout())

Integration with Web Frameworks

FastAPI Example

from fastapi import FastAPI, HTTPException
from runagent import AsyncRunAgentClient

app = FastAPI()
client = AsyncRunAgentClient(agent_id="your-agent-id")

@app.post("/process")
async def process_request(data: dict):
    try:
        result = await client.run_generic(data)
        return {"status": "success", "result": result}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

aiohttp Example

from aiohttp import web
from runagent import AsyncRunAgentClient

client = AsyncRunAgentClient(agent_id="your-agent-id")

async def handle_request(request):
    data = await request.json()
    result = await client.run_generic(data)
    return web.json_response(result)

app = web.Application()
app.router.add_post('/process', handle_request)

Performance Tips

  1. Reuse Client Instance

    # Good - single client for all requests
    client = AsyncRunAgentClient(agent_id="...")
    
    async def process_many(items):
        tasks = [client.run_generic(item) for item in items]
        return await asyncio.gather(*tasks)
    
  2. Use Connection Pooling

    client = AsyncRunAgentClient(
        agent_id="...",
        connection_pool_size=20  # Adjust based on load
    )
    
  3. Batch When Possible

    # Instead of many small requests
    # results = [await client.run_generic(item) for item in items]
    
    # Batch them
    result = await client.run_generic({
        "batch": items,
        "process": "parallel"
    })
    

See Also