Prerequisites: Completed Deploy Your First Agent tutorial and understand Core Concepts

Overview

Streaming responses let you receive agent output in real time as it's generated, rather than waiting for the complete response. This provides a better user experience for long-running operations, chat interfaces, and interactive applications.

Key Concepts

Streaming vs Synchronous

| Feature | Synchronous | Streaming |
|---------|-------------|-----------|
| Command | `runagent run` | `runagent run-stream` |
| Connection | REST API | WebSocket |
| Response | Complete result at once | Real-time chunks |
| Tag Requirement | Any tag | Must end with `_stream` |
| Use Case | Quick operations | Long-running, interactive |

Entrypoint Naming Convention

Streaming entrypoints must end with _stream:
from typing import Iterator

# Synchronous entrypoint
def chat_agent(message: str) -> str:
    return "Complete response"

# Streaming entrypoint (note the _stream suffix)
def chat_agent_stream(message: str) -> Iterator[str]:
    yield "Response "
    yield "chunk "
    yield "by "
    yield "chunk"

Using CLI for Streaming

Basic Streaming Command

# Stream from cloud agent
runagent run-stream --id <agent-id> --tag chat_stream --message="Tell me a story"

# Stream from local agent
runagent run-stream --id <agent-id> --tag chat_stream --local --message="Tell me a story"

Command Options

| Option | Description | Required |
|--------|-------------|----------|
| `--id` | Agent ID to run | Yes (or use `--host`/`--port`) |
| `--tag` | Entrypoint tag (must end with `_stream`) | Yes |
| `--local` | Use local agent instead of cloud | No |
| `--host` | Host address (use with `--port`) | No |
| `--port` | Port number (use with `--host`) | No |
| `--input` | Path to JSON input file | No |
| `--timeout` | Timeout in seconds | No |

Examples

Example 1: Basic Streaming

# Stream a story generation
runagent run-stream \
  --id abc-123-def-456 \
  --tag story_stream \
  --prompt="Write a short story about a robot"

Example 2: Using Input File

Create input.json:
{
  "query": "Explain quantum computing",
  "detail_level": "beginner"
}
# Stream with input file
runagent run-stream \
  --id abc-123-def-456 \
  --tag explain_stream \
  --input input.json

Example 3: Local Agent Streaming

# Stream from locally running agent
runagent run-stream \
  --id local-agent-123 \
  --tag chat_stream \
  --local \
  --message="Hello, how are you?"

Example 4: With Host and Port

# Stream from custom host/port
runagent run-stream \
  --host localhost \
  --port 8080 \
  --tag chat_stream \
  --message="Test message"

Using SDKs for Streaming

Python SDK

from runagent import RunAgentClient

# Connect to streaming entrypoint
client = RunAgentClient(
    agent_id="your_agent_id",
    entrypoint_tag="chat_stream",  # Must end with _stream
    local=False  # Set to True for local agents
)

# Stream responses
for chunk in client.run(message="Tell me a story"):
    print(chunk, end="", flush=True)

JavaScript/TypeScript SDK

const { RunAgentClient } = require('runagent');

async function streamResponse() {
    const client = new RunAgentClient({
        agentId: 'your_agent_id',
        entrypointTag: 'chat_stream',  // Must end with _stream
        local: false
    });

    await client.initialize();

    const stream = await client.run({
        message: 'Tell me a story'
    });

    for await (const chunk of stream) {
        process.stdout.write(chunk);
    }
}

streamResponse();

Go SDK

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/runagent-dev/runagent-go/pkg/client"
)

func main() {
    ctx := context.Background()
    
    c, err := client.NewWithAddress(
        "your_agent_id",
        "chat_stream",  // Must end with _stream
        false,
        "localhost",
        8451,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    s, err := c.RunStream(ctx, map[string]interface{}{
        "message": "Tell me a story",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer s.Close()

    for {
        data, hasMore, err := s.Next(ctx)
        if err != nil {
            log.Fatal(err)
        }
        if !hasMore {
            break
        }
        fmt.Print(data)
    }
}

Rust SDK

use runagent::client::RunAgentClient;
use serde_json::json;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = RunAgentClient::new(
        "your_agent_id",
        "chat_stream",  // Must end with _stream
        false
    ).await?;

    let mut stream = client.run_stream(&[
        ("message", json!("Tell me a story"))
    ]).await?;

    while let Some(chunk) = stream.next().await {
        print!("{}", chunk?);
    }

    Ok(())
}

Creating Streaming Entrypoints

Python Streaming Function

import time
from typing import Iterator

def chat_stream(message: str, user_id: str = "anonymous") -> Iterator[str]:
    """
    Streaming chat agent that yields response chunks.
    
    Note: The function name ends with _stream, and the return type is Iterator[str].
    """
    response_parts = [
        "Hello ",
        user_id,
        "! ",
        "You said: ",
        message,
        ". ",
        "Let me think about that...\n",
        "Here's my response: ",
        generate_response(message)
    ]
    
    for part in response_parts:
        yield part
        time.sleep(0.1)  # simulate processing delay
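Because a streaming entrypoint is just a Python generator, you can exercise it locally before deploying. A minimal sketch, with the external `generate_response` call replaced by fixed parts for illustration:

```python
from typing import Iterator

def chat_stream(message: str, user_id: str = "anonymous") -> Iterator[str]:
    # Trimmed-down version of the entrypoint above; real code would call
    # generate_response(message) for the final part.
    for part in ["Hello ", user_id, "! ", "You said: ", message, "."]:
        yield part

# Iterate the generator directly, exactly as the platform would.
chunks = list(chat_stream("hi", user_id="alice"))
full = "".join(chunks)
```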

Configuration

Add to runagent.config.json:
{
  "agent_architecture": {
    "entrypoints": [
      {
        "file": "main.py",
        "module": "chat_stream",
        "tag": "chat_stream"
      }
    ]
  }
}
Important: The entrypoint tag must end with _stream for streaming to work. The CLI command run-stream validates this requirement.
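The CLI's internal check is not shown in this doc, but the rule it enforces can be sketched as a small helper (`validate_stream_tag` is illustrative, not part of the SDK):

```python
def validate_stream_tag(tag: str) -> None:
    """Mirror the _stream suffix rule that run-stream enforces."""
    if not tag.endswith("_stream"):
        raise ValueError(
            f"Streaming command requires entrypoint tag "
            f"ending with '_stream'. Got: {tag}"
        )

validate_stream_tag("chat_stream")  # passes silently

try:
    validate_stream_tag("chat")
except ValueError as e:
    error_message = str(e)  # matches the CLI error shown in Troubleshooting
```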

Best Practices

1. Use Streaming for Long Operations

Streaming is ideal for:
  • Long text generation (stories, articles, explanations)
  • Interactive chat (real-time conversation)
  • Progress updates (status messages during processing)
  • Large data processing (streaming results as they’re computed)
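The progress-update case above can be sketched as a generator that reports one line per item, then a summary (`batch_stream` is a hypothetical example, not an SDK function):

```python
from typing import Iterator

def batch_stream(items: list) -> Iterator[str]:
    """Yield a progress line per item, then a final summary."""
    total = len(items)
    for i, item in enumerate(items, start=1):
        # Real per-item work would happen here before reporting progress.
        yield f"[{i}/{total}] processed {item}\n"
    yield f"Done: {total} items.\n"

lines = list(batch_stream(["a", "b", "c"]))
```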

2. Chunk Size Considerations

from typing import Iterator

# Good: reasonable chunk sizes
def good_stream() -> Iterator[str]:
    yield "Processing step 1...\n"
    yield "Processing step 2...\n"
    yield "Final result: " + result  # result computed elsewhere

# Avoid: too-small chunks (per-chunk overhead)
def bad_stream() -> Iterator[str]:
    for char in very_long_string:
        yield char  # one character per message is too granular
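If an upstream source only hands you tiny fragments, you can coalesce them before yielding. A small buffering sketch (`rechunk` is a hypothetical helper, not part of the SDK):

```python
from typing import Iterable, Iterator

def rechunk(pieces: Iterable[str], min_size: int = 32) -> Iterator[str]:
    """Coalesce tiny fragments into chunks of at least min_size characters."""
    buffer = ""
    for piece in pieces:
        buffer += piece
        if len(buffer) >= min_size:
            yield buffer
            buffer = ""
    if buffer:
        yield buffer  # flush whatever is left

# Per-character input becomes a handful of larger chunks.
chunks = list(rechunk("streaming one character at a time", min_size=10))
```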

3. Error Handling in Streaming

from typing import Iterator

def robust_stream(query: str) -> Iterator[str]:
    try:
        yield "Starting processing...\n"
        
        # Your processing logic
        for result in process_query(query):
            yield result + "\n"
            
        yield "Processing complete!\n"
    except Exception as e:
        yield f"\nError occurred: {str(e)}\n"
        raise

4. Client-Side Error Handling

from runagent import RunAgentClient, RunAgentError

client = RunAgentClient(
    agent_id="your_agent_id",
    entrypoint_tag="chat_stream",
    local=False
)

try:
    for chunk in client.run(message="Hello"):
        print(chunk, end="", flush=True)
except RunAgentError as e:
    print(f"\nStreaming error: {e}")
except KeyboardInterrupt:
    print("\n\nStreaming interrupted by user")

Troubleshooting

Error: Tag must end with _stream

Problem:
❌ Execution failed: Streaming command requires entrypoint tag ending with '_stream'. Got: chat
Solution:
  • Ensure your entrypoint tag ends with _stream
  • Check your runagent.config.json configuration
  • Use the correct tag: chat_stream instead of chat

Error: Connection timeout

Problem: WebSocket connection times out during streaming.
Solution:
# Increase timeout
runagent run-stream --id <agent-id> --tag chat_stream --timeout 300 --message="..."

Streaming stops unexpectedly

Problem: Stream ends without completing.
Possible causes:
  • Agent function raised an exception
  • Network connection interrupted
  • Agent timeout exceeded
Solution:
  • Check agent logs: runagent db logs --agent-id <id>
  • Verify agent function handles errors gracefully
  • Test with shorter inputs first

No output appears

Problem: Command runs but no output appears.
Solution:
  • Verify entrypoint is actually streaming (yields chunks)
  • Check agent is running: runagent db status --agent-id <id>
  • Test with synchronous version first to verify agent works

Performance Considerations

WebSocket Overhead

Streaming uses WebSocket connections, which:
  • Offer lower latency for real-time updates
  • Carry the overhead of a persistent connection
  • Are better suited to long-running operations

When to Use Streaming

Use streaming when:
  • Response time > 2 seconds
  • User needs real-time feedback
  • Generating long-form content
  • Interactive applications
Avoid streaming when:
  • Quick responses (< 1 second)
  • Simple data retrieval
  • Batch processing (use async instead)

Advanced Patterns

Progressive Response Building

def smart_stream(query: str) -> Iterator[str]:
    # Initial acknowledgment
    yield "🔍 Analyzing your query...\n\n"
    
    # Progressive results
    yield "📊 Found relevant information:\n"
    for item in search_results:
        yield f"  • {item}\n"
    
    # Final summary
    yield "\n✅ Analysis complete!"

Conditional Streaming

def conditional_stream(query: str, stream: bool = True) -> Iterator[str]:
    if stream:
        # Streaming mode
        for chunk in process_streaming(query):
            yield chunk
    else:
        # Non-streaming: yield complete result
        result = process_complete(query)
        yield result
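Either way the caller just iterates; with stream=False the iterator simply produces a single item. A runnable sketch with stand-in processing (the word-splitting logic is illustrative only):

```python
from typing import Iterator

def conditional_stream(query: str, stream: bool = True) -> Iterator[str]:
    # Stand-in processing for illustration; a real agent would do the work.
    if stream:
        for word in query.split():
            yield word + " "
    else:
        yield query

# Streaming callers consume chunk by chunk...
chunks = list(conditional_stream("hello streaming world", stream=True))
# ...while non-streaming callers collapse the single yield into one string.
full = next(iter(conditional_stream("hello streaming world", stream=False)))
```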

Next Steps