Prerequisites: Completed Deploy Your First Agent tutorial and understand Core Concepts

Overview

Streaming responses let you receive agent output in real time as it is generated, rather than waiting for the complete response. This provides a better user experience for long-running operations, chat interfaces, and interactive applications.

Key Concepts

Streaming vs Synchronous

| Feature | Synchronous | Streaming |
| --- | --- | --- |
| Command | runagent run | runagent run-stream |
| Connection | REST API | WebSocket |
| Response | Complete result at once | Real-time chunks |
| Tag Requirement | Any tag | Must end with _stream |
| Use Case | Quick operations | Long-running, interactive |

Entrypoint Naming Convention

Streaming entrypoints must end with _stream:
from typing import Iterator

# Synchronous entrypoint
def chat_agent(message: str) -> str:
    return "Complete response"

# Streaming entrypoint (note the _stream suffix)
def chat_agent_stream(message: str) -> Iterator[str]:
    yield "Response "
    yield "chunk "
    yield "by "
    yield "chunk"

Using CLI for Streaming

Basic Streaming Command

# Stream from cloud agent
runagent run-stream --id <agent-id> --tag chat_stream --message="Tell me a story"

# Stream from local agent
runagent run-stream --id <agent-id> --tag chat_stream --local --message="Tell me a story"

Command Options

| Option | Description | Required |
| --- | --- | --- |
| --id | Agent ID to run | Yes (or use --host/--port) |
| --tag | Entrypoint tag (must end with _stream) | Yes |
| --local | Use local agent instead of cloud | No |
| --host | Host address (use with --port) | No |
| --port | Port number (use with --host) | No |
| --input | Path to JSON input file | No |
| --timeout | Timeout in seconds | No |

Examples

Example 1: Basic Streaming

# Stream a story generation
runagent run-stream \
  --id abc-123-def-456 \
  --tag story_stream \
  --prompt="Write a short story about a robot"

Example 2: Using Input File

Create input.json:
{
  "query": "Explain quantum computing",
  "detail_level": "beginner"
}
# Stream with input file
runagent run-stream \
  --id abc-123-def-456 \
  --tag explain_stream \
  --input input.json

Example 3: Local Agent Streaming

# Stream from locally running agent
runagent run-stream \
  --id local-agent-123 \
  --tag chat_stream \
  --local \
  --message="Hello, how are you?"

Example 4: With Host and Port

# Stream from custom host/port
runagent run-stream \
  --host localhost \
  --port 8080 \
  --tag chat_stream \
  --message="Test message"

Using SDKs for Streaming

Python SDK

from runagent import RunAgentClient

# Connect to streaming entrypoint
client = RunAgentClient(
    agent_id="your_agent_id",
    entrypoint_tag="chat_stream",  # Must end with _stream
    local=False  # Set to True for local agents
)

# Stream responses
for chunk in client.run(message="Tell me a story"):
    print(chunk, end="", flush=True)

JavaScript/TypeScript SDK

const { RunAgentClient } = require('runagent');

async function streamResponse() {
    const client = new RunAgentClient({
        agentId: 'your_agent_id',
        entrypointTag: 'chat_stream',  // Must end with _stream
        local: false
    });

    await client.initialize();

    const stream = await client.run({
        message: 'Tell me a story'
    });

    for await (const chunk of stream) {
        process.stdout.write(chunk);
    }
}

streamResponse();

Go SDK

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/runagent-dev/runagent-go/pkg/client"
)

func main() {
    ctx := context.Background()
    
    c, err := client.NewWithAddress(
        "your_agent_id",
        "chat_stream",  // Must end with _stream
        false,
        "localhost",
        8451,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    s, err := c.RunStream(ctx, map[string]interface{}{
        "message": "Tell me a story",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer s.Close()

    for {
        data, hasMore, err := s.Next(ctx)
        if err != nil {
            log.Fatal(err)
        }
        if !hasMore {
            break
        }
        fmt.Print(data)
    }
}

Rust SDK

use runagent::client::RunAgentClient;
use serde_json::json;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = RunAgentClient::new(
        "your_agent_id",
        "chat_stream",  // Must end with _stream
        false
    ).await?;

    let mut stream = client.run_stream(&[
        ("message", json!("Tell me a story"))
    ]).await?;

    while let Some(chunk) = stream.next().await {
        print!("{}", chunk?);
    }

    Ok(())
}

Creating Streaming Entrypoints

Python Streaming Function

from typing import Iterator

def chat_stream(message: str, user_id: str = "anonymous") -> Iterator[str]:
    """
    Streaming chat agent that yields response chunks.
    
    Note: Function name ends with _stream, and return type is Iterator[str]
    """
    response_parts = [
        "Hello ",
        user_id,
        "! ",
        "You said: ",
        message,
        ". ",
        "Let me think about that...\n",
        "Here's my response: ",
        generate_response(message)
    ]
    
    import time  # used to simulate a processing delay

    for part in response_parts:
        yield part
        time.sleep(0.1)

Configuration

Add to runagent.config.json:
{
  "agent_architecture": {
    "entrypoints": [
      {
        "file": "main.py",
        "module": "chat_stream",
        "tag": "chat_stream"
      }
    ]
  }
}
Important: The entrypoint tag must end with _stream for streaming to work. The CLI command run-stream validates this requirement.

Best Practices

1. Use Streaming for Long Operations

Streaming is ideal for:
  • Long text generation (stories, articles, explanations)
  • Interactive chat (real-time conversation)
  • Progress updates (status messages during processing)
  • Large data processing (streaming results as they’re computed)

2. Chunk Size Considerations

# Good: Reasonable chunk sizes
def good_stream() -> Iterator[str]:
    yield "Processing step 1...\n"
    yield "Processing step 2...\n"
    yield "Final result: " + result

# Avoid: Too small chunks (overhead)
def bad_stream() -> Iterator[str]:
    for char in very_long_string:
        yield char  # Too granular
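If an existing generator is too granular, you can wrap it rather than rewrite it. The helper below is a hypothetical sketch (not part of the RunAgent SDK): it accumulates small pieces into chunks of a minimum size before yielding, reducing per-chunk transport overhead.

```python
from typing import Iterable, Iterator

def batch_chunks(pieces: Iterable[str], min_size: int = 32) -> Iterator[str]:
    """Accumulate small pieces into chunks of at least min_size characters."""
    buffer = ""
    for piece in pieces:
        buffer += piece
        if len(buffer) >= min_size:
            yield buffer
            buffer = ""
    if buffer:
        # Flush whatever remains, even if it is shorter than min_size
        yield buffer

# Example: rebatch a character-level stream into 8-character chunks
chunks = list(batch_chunks(iter("streaming is chunk by chunk"), min_size=8))
```

A character-level generator like bad_stream above could then be exposed as `yield from batch_chunks(bad_stream())` without changing its internals.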

3. Error Handling in Streaming

from typing import Iterator

def robust_stream(query: str) -> Iterator[str]:
    try:
        yield "Starting processing...\n"
        
        # Your processing logic
        for result in process_query(query):
            yield result + "\n"
            
        yield "Processing complete!\n"
    except Exception as e:
        yield f"\nError occurred: {str(e)}\n"
        raise

4. Client-Side Error Handling

from runagent import RunAgentClient, RunAgentError

client = RunAgentClient(
    agent_id="your_agent_id",
    entrypoint_tag="chat_stream",
    local=False
)

try:
    for chunk in client.run(message="Hello"):
        print(chunk, end="", flush=True)
except RunAgentError as e:
    print(f"\nStreaming error: {e}")
except KeyboardInterrupt:
    print("\n\nStreaming interrupted by user")


Troubleshooting

Error: Tag must end with _stream

Problem:
❌ Execution failed: Streaming command requires entrypoint tag ending with '_stream'. Got: chat
Solution:
  • Ensure your entrypoint tag ends with _stream
  • Check your runagent.config.json configuration
  • Use the correct tag: chat_stream instead of chat

Error: Connection timeout

Problem: WebSocket connection times out during streaming
Solution:
# Increase timeout
runagent run-stream --id <agent-id> --tag chat_stream --timeout 300 --message="..."

Streaming stops unexpectedly

Problem: Stream ends without completing
Possible causes:
  • Agent function raised an exception
  • Network connection interrupted
  • Agent timeout exceeded
Solution:
  • Check agent logs: runagent db logs --agent-id <id>
  • Verify agent function handles errors gracefully
  • Test with shorter inputs first

No output appears

Problem: Command runs but no output
Solution:
  • Verify entrypoint is actually streaming (yields chunks)
  • Check agent is running: runagent db status --agent-id <id>
  • Test with synchronous version first to verify agent works
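Because a streaming entrypoint is a plain Python generator, you can verify it actually yields multiple chunks by calling it directly, before involving the CLI or a WebSocket. The entrypoint below is a hypothetical stand-in for your own function.

```python
from typing import Iterator

def chat_stream(message: str) -> Iterator[str]:
    # Hypothetical entrypoint under test; substitute your own
    yield "You said: "
    yield message

# Drive the generator directly, outside any server
chunks = list(chat_stream("hi"))
assert len(chunks) > 1, "entrypoint produced one blob; it is not streaming"
print("".join(chunks))
```

If this yields a single chunk (or none), the problem is in the entrypoint itself, not in the transport.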

Performance Considerations

WebSocket Overhead

Streaming uses WebSocket connections, which bring:
  • Lower latency for real-time updates
  • The overhead of maintaining a persistent connection
  • Better suitability for long-running operations

When to Use Streaming

Use streaming when:
  • Response time > 2 seconds
  • User needs real-time feedback
  • Generating long-form content
  • Interactive applications
Avoid streaming when:
  • Quick responses (< 1 second)
  • Simple data retrieval
  • Batch processing (use async instead)

Advanced Patterns

Progressive Response Building

def smart_stream(query: str) -> Iterator[str]:
    # Initial acknowledgment
    yield "🔍 Analyzing your query...\n\n"
    
    # Progressive results
    yield "📊 Found relevant information:\n"
    for item in search_results:
        yield f"  • {item}\n"
    
    # Final summary
    yield "\n✅ Analysis complete!"

Conditional Streaming

def conditional_stream(query: str, stream: bool = True) -> Iterator[str]:
    if stream:
        # Streaming mode
        for chunk in process_streaming(query):
            yield chunk
    else:
        # Non-streaming: yield complete result
        result = process_complete(query)
        yield result
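An alternative to branching inside one function is to make the streaming generator the single source of truth and derive the synchronous result by joining its chunks. This is a sketch with a hypothetical entrypoint, not a RunAgent API.

```python
from typing import Iterator

def chat_stream(message: str) -> Iterator[str]:
    # Hypothetical streaming entrypoint
    yield "Echo: "
    yield message

def chat(message: str) -> str:
    # Synchronous wrapper: exhaust the stream and join the chunks
    return "".join(chat_stream(message))
```

Both entrypoints can then be registered in runagent.config.json, with only the streaming one carrying the _stream tag.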

Next Steps

SDK Documentation

Learn more about SDK streaming capabilities

Core Concepts

Understand entrypoints and streaming architecture

Production Considerations

Best practices for production streaming

CLI Reference

Complete CLI command reference
