Skip to main content
Prerequisites: Python 3.8+ and completed Deploy Your First Agent tutorial

Overview

The Python SDK provides native-feeling access to RunAgent agents with full type safety, async support, and streaming capabilities. It’s designed to feel like calling local Python functions.

Installation

pip install runagent

Basic Usage

Synchronous Calls

from runagent import RunAgentClient

# Connect to your agent
client = RunAgentClient(
    agent_id="your_agent_id_here",
    entrypoint_tag="main",
    local=True  # Set to False for production
)

# Call your agent
result = client.run(
    message="Hello, how are you?",
    user_id="python_user"
)

print(f"Response: {result['response']}")

Asynchronous Calls

import asyncio
from runagent import AsyncRunAgentClient

async def main():
    # Connect to your agent
    client = AsyncRunAgentClient(
        agent_id="your_agent_id_here",
        entrypoint_tag="main",
        local=True
    )
    
    # Call your agent asynchronously
    result = await client.run(
        message="Hello, how are you?",
        user_id="python_user"
    )
    
    print(f"Response: {result['response']}")

# Run the async function
asyncio.run(main())

Advanced Features

1. Streaming Responses

from runagent import RunAgentClient

# Connect to streaming entrypoint
client = RunAgentClient(
    agent_id="your_agent_id_here",
    entrypoint_tag="streaming",  # Note: tag ends with _stream
    local=True
)

# Stream responses
for chunk in client.run(message="Tell me a story"):
    print(chunk, end="", flush=True)

2. Batch Processing

from runagent import RunAgentClient
import asyncio

async def batch_processing():
    client = AsyncRunAgentClient(
        agent_id="your_agent_id_here",
        entrypoint_tag="main",
        local=True
    )
    
    # Process multiple requests concurrently
    tasks = [
        client.run(message=f"Process item {i}", user_id="batch_user")
        for i in range(10)
    ]
    
    results = await asyncio.gather(*tasks)
    
    for i, result in enumerate(results):
        print(f"Item {i}: {result['response']}")

asyncio.run(batch_processing())

3. Error Handling

from runagent import RunAgentClient, RunAgentError

client = RunAgentClient(
    agent_id="your_agent_id_here",
    entrypoint_tag="main",
    local=True
)

try:
    result = client.run(message="Test message")
    print(f"Success: {result['response']}")
