Prerequisites: Python 3.8+ and completed Deploy Your First Agent tutorial
Overview
The Python SDK provides native-feeling access to RunAgent agents with full type safety, async support, and streaming capabilities. It’s designed to feel like calling local Python functions.Installation
Copy
pip install runagent
Basic Usage
Synchronous Calls
Copy
from runagent import RunAgentClient
# Connect to your agent
client = RunAgentClient(
agent_id="your_agent_id_here",
entrypoint_tag="main",
local=True # Set to False for production
)
# Call your agent
result = client.run(
message="Hello, how are you?",
user_id="python_user"
)
print(f"Response: {result['response']}")
Asynchronous Calls
Copy
import asyncio
from runagent import AsyncRunAgentClient
async def main():
# Connect to your agent
client = AsyncRunAgentClient(
agent_id="your_agent_id_here",
entrypoint_tag="main",
local=True
)
# Call your agent asynchronously
result = await client.run(
message="Hello, how are you?",
user_id="python_user"
)
print(f"Response: {result['response']}")
# Run the async function
asyncio.run(main())
Advanced Features
1. Streaming Responses
Copy
from runagent import RunAgentClient
# Connect to streaming entrypoint
client = RunAgentClient(
agent_id="your_agent_id_here",
entrypoint_tag="streaming", # Note: tag ends with _stream
local=True
)
# Stream responses
for chunk in client.run(message="Tell me a story"):
print(chunk, end="", flush=True)
2. Batch Processing
Copy
from runagent import RunAgentClient
import asyncio
async def batch_processing():
client = AsyncRunAgentClient(
agent_id="your_agent_id_here",
entrypoint_tag="main",
local=True
)
# Process multiple requests concurrently
tasks = [
client.run(message=f"Process item {i}", user_id="batch_user")
for i in range(10)
]
results = await asyncio.gather(*tasks)
for i, result in enumerate(results):
print(f"Item {i}: {result['response']}")
asyncio.run(batch_processing())
3. Error Handling
Copy
from runagent import RunAgentClient, RunAgentError
client = RunAgentClient(
agent_id="your_agent_id_here",
entrypoint_tag="main",
local=True
)
try:
result = client.run(message="Test message")
print(f"Success: {result['response']}")
except RunAgentError as e:
print(f"RunAgent error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
4. Custom Headers and Metadata
Copy
from runagent import RunAgentClient
client = RunAgentClient(
agent_id="your_agent_id_here",
entrypoint_tag="main",
local=True,
headers={
"X-Request-ID": "unique-request-id",
"X-User-ID": "python_user"
}
)
result = client.run(
message="Hello with custom headers",
metadata={
"source": "python_app",
"version": "1.0.0"
}
)
Configuration
Environment Variables
Copy
# Set API key
export RUNAGENT_API_KEY="your-api-key"
# Set API URL
export RUNAGENT_API_URL="https://api.run-agent.ai"
# Set default agent ID
export RUNAGENT_AGENT_ID="your-agent-id"
Configuration File
Create~/.runagent/config.json
:
Copy
{
"api_key": "your-api-key",
"api_url": "https://api.run-agent.ai",
"default_agent_id": "your-agent-id",
"timeout": 30,
"retry_attempts": 3
}
Programmatic Configuration
Copy
from runagent import RunAgentClient
client = RunAgentClient(
agent_id="your_agent_id_here",
entrypoint_tag="main",
local=True,
api_key="your-api-key",
api_url="https://api.run-agent.ai",
timeout=30,
retry_attempts=3
)
Best Practices
1. Connection Management
Copy
from runagent import RunAgentClient
from contextlib import contextmanager
@contextmanager
def get_client(agent_id: str, entrypoint_tag: str):
client = RunAgentClient(
agent_id=agent_id,
entrypoint_tag=entrypoint_tag,
local=True
)
try:
yield client
finally:
# Cleanup if needed
pass
# Usage
with get_client("your_agent_id", "main") as client:
result = client.run(message="Hello")
2. Retry Logic
Copy
from runagent import RunAgentClient
import time
import random
def run_with_retry(client, message, max_retries=3):
for attempt in range(max_retries):
try:
return client.run(message=message)
except Exception as e:
if attempt == max_retries - 1:
raise e
time.sleep(random.uniform(1, 3)) # Exponential backoff
client = RunAgentClient(agent_id="your_agent_id", entrypoint_tag="main", local=True)
result = run_with_retry(client, "Hello with retry")
3. Logging and Monitoring
Copy
import logging
from runagent import RunAgentClient
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
client = RunAgentClient(
agent_id="your_agent_id_here",
entrypoint_tag="main",
local=True
)
def monitored_run(message: str):
logger.info(f"Starting request: {message}")
start_time = time.time()
try:
result = client.run(message=message)
duration = time.time() - start_time
logger.info(f"Request completed in {duration:.2f}s")
return result
except Exception as e:
duration = time.time() - start_time
logger.error(f"Request failed after {duration:.2f}s: {e}")
raise
result = monitored_run("Hello with monitoring")
Common Patterns
1. Agent Factory Pattern
Copy
from runagent import RunAgentClient
from typing import Dict, Any
class AgentFactory:
def __init__(self, base_config: Dict[str, Any]):
self.base_config = base_config
def get_agent(self, agent_id: str, entrypoint_tag: str) -> RunAgentClient:
config = self.base_config.copy()
config.update({
"agent_id": agent_id,
"entrypoint_tag": entrypoint_tag
})
return RunAgentClient(**config)
def get_chat_agent(self, agent_id: str) -> RunAgentClient:
return self.get_agent(agent_id, "chat")
def get_analysis_agent(self, agent_id: str) -> RunAgentClient:
return self.get_agent(agent_id, "analyze")
# Usage
factory = AgentFactory({
"local": True,
"timeout": 30
})
chat_agent = factory.get_chat_agent("your_agent_id")
result = chat_agent.run(message="Hello")
2. Agent Wrapper Pattern
Copy
from runagent import RunAgentClient
from typing import Dict, Any, Optional
class AgentWrapper:
def __init__(self, agent_id: str, entrypoint_tag: str, **kwargs):
self.client = RunAgentClient(
agent_id=agent_id,
entrypoint_tag=entrypoint_tag,
**kwargs
)
self.agent_id = agent_id
self.entrypoint_tag = entrypoint_tag
def call(self, **kwargs) -> Dict[str, Any]:
"""Generic call method"""
return self.client.run(**kwargs)
def chat(self, message: str, user_id: str = "default") -> str:
"""Chat-specific method"""
result = self.client.run(message=message, user_id=user_id)
return result.get("response", "")
def analyze(self, data: str, analysis_type: str = "summary") -> Dict[str, Any]:
"""Analysis-specific method"""
return self.client.run(data=data, analysis_type=analysis_type)
def stream(self, message: str) -> str:
"""Streaming method"""
response = ""
for chunk in self.client.run(message=message):
response += chunk
return response
# Usage
agent = AgentWrapper("your_agent_id", "main", local=True)
response = agent.chat("Hello, how are you?")
3. Agent Pool Pattern
Copy
from runagent import RunAgentClient
from typing import List, Dict, Any
import random
import threading
class AgentPool:
def __init__(self, agent_configs: List[Dict[str, Any]]):
self.agents = []
self.lock = threading.Lock()
for config in agent_configs:
agent = RunAgentClient(**config)
self.agents.append(agent)
def get_agent(self) -> RunAgentClient:
"""Get a random agent from the pool"""
with self.lock:
return random.choice(self.agents)
def call(self, **kwargs) -> Dict[str, Any]:
"""Call any available agent"""
agent = self.get_agent()
return agent.run(**kwargs)
# Usage
pool = AgentPool([
{"agent_id": "agent1", "entrypoint_tag": "main", "local": True},
{"agent_id": "agent2", "entrypoint_tag": "main", "local": True},
{"agent_id": "agent3", "entrypoint_tag": "main", "local": True}
])
result = pool.call(message="Hello from pool")
Error Handling
Common Error Types
Copy
from runagent import (
RunAgentError,
AuthenticationError,
AgentNotFoundError,
ValidationError,
RateLimitError,
TimeoutError,
NetworkError
)
def handle_errors(client, message):
try:
return client.run(message=message)
except AuthenticationError:
print("Authentication failed. Check your API key.")
except AgentNotFoundError:
print("Agent not found. Check your agent ID.")
except ValidationError as e:
print(f"Validation error: {e}")
except RateLimitError:
print("Rate limit exceeded. Please wait and try again.")
except TimeoutError:
print("Request timed out. Please try again.")
except NetworkError:
print("Network error. Check your connection.")
except RunAgentError as e:
print(f"RunAgent error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
Performance Optimization
1. Connection Pooling
Copy
from runagent import RunAgentClient
import threading
from queue import Queue
class ConnectionPool:
def __init__(self, agent_id: str, entrypoint_tag: str, pool_size: int = 5):
self.pool = Queue(maxsize=pool_size)
self.agent_id = agent_id
self.entrypoint_tag = entrypoint_tag
# Pre-populate pool
for _ in range(pool_size):
client = RunAgentClient(
agent_id=agent_id,
entrypoint_tag=entrypoint_tag,
local=True
)
self.pool.put(client)
def get_client(self) -> RunAgentClient:
return self.pool.get()
def return_client(self, client: RunAgentClient):
self.pool.put(client)
# Usage
pool = ConnectionPool("your_agent_id", "main", pool_size=5)
client = pool.get_client()
try:
result = client.run(message="Hello")
finally:
pool.return_client(client)
2. Caching
Copy
from runagent import RunAgentClient
import hashlib
import json
from functools import lru_cache
class CachedAgent:
def __init__(self, agent_id: str, entrypoint_tag: str):
self.client = RunAgentClient(
agent_id=agent_id,
entrypoint_tag=entrypoint_tag,
local=True
)
self.cache = {}
def _cache_key(self, **kwargs) -> str:
"""Generate cache key from parameters"""
key_data = json.dumps(kwargs, sort_keys=True)
return hashlib.md5(key_data.encode()).hexdigest()
def run(self, **kwargs) -> Dict[str, Any]:
cache_key = self._cache_key(**kwargs)
if cache_key in self.cache:
return self.cache[cache_key]
result = self.client.run(**kwargs)
self.cache[cache_key] = result
return result
# Usage
cached_agent = CachedAgent("your_agent_id", "main")
result = cached_agent.run(message="Hello")
Testing
Unit Testing
Copy
import unittest
from unittest.mock import Mock, patch
from runagent import RunAgentClient
class TestAgentClient(unittest.TestCase):
def setUp(self):
self.client = RunAgentClient(
agent_id="test_agent",
entrypoint_tag="main",
local=True
)
@patch('runagent.client.requests.post')
def test_run_success(self, mock_post):
# Mock successful response
mock_response = Mock()
mock_response.json.return_value = {"response": "Hello"}
mock_response.status_code = 200
mock_post.return_value = mock_response
result = self.client.run(message="Hello")
self.assertEqual(result["response"], "Hello")
mock_post.assert_called_once()
def test_run_error(self):
with self.assertRaises(RunAgentError):
self.client.run(message="")
if __name__ == "__main__":
unittest.main()
Integration Testing
Copy
import pytest
from runagent import RunAgentClient
@pytest.fixture
def agent_client():
return RunAgentClient(
agent_id="test_agent",
entrypoint_tag="main",
local=True
)
def test_agent_response(agent_client):
result = agent_client.run(message="Hello")
assert "response" in result
assert result["status"] == "success"
def test_agent_streaming(agent_client):
response = ""
for chunk in agent_client.run(message="Hello"):
response += chunk
assert len(response) > 0
Next Steps
Async Patterns
Learn advanced async patterns for Python
Production Deployment
Deploy your Python applications to production
Multi-Language Integration
Integrate with other programming languages
Performance Tuning
Optimize your Python applications for production
🎉 Great work! You’ve learned how to use RunAgent agents from Python applications. The Python SDK provides native-feeling access with full type safety and async support!
Still have a question?
- Join our Discord Community
- Email us: [email protected]
- Follow us on X
- New here? Sign up