Overview
Streaming responses let you receive agent output in real time as it's generated, rather than waiting for the complete response. This provides a better user experience for long-running operations, chat interfaces, and interactive applications.
Key Concepts
Streaming vs Synchronous
| Feature | Synchronous | Streaming |
|---------|-------------|-----------|
| Command | runagent run | runagent run-stream |
| Connection | REST API | WebSocket |
| Response | Complete result at once | Real-time chunks |
| Tag Requirement | Any tag | Must end with _stream |
| Use Case | Quick operations | Long-running, interactive |
Entrypoint Naming Convention
Streaming entrypoints must end with _stream:
from typing import Iterator

# Synchronous entrypoint
def chat_agent(message: str) -> str:
    return "Complete response"

# Streaming entrypoint (note the _stream suffix)
def chat_agent_stream(message: str) -> Iterator[str]:
    yield "Response "
    yield "chunk "
    yield "by "
    yield "chunk"
Using the CLI for Streaming
Basic Streaming Command
# Stream from a cloud agent
runagent run-stream --id <agent-id> --tag chat_stream --message="Tell me a story"

# Stream from a local agent
runagent run-stream --id <agent-id> --tag chat_stream --local --message="Tell me a story"
Command Options
| Option | Description | Required |
|--------|-------------|----------|
| --id | Agent ID to run | Yes (or use --host/--port) |
| --tag | Entrypoint tag (must end with _stream) | Yes |
| --local | Use local agent instead of cloud | No |
| --host | Host address (use with --port) | No |
| --port | Port number (use with --host) | No |
| --input | Path to JSON input file | No |
| --timeout | Timeout in seconds | No |
Examples
Example 1: Basic Streaming
# Stream a story generation
runagent run-stream \
  --id abc-123-def-456 \
  --tag story_stream \
  --prompt="Write a short story about a robot"
Example 2: Streaming with an Input File
Create input.json:
{
  "query": "Explain quantum computing",
  "detail_level": "beginner"
}
# Stream with an input file
runagent run-stream \
  --id abc-123-def-456 \
  --tag explain_stream \
  --input input.json
Example 3: Local Agent Streaming
# Stream from a locally running agent
runagent run-stream \
  --id local-agent-123 \
  --tag chat_stream \
  --local \
  --message="Hello, how are you?"
Example 4: With Host and Port
# Stream from a custom host/port
runagent run-stream \
  --host localhost \
  --port 8080 \
  --tag chat_stream \
  --message="Test message"
Using SDKs for Streaming
Python SDK
from runagent import RunAgentClient

# Connect to a streaming entrypoint
client = RunAgentClient(
    agent_id="your_agent_id",
    entrypoint_tag="chat_stream",  # Must end with _stream
    local=False  # Set to True for local agents
)

# Stream responses
for chunk in client.run(message="Tell me a story"):
    print(chunk, end="", flush=True)
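If you also need the complete response once the stream finishes (for logging or caching, say), you can accumulate the chunks as they arrive. A minimal sketch reusing the client configured above:

# Accumulate chunks while still printing them in real time
chunks = []
for chunk in client.run(message="Tell me a story"):
    print(chunk, end="", flush=True)
    chunks.append(chunk)

full_response = "".join(chunks)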
JavaScript/TypeScript SDK
const { RunAgentClient } = require('runagent');

async function streamResponse() {
  const client = new RunAgentClient({
    agentId: 'your_agent_id',
    entrypointTag: 'chat_stream', // Must end with _stream
    local: false
  });

  await client.initialize();

  const stream = await client.run({
    message: 'Tell me a story'
  });

  for await (const chunk of stream) {
    process.stdout.write(chunk);
  }
}

streamResponse();
Go SDK
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/runagent-dev/runagent-go/pkg/client"
)

func main() {
	ctx := context.Background()

	c, err := client.NewWithAddress(
		"your_agent_id",
		"chat_stream", // Must end with _stream
		false,
		"localhost",
		8451,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	s, err := c.RunStream(ctx, map[string]interface{}{
		"message": "Tell me a story",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer s.Close()

	for {
		data, hasMore, err := s.Next(ctx)
		if err != nil {
			log.Fatal(err)
		}
		if !hasMore {
			break
		}
		fmt.Print(data)
	}
}
Rust SDK
use runagent::client::RunAgentClient;
use serde_json::json;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = RunAgentClient::new(
        "your_agent_id",
        "chat_stream", // Must end with _stream
        false
    ).await?;

    let mut stream = client.run_stream(&[
        ("message", json!("Tell me a story"))
    ]).await?;

    while let Some(chunk) = stream.next().await {
        print!("{}", chunk?);
    }

    Ok(())
}
Creating Streaming Entrypoints
Python Streaming Function
import time
from typing import Iterator

def chat_stream(message: str, user_id: str = "anonymous") -> Iterator[str]:
    """
    Streaming chat agent that yields response chunks.

    Note: the function name ends with _stream, and the return type is Iterator[str].
    """
    response_parts = [
        "Hello ",
        user_id,
        "! ",
        "You said: ",
        message,
        ". ",
        "Let me think about that...\n",
        "Here's my response: ",
        generate_response(message)
    ]
    for part in response_parts:
        yield part
        time.sleep(0.1)  # Simulate processing delay
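The example above yields from a fixed list, but the same shape works when an underlying library only returns a complete result: slice that result into chunks and yield them. A minimal sketch, assuming a hypothetical non-streaming summarize() helper:

from typing import Iterator

def summarize_stream(text: str) -> Iterator[str]:
    """Stream a summary word by word, even though it was computed in one call."""
    summary = summarize(text)  # hypothetical helper returning a full string
    for word in summary.split():
        yield word + " "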
Configuration
Add to runagent.config.json:
{
  "agent_architecture": {
    "entrypoints": [
      {
        "file": "main.py",
        "module": "chat_stream",
        "tag": "chat_stream"
      }
    ]
  }
}
Important: The entrypoint tag must end with _stream for streaming to work. The CLI command run-stream validates this requirement.
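Because a streaming entrypoint is an ordinary Python generator, you can sanity-check it directly, without the CLI or a server; iterating over it locally shows exactly what clients will receive. For example, given the main.py configured above:

# Quick local check: iterate the generator directly
from main import chat_stream

for chunk in chat_stream("Hello there", user_id="tester"):
    print(chunk, end="", flush=True)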
Best Practices
1. Use Streaming for Long Operations
Streaming is ideal for:
Long text generation (stories, articles, explanations)
Interactive chat (real-time conversation)
Progress updates (status messages during processing; see the sketch after this list)
Large data processing (streaming results as they’re computed)
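For progress updates in particular, interleave status lines with the actual work so the user sees movement while each step runs. A minimal sketch, assuming a hypothetical process_item() worker:

from typing import Iterator

def batch_stream(items: list) -> Iterator[str]:
    """Yield a status line before and after each unit of work."""
    total = len(items)
    for i, item in enumerate(items, start=1):
        yield f"[{i}/{total}] Processing {item}...\n"
        result = process_item(item)  # hypothetical per-item worker
        yield f"[{i}/{total}] Done: {result}\n"
    yield "All items processed.\n"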
2. Chunk Size Considerations
# Good: reasonable chunk sizes
def good_stream() -> Iterator[str]:
    yield "Processing step 1...\n"
    yield "Processing step 2...\n"
    yield "Final result: " + result

# Avoid: chunks that are too small (per-chunk overhead)
def bad_stream() -> Iterator[str]:
    for char in very_long_string:
        yield char  # Too granular
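If your source naturally produces very small pieces (single tokens or characters), you can coalesce them before yielding. A minimal buffering sketch; the 64-character threshold is an arbitrary illustration, not a library default:

from typing import Iterable, Iterator

def buffered_stream(pieces: Iterable[str], min_size: int = 64) -> Iterator[str]:
    """Coalesce small pieces into chunks of at least min_size characters."""
    buffer = ""
    for piece in pieces:
        buffer += piece
        if len(buffer) >= min_size:
            yield buffer
            buffer = ""
    if buffer:  # flush whatever is left
        yield buffer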
3. Error Handling in Streaming
from typing import Iterator

def robust_stream(query: str) -> Iterator[str]:
    try:
        yield "Starting processing...\n"
        # Your processing logic
        for result in process_query(query):
            yield result + "\n"
        yield "Processing complete!\n"
    except Exception as e:
        yield f"\nError occurred: {e}\n"
        raise
4. Client-Side Error Handling
from runagent import RunAgentClient, RunAgentError

client = RunAgentClient(
    agent_id="your_agent_id",
    entrypoint_tag="chat_stream",
    local=False
)

try:
    for chunk in client.run(message="Hello"):
        print(chunk, end="", flush=True)
except RunAgentError as e:
    print(f"\nStreaming error: {e}")
except KeyboardInterrupt:
    print("\n\nStreaming interrupted by user")
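If transient network failures are common in your environment, you can wrap the loop in a simple retry; note that retrying restarts the stream from the beginning, since chunks already received are not replayed. A minimal sketch using the client and exception from above, with an arbitrary retry count:

import time

MAX_RETRIES = 3  # arbitrary choice for illustration

for attempt in range(1, MAX_RETRIES + 1):
    try:
        for chunk in client.run(message="Hello"):
            print(chunk, end="", flush=True)
        break  # stream completed successfully
    except RunAgentError as e:
        print(f"\nAttempt {attempt} failed: {e}")
        if attempt == MAX_RETRIES:
            raise
        time.sleep(2 ** attempt)  # simple exponential backoff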
Troubleshooting
Error: Tag must end with _stream
Problem:
❌ Execution failed: Streaming command requires entrypoint tag ending with '_stream'. Got: chat
Solution:
Ensure your entrypoint tag ends with _stream
Check your runagent.config.json configuration
Use the correct tag: chat_stream instead of chat
Error: Connection timeout
Problem: WebSocket connection times out during streaming
Solution:
# Increase the timeout
runagent run-stream --id <agent-id> --tag chat_stream --timeout 300 --message="..."
Streaming stops unexpectedly
Problem: Stream ends without completing
Possible causes:
Agent function raised an exception
Network connection interrupted
Agent timeout exceeded
Solution:
Check agent logs: runagent db logs --agent-id <id>
Verify agent function handles errors gracefully
Test with shorter inputs first
No output appears
Problem: Command runs but no output
Solution:
Verify entrypoint is actually streaming (yields chunks)
Check agent is running: runagent db status --agent-id <id>
Test with synchronous version first to verify agent works
WebSocket Overhead
Streaming uses WebSocket connections, which means:
Lower latency for real-time updates
Persistent-connection overhead (the socket stays open for the duration of the stream)
A better fit for long-running operations
When to Use Streaming
✅ Use streaming when:
Response time > 2 seconds
User needs real-time feedback
Generating long-form content
Interactive applications
❌ Avoid streaming when:
Quick responses (< 1 second)
Simple data retrieval
Batch processing (use async instead)
Advanced Patterns
Progressive Response Building
def smart_stream(query: str) -> Iterator[str]:
    # Initial acknowledgment
    yield "🔍 Analyzing your query...\n\n"

    # Progressive results
    yield "📊 Found relevant information:\n"
    for item in search_results:
        yield f"  • {item}\n"

    # Final summary
    yield "\n✅ Analysis complete!"
Conditional Streaming
def conditional_stream(query: str, stream: bool = True) -> Iterator[str]:
    if stream:
        # Streaming mode: yield chunks as they are produced
        for chunk in process_streaming(query):
            yield chunk
    else:
        # Non-streaming mode: yield the complete result as one chunk
        result = process_complete(query)
        yield result
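Either way the caller receives an iterator; in non-streaming mode it simply produces a single chunk, so clients can consume both modes uniformly:

# Consuming conditional_stream the same way in both modes
full_text = "".join(conditional_stream("What is RunAgent?", stream=False))
print(full_text)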
Next Steps
SDK Documentation Learn more about SDK streaming capabilities
Core Concepts Understand entrypoints and streaming architecture
Production Considerations Best practices for production streaming
CLI Reference Complete CLI command reference