Overview
Streaming responses allow you to receive agent output in real time as it is generated, rather than waiting for the complete response. This provides a better user experience for long-running operations, chat interfaces, and interactive applications.
Key Concepts
Streaming vs Synchronous
| Feature | Synchronous | Streaming |
|---|---|---|
| Command | runagent run | runagent run-stream |
| Connection | REST API | WebSocket |
| Response | Complete result at once | Real-time chunks |
| Tag Requirement | Any tag | Must end with _stream |
| Use Case | Quick operations | Long-running, interactive |
Entrypoint Naming Convention
Streaming entrypoints must end with _stream:
from typing import Iterator

# Synchronous entrypoint
def chat_agent(message: str) -> str:
    return "Complete response"

# Streaming entrypoint (note the _stream suffix)
def chat_agent_stream(message: str) -> Iterator[str]:
    yield "Response "
    yield "chunk "
    yield "by "
    yield "chunk"
Using CLI for Streaming
Basic Streaming Command
# Stream from cloud agent
runagent run-stream --id <agent-id> --tag chat_stream --message="Tell me a story"
# Stream from local agent
runagent run-stream --id <agent-id> --tag chat_stream --local --message="Tell me a story"
Command Options
| Option | Description | Required |
|---|---|---|
| --id | Agent ID to run | Yes (or use --host/--port) |
| --tag | Entrypoint tag (must end with _stream) | Yes |
| --local | Use local agent instead of cloud | No |
| --host | Host address (use with --port) | No |
| --port | Port number (use with --host) | No |
| --input | Path to JSON input file | No |
| --timeout | Timeout in seconds | No |
Examples
Example 1: Basic Streaming
# Stream a story generation
runagent run-stream \
--id abc-123-def-456 \
--tag story_stream \
--prompt="Write a short story about a robot"
Example 2: With Input File
Create input.json:
{
  "query": "Explain quantum computing",
  "detail_level": "beginner"
}
# Stream with input file
runagent run-stream \
--id abc-123-def-456 \
--tag explain_stream \
--input input.json
Example 3: Local Agent Streaming
# Stream from locally running agent
runagent run-stream \
--id local-agent-123 \
--tag chat_stream \
--local \
--message="Hello, how are you?"
Example 4: With Host and Port
# Stream from custom host/port
runagent run-stream \
--host localhost \
--port 8080 \
--tag chat_stream \
--message="Test message"
Using SDKs for Streaming
Python SDK
from runagent import RunAgentClient

# Connect to streaming entrypoint
client = RunAgentClient(
    agent_id="your_agent_id",
    entrypoint_tag="chat_stream",  # Must end with _stream
    local=False  # Set to True for local agents
)

# Stream responses
for chunk in client.run(message="Tell me a story"):
    print(chunk, end="", flush=True)
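If you also need the complete response after streaming finishes (for logging or post-processing), you can accumulate the chunks as they arrive. A minimal sketch using the same client as above:
# Accumulate the full response while still printing chunks in real time
chunks = []
for chunk in client.run(message="Tell me a story"):
    print(chunk, end="", flush=True)
    chunks.append(chunk)

full_response = "".join(chunks)
print(f"\n\nReceived {len(full_response)} characters")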
JavaScript/TypeScript SDK
const { RunAgentClient } = require('runagent');

async function streamResponse() {
  const client = new RunAgentClient({
    agentId: 'your_agent_id',
    entrypointTag: 'chat_stream', // Must end with _stream
    local: false
  });

  await client.initialize();

  const stream = await client.run({
    message: 'Tell me a story'
  });

  for await (const chunk of stream) {
    process.stdout.write(chunk);
  }
}

streamResponse();
Go SDK
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/runagent-dev/runagent-go/pkg/client"
)

func main() {
	ctx := context.Background()

	c, err := client.NewWithAddress(
		"your_agent_id",
		"chat_stream", // Must end with _stream
		false,
		"localhost",
		8451,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	s, err := c.RunStream(ctx, map[string]interface{}{
		"message": "Tell me a story",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer s.Close()

	for {
		data, hasMore, err := s.Next(ctx)
		if err != nil {
			log.Fatal(err)
		}
		if !hasMore {
			break
		}
		fmt.Print(data)
	}
}
Rust SDK
use runagent::client::RunAgentClient;
use serde_json::json;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = RunAgentClient::new(
        "your_agent_id",
        "chat_stream", // Must end with _stream
        false
    ).await?;

    let mut stream = client.run_stream(&[
        ("message", json!("Tell me a story"))
    ]).await?;

    while let Some(chunk) = stream.next().await {
        print!("{}", chunk?);
    }

    Ok(())
}
Creating Streaming Entrypoints
Python Streaming Function
import time
from typing import Iterator

def chat_stream(message: str, user_id: str = "anonymous") -> Iterator[str]:
    """
    Streaming chat agent that yields response chunks.

    Note: Function name ends with _stream, and return type is Iterator[str]
    """
    response_parts = [
        "Hello ",
        user_id,
        "! ",
        "You said: ",
        message,
        ". ",
        "Let me think about that...\n",
        "Here's my response: ",
        generate_response(message)  # your generation logic
    ]

    for part in response_parts:
        yield part
        time.sleep(0.1)  # Simulate processing delay
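In practice the chunks usually come from a model rather than a hard-coded list. The sketch below forwards tokens from the OpenAI streaming API as they arrive; the openai package (v1+), the model name, and the API key in the environment are assumptions, so adapt it to your provider:
from typing import Iterator
from openai import OpenAI  # assumption: openai>=1.0 installed

_llm = OpenAI()  # assumption: OPENAI_API_KEY set in the environment

def llm_chat_stream(message: str) -> Iterator[str]:
    """Forward model tokens to the caller as they are generated."""
    response = _llm.chat.completions.create(
        model="gpt-4o-mini",  # assumption: any chat-capable model works here
        messages=[{"role": "user", "content": message}],
        stream=True,
    )
    for chunk in response:
        delta = chunk.choices[0].delta.content
        if delta:  # some chunks carry no text (e.g., the role header)
            yield delta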
Configuration
Add to runagent.config.json:
{
  "agent_architecture": {
    "entrypoints": [
      {
        "file": "main.py",
        "module": "chat_stream",
        "tag": "chat_stream"
      }
    ]
  }
}
Important: The entrypoint tag must end with _stream for streaming to work. The CLI command run-stream validates this requirement.
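A config can register the synchronous and streaming variants side by side. This sketch mirrors the naming-convention example above; the file and module names are assumptions:
{
  "agent_architecture": {
    "entrypoints": [
      {
        "file": "main.py",
        "module": "chat_agent",
        "tag": "chat"
      },
      {
        "file": "main.py",
        "module": "chat_agent_stream",
        "tag": "chat_stream"
      }
    ]
  }
}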
Best Practices
1. Use Streaming for Long Operations
Streaming is ideal for:
- Long text generation (stories, articles, explanations)
- Interactive chat (real-time conversation)
- Progress updates (status messages during processing)
- Large data processing (streaming results as they’re computed)
2. Chunk Size Considerations
# Good: Reasonable chunk sizes
def good_stream() -> Iterator[str]:
    yield "Processing step 1...\n"
    yield "Processing step 2...\n"
    yield "Final result: " + result

# Avoid: Too small chunks (overhead)
def bad_stream() -> Iterator[str]:
    for char in very_long_string:
        yield char  # Too granular
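If your source naturally produces tiny pieces (single tokens or characters), a small buffering helper can merge them into reasonably sized chunks before yielding. A minimal sketch; the helper name and the 64-character threshold are arbitrary choices, not part of RunAgent:
from typing import Iterable, Iterator

def batch_chunks(pieces: Iterable[str], min_size: int = 64) -> Iterator[str]:
    """Merge tiny string pieces into chunks of at least min_size characters."""
    buffer = ""
    for piece in pieces:
        buffer += piece
        if len(buffer) >= min_size:
            yield buffer
            buffer = ""
    if buffer:  # flush whatever is left over
        yield buffer

def better_stream() -> Iterator[str]:
    # Wrap the overly granular character source from bad_stream above
    yield from batch_chunks(iter(very_long_string), min_size=64)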
3. Error Handling in Streaming
from typing import Iterator

def robust_stream(query: str) -> Iterator[str]:
    try:
        yield "Starting processing...\n"

        # Your processing logic
        for result in process_query(query):
            yield result + "\n"

        yield "Processing complete!\n"
    except Exception as e:
        yield f"\nError occurred: {str(e)}\n"
        raise
4. Client-Side Error Handling
from runagent import RunAgentClient, RunAgentError

client = RunAgentClient(
    agent_id="your_agent_id",
    entrypoint_tag="chat_stream",
    local=False
)

try:
    for chunk in client.run(message="Hello"):
        print(chunk, end="", flush=True)
except RunAgentError as e:
    print(f"\nStreaming error: {e}")
except KeyboardInterrupt:
    print("\n\nStreaming interrupted by user")
Troubleshooting
Error: Tag must end with _stream
Problem:
❌ Execution failed: Streaming command requires entrypoint tag ending with '_stream'. Got: chat
Solution:
- Ensure your entrypoint tag ends with _stream
- Check your runagent.config.json configuration
- Use the correct tag: chat_stream instead of chat
Error: Connection timeout
Problem: WebSocket connection times out during streaming
Solution:
# Increase timeout
runagent run-stream --id <agent-id> --tag chat_stream --timeout 300 --message="..."
Streaming stops unexpectedly
Problem: Stream ends without completing
Possible causes:
- Agent function raised an exception
- Network connection interrupted
- Agent timeout exceeded
Solution:
- Check agent logs: runagent db logs --agent-id <id>
- Verify the agent function handles errors gracefully
- Test with shorter inputs first
No output appears
Problem: Command runs but no output
Solution:
- Verify the entrypoint is actually streaming (yields chunks)
- Check the agent is running: runagent db status --agent-id <id>
- Test with the synchronous version first to verify the agent works
WebSocket Overhead
Streaming uses WebSocket connections, which bring:
- Lower latency for real-time updates
- Overhead from maintaining a persistent connection
- Better suitability for long-running operations
When to Use Streaming
✅ Use streaming when:
- Response time > 2 seconds
- User needs real-time feedback
- Generating long-form content
- Interactive applications
❌ Avoid streaming when:
- Quick responses (< 1 second)
- Simple data retrieval
- Batch processing (use async instead)
Advanced Patterns
Progressive Response Building
def smart_stream(query: str) -> Iterator[str]:
    # Initial acknowledgment
    yield "🔍 Analyzing your query...\n\n"

    # Progressive results (search_results comes from your retrieval step)
    yield "📊 Found relevant information:\n"
    for item in search_results:
        yield f"  • {item}\n"

    # Final summary
    yield "\n✅ Analysis complete!"
Conditional Streaming
def conditional_stream(query: str, stream: bool = True) -> Iterator[str]:
    if stream:
        # Streaming mode
        for chunk in process_streaming(query):
            yield chunk
    else:
        # Non-streaming: yield complete result
        result = process_complete(query)
        yield result
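Because the function always yields, a caller that wants the complete result can simply join the generator; a short usage sketch:
# Consume the generator in one shot when streaming isn't needed
full_result = "".join(conditional_stream("some query", stream=False))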
Next Steps