Overview

Streaming lets you receive agent responses in real time, as they are generated. This makes it well suited to chatbots and live updates, and it improves perceived performance because users see output before the full response is ready.

Basic Streaming

Synchronous Streaming

from runagent import RunAgentClient

client = RunAgentClient(agent_id="your-agent-id")

# Request payload sent to the agent.
payload = {"query": "Tell me a long story about space exploration"}

# Echo each chunk as soon as it arrives: end="" keeps the pieces contiguous
# and flush=True pushes them to the terminal without waiting for a newline.
for chunk in client.run_generic_stream(payload):
    print(chunk, end="", flush=True)

Asynchronous Streaming

from runagent import AsyncRunAgentClient
import asyncio

async def stream_response():
    """Stream one agent response and echo it to stdout chunk by chunk."""
    payload = {"query": "Explain quantum computing in detail"}
    client = AsyncRunAgentClient(agent_id="your-agent-id")

    # The async client is consumed with ``async for``.
    stream = client.run_generic_stream(payload)
    async for piece in stream:
        print(piece, end="", flush=True)


asyncio.run(stream_response())

Stream Processing

Collecting Chunks

def get_complete_response(client, query):
    """Stream a response, echoing it live, and return the full text.

    Args:
        client: Object exposing ``run_generic_stream(payload)`` that yields
            string chunks.
        query: Question placed under the ``"query"`` key of the payload.

    Returns:
        All received chunks concatenated into a single string.
    """
    pieces = []
    stream = client.run_generic_stream({"query": query})
    for piece in stream:
        pieces.append(piece)
        # Show the chunk right away so the caller sees progress.
        print(piece, end="", flush=True)

    return "".join(pieces)

# Create the client, then collect the whole streamed answer as one string.
client = RunAgentClient(agent_id="your-agent-id")
full_response = get_complete_response(client, query="Your question here")

Processing Stream Data

def process_stream(client, query):
    """Yield each chunk together with a running word count.

    Words are counted per chunk with ``str.split()``, so a word that is
    split across two chunks is counted once in each of them.

    Args:
        client: Object exposing ``run_generic_stream(payload)`` that yields
            string chunks.
        query: Question placed under the ``"query"`` key of the payload.

    Yields:
        Dicts with the keys ``"chunk"`` (the raw chunk) and
        ``"words_so_far"`` (cumulative word count including this chunk).
    """
    total_words = 0
    stream = client.run_generic_stream({"query": query})

    for piece in stream:
        total_words += len(piece.split())

        # Other per-chunk processing (e.g. sentiment analysis) could go here.
        yield {"chunk": piece, "words_so_far": total_words}

Advanced Streaming Patterns

Stream with Timeout

import time

def stream_with_timeout(client, query, timeout=30):
    """Yield chunks until the stream ends or ``timeout`` seconds have passed.

    Elapsed time is measured with ``time.monotonic()`` so that adjustments
    to the system clock cannot shorten or extend the deadline.

    The deadline is only checked when a chunk arrives, so a stream that
    stalls without producing data is not interrupted by this function.

    Args:
        client: Object exposing ``run_generic_stream(payload)`` that yields
            string chunks.
        query: Question placed under the ``"query"`` key of the payload.
        timeout: Maximum streaming time in seconds, counted from the moment
            iteration starts.

    Yields:
        Each chunk received before the deadline. If the deadline has passed
        when a chunk arrives, that chunk is dropped and a single truncation
        notice is yielded instead, after which the generator stops.
    """
    deadline = time.monotonic() + timeout

    for chunk in client.run_generic_stream({"query": query}):
        if time.monotonic() > deadline:
            yield "\n[Response truncated due to timeout]"
            break
        yield chunk

Buffered Streaming

def buffered_stream(client, query, buffer_size=10):
    """Group streamed chunks and yield them ``buffer_size`` at a time.

    Args:
        client: Object exposing ``run_generic_stream(payload)`` that yields
            string chunks.
        query: Question placed under the ``"query"`` key of the payload.
        buffer_size: Number of chunks to collect before emitting them.

    Yields:
        The concatenation of each full group of chunks, followed by the
        concatenation of any leftover chunks once the stream ends.
    """
    pending = []

    for piece in client.run_generic_stream({"query": query}):
        pending.append(piece)
        if len(pending) < buffer_size:
            continue

        # Group is full: emit it and start collecting a new one.
        yield "".join(pending)
        pending = []

    # Flush whatever did not fill a complete group.
    if pending:
        yield "".join(pending)

