Python SDK
Streaming Responses
Real-time streaming with RunAgent Python SDK
Overview
Streaming allows you to receive agent responses in real-time as they’re generated, perfect for chatbots, live updates, and improving perceived performance.
Basic Streaming
Synchronous Streaming
Copy
# Minimal synchronous streaming example: run_generic_stream() returns a
# generator that yields text chunks as the agent produces them.
from runagent import RunAgentClient

client = RunAgentClient(agent_id="your-agent-id")

# Stream responses
for chunk in client.run_generic_stream({
    "query": "Tell me a long story about space exploration"
}):
    # end="" keeps chunks on one line; flush=True defeats stdout buffering
    # so text appears immediately.
    print(chunk, end="", flush=True)
Asynchronous Streaming
Copy
# Asynchronous variant: AsyncRunAgentClient exposes the stream as an
# async generator, so iteration must use `async for` inside a coroutine.
from runagent import AsyncRunAgentClient
import asyncio

async def stream_response():
    client = AsyncRunAgentClient(agent_id="your-agent-id")
    async for chunk in client.run_generic_stream({
        "query": "Explain quantum computing in detail"
    }):
        print(chunk, end="", flush=True)

asyncio.run(stream_response())
Stream Processing
Collecting Chunks
Copy
def get_complete_response(client, query):
    """Consume a streamed run to completion and return the full text.

    Each chunk is echoed to stdout as it arrives, so the caller still
    sees real-time output while the pieces are being collected.
    """
    collected = []
    for piece in client.run_generic_stream({"query": query}):
        # Optionally process each chunk
        print(piece, end="", flush=True)
        collected.append(piece)
    # Reassemble the complete response from the collected chunks.
    return "".join(collected)
# Create a client bound to your deployed agent, then block until the
# streamed answer has been fully collected into a single string.
client = RunAgentClient(agent_id="your-agent-id")
full_response = get_complete_response(client, "Your question here")
Processing Stream Data
Copy
def process_stream(client, query):
    """Re-yield each streamed chunk together with a running word count.

    Yields dicts of the form {"chunk": <text>, "words_so_far": <int>}
    so downstream consumers can track progress while streaming.
    """
    running_total = 0
    for piece in client.run_generic_stream({"query": query}):
        running_total += len(piece.split())
        # Could do other processing like sentiment analysis
        yield {
            "chunk": piece,
            "words_so_far": running_total,
        }
Advanced Streaming Patterns
Stream with Timeout
Copy
import time
def stream_with_timeout(client, query, timeout=30):
    """Yield streamed chunks, aborting once *timeout* seconds elapse.

    The budget is checked as each chunk arrives; when exceeded, a
    truncation notice is emitted and the stream is abandoned.
    """
    deadline = time.time() + timeout
    for piece in client.run_generic_stream({"query": query}):
        if time.time() > deadline:
            yield "\n[Response truncated due to timeout]"
            return
        yield piece
Buffered Streaming
Copy
def buffered_stream(client, query, buffer_size=10):
    """Coalesce streamed chunks and emit them *buffer_size* at a time.

    A trailing partial batch is flushed after the stream ends, so no
    data is ever dropped.
    """
    pending = []
    for piece in client.run_generic_stream({"query": query}):
        pending.append(piece)
        if len(pending) >= buffer_size:
            # Batch is full: emit it as one string and start over.
            yield "".join(pending)
            pending = []
    # Flush whatever is left once the stream is exhausted.
    if pending:
        yield "".join(pending)
Stream with Progress
Copy
def stream_with_progress(client, query):
    """Wrap each streamed chunk in a progress record.

    Every yielded dict carries the chunk text, a 1-based running count
    of chunks received, and the wall-clock time it was observed.
    """
    received = 0
    for piece in client.run_generic_stream({"query": query}):
        received += 1
        # Update progress (could be a callback, progress bar, etc.)
        yield {
            "chunk": piece,
            "chunks_received": received,
            "timestamp": time.time(),
        }
Web Integration
Flask SSE Example
Copy
from flask import Flask, Response, request
from runagent import RunAgentClient

app = Flask(__name__)
# One client shared by every request handler; assumes RunAgentClient is
# safe to reuse across Flask requests — TODO confirm against the SDK docs.
client = RunAgentClient(agent_id="your-agent-id")
@app.route('/stream')
def stream():
    """Stream agent output to the browser as Server-Sent Events (SSE)."""
    query = request.args.get('query', 'Hello')

    def generate():
        for chunk in client.run_generic_stream({"query": query}):
            # SSE framing: every line of the payload needs its own
            # "data: " prefix. The naive f"data: {chunk}\n\n" corrupts
            # the event stream whenever a chunk contains a newline.
            lines = chunk.splitlines() or [""]
            yield "".join(f"data: {line}\n" for line in lines) + "\n"

    return Response(
        generate(),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",     # SSE responses must not be cached
            "X-Accel-Buffering": "no"        # disable nginx proxy buffering
        }
    )