except RunAgentError as e:
    print(f"RunAgent error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

4. Custom Headers and Metadata

from runagent import RunAgentClient

client = RunAgentClient(
    agent_id="your_agent_id_here",
    entrypoint_tag="main",
    local=True,
    headers={
        "X-Request-ID": "unique-request-id",
        "X-User-ID": "python_user"
    }
)

result = client.run(
    message="Hello with custom headers",
    metadata={
        "source": "python_app",
        "version": "1.0.0"
    }
)

Configuration

Environment Variables

# Set API key
export RUNAGENT_API_KEY="your-api-key"

# Set API URL
export RUNAGENT_API_URL="https://api.run-agent.ai"

# Set default agent ID
export RUNAGENT_AGENT_ID="your-agent-id"

Configuration File

Create ~/.runagent/config.json:
{
  "api_key": "your-api-key",
  "api_url": "https://api.run-agent.ai",
  "default_agent_id": "your-agent-id",
  "timeout": 30,
  "retry_attempts": 3
}

Programmatic Configuration

from runagent import RunAgentClient

client = RunAgentClient(
    agent_id="your_agent_id_here",
    entrypoint_tag="main",
    local=True,
    api_key="your-api-key",
    api_url="https://api.run-agent.ai",
    timeout=30,
    retry_attempts=3
)

Best Practices

1. Connection Management

from runagent import RunAgentClient
from contextlib import contextmanager

@contextmanager
def get_client(agent_id: str, entrypoint_tag: str):
    client = RunAgentClient(
        agent_id=agent_id,
        entrypoint_tag=entrypoint_tag,
        local=True
    )
    try:
        yield client
    finally:
        # Cleanup if needed
        pass

# Usage
with get_client("your_agent_id", "main") as client:
    result = client.run(message="Hello")

2. Retry Logic

from runagent import RunAgentClient
import time
import random

def run_with_retry(client, message, max_retries=3):
    for attempt in range(max_retries):
        try:
            return client.run(message=message)
        except Exception as e:
            if attempt == max_retries - 1:
                raise e
            time.sleep(random.uniform(1, 3))  # Exponential backoff

client = RunAgentClient(agent_id="your_agent_id", entrypoint_tag="main", local=True)
result = run_with_retry(client, "Hello with retry")

3. Logging and Monitoring

import logging
from runagent import RunAgentClient

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

client = RunAgentClient(
    agent_id="your_agent_id_here",
    entrypoint_tag="main",
    local=True
)

def monitored_run(message: str):
    logger.info(f"Starting request: {message}")
    start_time = time.time()
    
    try:
        result = client.run(message=message)
        duration = time.time() - start_time
        logger.info(f"Request completed in {duration:.2f}s")
        return result
    except Exception as e:
        duration = time.time() - start_time
        logger.error(f"Request failed after {duration:.2f}s: {e}")
        raise

result = monitored_run("Hello with monitoring")

Common Patterns

1. Agent Factory Pattern

from runagent import RunAgentClient
from typing import Dict, Any

class AgentFactory:
    def __init__(self, base_config: Dict[str, Any]):
        self.base_config = base_config
    
    def get_agent(self, agent_id: str, entrypoint_tag: str) -> RunAgentClient:
        config = self.base_config.copy()
        config.update({
            "agent_id": agent_id,
            "entrypoint_tag": entrypoint_tag
        })
        return RunAgentClient(**config)
    
    def get_chat_agent(self, agent_id: str) -> RunAgentClient:
        return self.get_agent(agent_id, "chat")
    
    def get_analysis_agent(self, agent_id: str) -> RunAgentClient:
        return self.get_agent(agent_id, "analyze")

# Usage
factory = AgentFactory({
    "local": True,
    "timeout": 30
})

chat_agent = factory.get_chat_agent("your_agent_id")
result = chat_agent.run(message="Hello")

2. Agent Wrapper Pattern

from runagent import RunAgentClient
from typing import Dict, Any, Optional

class AgentWrapper:
    def __init__(self, agent_id: str, entrypoint_tag: str, **kwargs):
        self.client = RunAgentClient(
            agent_id=agent_id,
            entrypoint_tag=entrypoint_tag,
            **kwargs
        )
        self.agent_id = agent_id
        self.entrypoint_tag = entrypoint_tag
    
    def call(self, **kwargs) -> Dict[str, Any]:
        """Generic call method"""
        return self.client.run(**kwargs)
    
    def chat(self, message: str, user_id: str = "default") -> str:
        """Chat-specific method"""
        result = self.client.run(message=message, user_id=user_id)
        return result.get("response", "")
    
    def analyze(self, data: str, analysis_type: str = "summary") -> Dict[str, Any]:
        """Analysis-specific method"""
        return self.client.run(data=data, analysis_type=analysis_type)
    
    def stream(self, message: str) -> str:
        """Streaming method"""
        response = ""
        for chunk in self.client.run(message=message):
            response += chunk
        return response

# Usage
agent = AgentWrapper("your_agent_id", "main", local=True)
response = agent.chat("Hello, how are you?")

3. Agent Pool Pattern

from runagent import RunAgentClient
from typing import List, Dict, Any
import random
import threading

class AgentPool:
    def __init__(self, agent_configs: List[Dict[str, Any]]):
        self.agents = []
        self.lock = threading.Lock()
        
        for config in agent_configs:
            agent = RunAgentClient(**config)
            self.agents.append(agent)
    
    def get_agent(self) -> RunAgentClient:
        """Get a random agent from the pool"""
        with self.lock:
            return random.choice(self.agents)
    
    def call(self, **kwargs) -> Dict[str, Any]:
        """Call any available agent"""
        agent = self.get_agent()
        return agent.run(**kwargs)

# Usage
pool = AgentPool([
    {"agent_id": "agent1", "entrypoint_tag": "main", "local": True},
    {"agent_id": "agent2", "entrypoint_tag": "main", "local": True},
    {"agent_id": "agent3", "entrypoint_tag": "main", "local": True}
])

result = pool.call(message="Hello from pool")

Error Handling

Common Error Types

from runagent import (
    RunAgentError,
    AuthenticationError,
    AgentNotFoundError,
    ValidationError,
    RateLimitError,
    TimeoutError,
    NetworkError
)

def handle_errors(client, message):
    try:
        return client.run(message=message)
    except AuthenticationError:
        print("Authentication failed. Check your API key.")
    except AgentNotFoundError:
        print("Agent not found. Check your agent ID.")
    except ValidationError as e:
        print(f"Validation error: {e}")
    except RateLimitError:
        print("Rate limit exceeded. Please wait and try again.")
    except TimeoutError:
        print("Request timed out. Please try again.")
    except NetworkError:
        print("Network error. Check your connection.")
    except RunAgentError as e:
        print(f"RunAgent error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

Performance Optimization

1. Connection Pooling

from runagent import RunAgentClient
import threading
from queue import Queue

class ConnectionPool:
    def __init__(self, agent_id: str, entrypoint_tag: str, pool_size: int = 5):
        self.pool = Queue(maxsize=pool_size)
        self.agent_id = agent_id
        self.entrypoint_tag = entrypoint_tag
        
        # Pre-populate pool
        for _ in range(pool_size):
            client = RunAgentClient(
                agent_id=agent_id,
                entrypoint_tag=entrypoint_tag,
                local=True
            )
            self.pool.put(client)
    
    def get_client(self) -> RunAgentClient:
        return self.pool.get()
    
    def return_client(self, client: RunAgentClient):
        self.pool.put(client)

# Usage
pool = ConnectionPool("your_agent_id", "main", pool_size=5)
client = pool.get_client()
try:
    result = client.run(message="Hello")
finally:
    pool.return_client(client)

2. Caching

from runagent import RunAgentClient
import hashlib
import json
from functools import lru_cache

class CachedAgent:
    def __init__(self, agent_id: str, entrypoint_tag: str):
        self.client = RunAgentClient(
            agent_id=agent_id,
            entrypoint_tag=entrypoint_tag,
            local=True
        )
        self.cache = {}
    
    def _cache_key(self, **kwargs) -> str:
        """Generate cache key from parameters"""
        key_data = json.dumps(kwargs, sort_keys=True)
        return hashlib.md5(key_data.encode()).hexdigest()
    
    def run(self, **kwargs) -> Dict[str, Any]:
        cache_key = self._cache_key(**kwargs)
        
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        result = self.client.run(**kwargs)
        self.cache[cache_key] = result
        return result

# Usage
cached_agent = CachedAgent("your_agent_id", "main")
result = cached_agent.run(message="Hello")

Testing

Unit Testing

import unittest
from unittest.mock import Mock, patch
from runagent import RunAgentClient

class TestAgentClient(unittest.TestCase):
    def setUp(self):
        self.client = RunAgentClient(
            agent_id="test_agent",
            entrypoint_tag="main",
            local=True
        )
    
    @patch('runagent.client.requests.post')
    def test_run_success(self, mock_post):
        # Mock successful response
        mock_response = Mock()
        mock_response.json.return_value = {"response": "Hello"}
        mock_response.status_code = 200
        mock_post.return_value = mock_response
        
        result = self.client.run(message="Hello")
        
        self.assertEqual(result["response"], "Hello")
        mock_post.assert_called_once()
    
    def test_run_error(self):
        with self.assertRaises(RunAgentError):
            self.client.run(message="")

if __name__ == "__main__":
    unittest.main()

Integration Testing

import pytest
from runagent import RunAgentClient

@pytest.fixture
def agent_client():
    return RunAgentClient(
        agent_id="test_agent",
        entrypoint_tag="main",
        local=True
    )

def test_agent_response(agent_client):
    result = agent_client.run(message="Hello")
    assert "response" in result
    assert result["status"] == "success"

def test_agent_streaming(agent_client):
    response = ""
    for chunk in agent_client.run(message="Hello"):
        response += chunk
    assert len(response) > 0

Next Steps

🎉 Great work! You’ve learned how to use RunAgent agents from Python applications. The Python SDK provides native-feeling access with full type safety and async support!