Stream with Progress

def stream_with_progress(client, query):
    """Yield a progress record for every chunk received.

    Args:
        client: Object exposing ``run_generic_stream(payload)`` that yields
            chunks.
        query: Question placed under the ``"query"`` key of the payload.

    Yields:
        Dicts with the keys ``"chunk"`` (the chunk itself),
        ``"chunks_received"`` (1-based count so far) and ``"timestamp"``
        (``time.time()`` taken when the record is built).
    """
    stream = client.run_generic_stream({"query": query})

    # The record could equally feed a callback or a progress bar.
    for count, piece in enumerate(stream, start=1):
        yield {
            "chunk": piece,
            "chunks_received": count,
            "timestamp": time.time(),
        }

Web Integration

Flask SSE Example

from flask import Flask, Response, request
from runagent import RunAgentClient

app = Flask(__name__)
client = RunAgentClient(agent_id="your-agent-id")


def _format_sse(chunk):
    """Encode one chunk as a Server-Sent Events message.

    Every line of the chunk gets its own ``data:`` field. Writing a chunk
    that contains a newline into a single ``data:`` field would end the
    field (or the whole event) early and corrupt the stream.
    """
    text = f"{chunk}".replace("\r\n", "\n").replace("\r", "\n")
    fields = "".join(f"data: {line}\n" for line in text.split("\n"))
    # A blank line terminates the event.
    return fields + "\n"


@app.route('/stream')
def stream():
    """Stream the agent's answer to the ``query`` URL parameter as SSE.

    Returns:
        A ``text/event-stream`` response that emits one event per chunk.
    """
    query = request.args.get('query', 'Hello')

    def generate():
        for chunk in client.run_generic_stream({"query": query}):
            yield _format_sse(chunk)

    return Response(
        generate(),
        mimetype="text/event-stream",
        headers={
            # Keep the stream uncached and ask proxies that honor
            # X-Accel-Buffering not to buffer it.
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no"
        }
    )

WebSocket Streaming

import asyncio
import json

import websockets
from runagent import AsyncRunAgentClient

async def handle_websocket(websocket, path=None):
    """Relay agent responses to a WebSocket client chunk by chunk.

    Every incoming message is decoded as JSON and passed unchanged to the
    agent as its payload; each chunk of the response is sent back over the
    same connection as soon as it is produced.

    Args:
        websocket: The connection to read requests from and write chunks to.
        path: Request path. Optional because current ``websockets`` releases
            call the handler with the connection only, while older releases
            also pass the path. It is not used here.
    """
    client = AsyncRunAgentClient(agent_id="your-agent-id")

    async for message in websocket:
        data = json.loads(message)

        async for chunk in client.run_generic_stream(data):
            await websocket.send(chunk)

async def start_server():
    """Serve ``handle_websocket`` on localhost:8765 until the process stops.

    ``asyncio.run()`` needs a coroutine, and the server must be created
    inside the running event loop, so the server is started here rather
    than at module level.
    """
    async with websockets.serve(handle_websocket, "localhost", 8765):
        # This future never completes, which keeps the server running.
        await asyncio.Future()


asyncio.run(start_server())

Error Handling in Streams

def safe_stream(client, query):
    """Stream a response, turning failures into an in-band error message.

    Args:
        client: Object exposing ``run_generic_stream(payload)`` that yields
            chunks.
        query: Question placed under the ``"query"`` key of the payload.

    Yields:
        Each chunk from the stream. If the stream raises, one final
        bracketed error line is yielded instead of propagating the error.
    """
    # NOTE(review): StreamError is not imported in this snippet; presumably
    # it comes from the SDK - confirm the import path before using this.
    payload = {"query": query}
    try:
        yield from client.run_generic_stream(payload)
    except StreamError as err:
        yield f"\n[Stream error: {err}]"
    except Exception as err:
        yield f"\n[Unexpected error: {err}]"

Stream Transformation

Format Conversion