WebSocket Streaming
Copy
import asyncio
import websockets
from runagent import AsyncRunAgentClient
async def handle_websocket(websocket, path):
    """Relay WebSocket messages to the agent and stream replies back.

    Each incoming message is parsed as JSON and used as the input of a
    streaming run; every chunk is sent back over the same socket.
    NOTE(review): the 2-argument handler matches the legacy websockets
    API; websockets >= 11 passes only the connection — confirm against
    your installed version.
    """
    import json  # or add `import json` next to the other imports above

    client = AsyncRunAgentClient(agent_id="your-agent-id")
    async for message in websocket:
        data = json.loads(message)
        async for chunk in client.run_generic_stream(data):
            await websocket.send(chunk)

async def main():
    # websockets.serve(...) is not a coroutine, so the original
    # asyncio.run(start_server) raises ValueError — and even if it
    # started, the loop would close immediately. Keep the server alive
    # inside a running event loop instead.
    async with websockets.serve(handle_websocket, "localhost", 8765):
        await asyncio.Future()  # run forever

asyncio.run(main())
Error Handling in Streams
Copy
def safe_stream(client, query):
    """Stream chunks, converting failures into inline text markers.

    NOTE(review): StreamError is assumed to be importable from the
    runagent package — confirm and import it where this is used.
    """
    try:
        yield from client.run_generic_stream({"query": query})
    except StreamError as exc:
        yield f"\n[Stream error: {exc}]"
    except Exception as exc:
        yield f"\n[Unexpected error: {exc}]"
Stream Transformation
Format Conversion
Copy
def stream_as_json(client, query):
    """Convert the stream to newline-delimited JSON events.

    Each emitted line is a JSON object carrying the chunk's ordinal
    index, its text content, and the time it was serialized.
    """
    index = 0
    for piece in client.run_generic_stream({"query": query}):
        event = {
            "index": index,
            "content": piece,
            "timestamp": time.time(),
        }
        yield json.dumps(event) + "\n"
        index += 1
Markdown Processing
Copy
def stream_markdown(client, query):
    """Process markdown in real-time, handling fenced code blocks.

    Text accumulates until a code-fence marker (```) appears, at which
    point the buffered text is handed to process_code_block() and
    emitted exactly once. Any trailing text is flushed when the stream
    ends.

    Fixes two bugs in the naive version: chunks yielded individually
    were also left in the buffer and re-emitted by process_code_block()
    (duplicated output), and a trailing buffer with no fence was
    silently dropped.
    """
    buffer = ""
    for chunk in client.run_generic_stream({"query": query}):
        buffer += chunk
        if "```" in buffer:
            # A fence is present: process and emit the buffered text once.
            yield process_code_block(buffer)
            buffer = ""
    # Flush whatever is left so the tail of the response is not lost.
    if buffer:
        yield buffer
Performance Considerations
-
Don’t Block the Stream
Copy
# Good - process asynchronously
async def process_chunks(client, query):
    async for chunk in client.run_generic_stream(query):
        # Non-blocking processing
        asyncio.create_task(process_chunk(chunk))
        yield chunk
-
Handle Backpressure
Copy
def controlled_stream(client, query, max_buffer=1000):
    buffer_size = 0
    for chunk in client.run_generic_stream({"query": query}):
        buffer_size += len(chunk)
        if buffer_size > max_buffer:
            # Pause or slow down
            time.sleep(0.1)
            buffer_size = 0
        yield chunk
See Also
- Sync Client - Non-streaming operations
- Async Client - Async streaming patterns
- API Reference - Complete streaming API
On this page
- Overview
- Basic Streaming
- Synchronous Streaming
- Asynchronous Streaming
- Stream Processing
- Collecting Chunks
- Processing Stream Data
- Advanced Streaming Patterns
- Stream with Timeout
- Buffered Streaming
- Stream with Progress
- Web Integration
- Flask SSE Example
- WebSocket Streaming
- Error Handling in Streams
- Stream Transformation
- Format Conversion
- Markdown Processing
- Performance Considerations
- See Also
Assistant
Responses are generated using AI and may contain mistakes.