def stream_as_json(client, query):
    """Convert the stream into newline-delimited JSON events.

    Args:
        client: Object exposing ``run_generic_stream(payload)`` that yields
            JSON-serializable chunks.
        query: Question placed under the ``"query"`` key of the payload.

    Yields:
        One JSON object per chunk, serialized on a single line ending in
        ``"\\n"``, with the keys ``"index"`` (0-based position),
        ``"content"`` (the chunk) and ``"timestamp"`` (``time.time()``).
    """
    stream = client.run_generic_stream({"query": query})

    for position, piece in enumerate(stream):
        event = {
            "index": position,
            "content": piece,
            "timestamp": time.time(),
        }
        yield json.dumps(event) + "\n"

Markdown Processing

def stream_markdown(client, query):
    """Process markdown in real-time, handling fenced code blocks as units.

    Plain text is passed through as it arrives. Text from an opening
    ``` fence through its closing ``` fence is held back until the block is
    complete and then handed to ``process_code_block`` in one piece. Text
    is never emitted twice: everything yielded is removed from the buffer.

    Args:
        client: Object exposing ``run_generic_stream(payload)`` that yields
            string chunks.
        query: Question placed under the ``"query"`` key of the payload.

    Yields:
        Plain-text segments, and the result of ``process_code_block`` for
        each complete fenced block (fences included in its argument). If
        the stream ends inside an unterminated block, the remaining text is
        yielded unprocessed.
    """
    fence = "```"
    buffer = ""

    for chunk in client.run_generic_stream({"query": query}):
        buffer += chunk

        while True:
            start = buffer.find(fence)
            if start == -1:
                # No fence pending: flush the text, but hold back trailing
                # backticks that may be a fence split across two chunks.
                held = len(buffer) - len(buffer.rstrip("`"))
                held = min(held, len(fence) - 1)
                cut = len(buffer) - held
                if cut:
                    yield buffer[:cut]
                buffer = buffer[cut:]
                break

            end = buffer.find(fence, start + len(fence))
            if end == -1:
                # Block opened but not closed yet: emit the text before it
                # and wait for more data.
                if start:
                    yield buffer[:start]
                    buffer = buffer[start:]
                break

            if start:
                yield buffer[:start]
            stop = end + len(fence)
            yield process_code_block(buffer[start:stop])
            buffer = buffer[stop:]

    # Whatever is left could not be completed (e.g. an unterminated block).
    if buffer:
        yield buffer

Performance Considerations

  1. Don’t Block the Stream

    # Good - process asynchronously
    # The event loop keeps only weak references to tasks, so strong
    # references are held here until each task finishes.
    _background_tasks = set()

    async def process_stream(client, query):
        """Yield chunks immediately while processing them in the background.

        Each chunk is handed to ``process_chunk`` in its own task so that
        slow processing never delays delivery of the next chunk. The tasks
        are not awaited here.

        Args:
            client: Async client exposing ``run_generic_stream(payload)``.
            query: Question placed under the ``"query"`` key of the
                payload, matching the other examples.

        Yields:
            Each chunk, unchanged, as soon as it is received.
        """
        async for chunk in client.run_generic_stream({"query": query}):
            # Non-blocking processing
            task = asyncio.create_task(process_chunk(chunk))
            _background_tasks.add(task)
            # Drop the reference once the task is done.
            task.add_done_callback(_background_tasks.discard)
            yield chunk
    
  2. Handle Backpressure

    def controlled_stream(client, query, max_buffer=1000, pause=0.1):
        """Throttle a stream by pausing after every ``max_buffer`` units.

        The lengths of the chunks are accumulated; once the total exceeds
        ``max_buffer`` the generator sleeps for ``pause`` seconds and the
        total restarts from zero. Every chunk is still yielded.

        Args:
            client: Object exposing ``run_generic_stream(payload)`` that
                yields chunks supporting ``len()``.
            query: Question placed under the ``"query"`` key of the payload.
            max_buffer: Accumulated chunk length that triggers a pause.
            pause: Seconds to sleep when the threshold is exceeded.

        Yields:
            Each chunk, unchanged.
        """
        buffer_size = 0

        for chunk in client.run_generic_stream({"query": query}):
            buffer_size += len(chunk)

            if buffer_size > max_buffer:
                # Pause or slow down
                time.sleep(pause)
                buffer_size = 0

            yield chunk
    

See Also