The Rust SDK provides efficient streaming capabilities for real-time agent interactions using async streams.

Basic Streaming

use futures::StreamExt;
use runagent::client::RunAgentClient;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = RunAgentClient::new("agent-id", "generic_stream", true).await?;
    
    let mut stream = client.run_stream(&[
        ("query", json!("Tell me a long story"))
    ]).await?;
    
    while let Some(chunk_result) = stream.next().await {
        match chunk_result {
            Ok(chunk) => print!("{}", chunk),
            Err(e) => {
                println!("Error: {}", e);
                break;
            }
        }
    }
    
    Ok(())
}

Stream Processing Patterns

Collecting Stream Data

use futures::StreamExt;
use serde_json::Value;

async fn collect_stream_response(
    client: &RunAgentClient,
    query: &str
) -> Result<String, Box<dyn std::error::Error>> {
    let mut stream = client.run_stream(&[
        ("query", json!(query))
    ]).await?;
    
    let mut collected = String::new();
    
    while let Some(chunk_result) = stream.next().await {
        match chunk_result {
            Ok(chunk) => {
                if let Some(content) = chunk.get("content") {
                    if let Some(text) = content.as_str() {
                        collected.push_str(text);
                    }
                }
            }
            Err(e) => return Err(e.into()),
        }
    }
    
    Ok(collected)
}

Stream Transformation

use futures::{Stream, StreamExt, TryStreamExt};
use serde_json::Value;

async fn transform_stream(
    client: &RunAgentClient,
    query: &str
) -> impl Stream<Item = Result<String, Box<dyn std::error::Error>>> {
    let stream = client.run_stream(&[
        ("query", json!(query))
    ]).await.unwrap();
    
    stream.map(|chunk_result| {
        chunk_result
            .map_err(|e| e.into())
            .and_then(|chunk| {
                chunk.get("content")
                    .and_then(|v| v.as_str())
                    .map(|s| s.to_uppercase())
                    .ok_or_else(|| "No content in chunk".into())
            })
    })
}

Framework-Specific Streaming

LangChain Streaming

let client = RunAgentClient::new("langchain-agent", "generic_stream", true).await?;

let mut stream = client.run_stream(&[
    ("messages", json!([
        {"role": "user", "content": "Explain machine learning"}
    ]))
]).await?;

while let Some(chunk_result) = stream.next().await {
    match chunk_result {
        Ok(chunk) => {
            if let Some(content) = chunk.get("content") {
                print!("{}", content.as_str().unwrap_or(""));
            }
        }
        Err(e) => break,
    }
}

AutoGen Token Streaming

let client = RunAgentClient::new("autogen-agent", "autogen_token_stream", true).await?;

let mut stream = client.run_stream(&[
    ("task", json!("Write a brief summary"))
]).await?;

while let Some(chunk_result) = stream.next().await {
    match chunk_result {
        Ok(chunk) => {
            if chunk.get("type").and_then(|v| v.as_str()) == Some("ModelClientStreamingChunkEvent") {
                if let Some(delta) = chunk.get("delta").and_then(|v| v.as_str()) {
                    print!("{}", delta);
                }
            }
        }
        Err(e) => break,
    }
}

Advanced Stream Operations

Buffered Streaming

use futures::StreamExt;
use std::collections::VecDeque;

struct BufferedStreamer {
    buffer: VecDeque<String>,
    buffer_size: usize,
}

impl BufferedStreamer {
    fn new(buffer_size: usize) -> Self {
        Self {
            buffer: VecDeque::new(),
            buffer_size,
        }
    }
    
    async fn process_stream(
        &mut self,
        mut stream: impl Stream<Item = Result<Value, RunAgentError>> + Unpin
    ) -> Result<(), Box<dyn std::error::Error>> {
        while let Some(chunk_result) = stream.next().await {
            match chunk_result {
                Ok(chunk) => {
                    if let Some(content) = chunk.get("content").and_then(|v| v.as_str()) {
                        self.buffer.push_back(content.to_string());
                        
                        if self.buffer.len() >= self.buffer_size {
                            self.flush_buffer();
                        }
                    }
                }
                Err(e) => return Err(e.into()),
            }
        }
        
        // Flush remaining buffer
        self.flush_buffer();
        Ok(())
    }
    
    fn flush_buffer(&mut self) {
        let combined: String = self.buffer.drain(..).collect();
        println!("{}", combined);
    }
}

Stream with Timeout

use tokio::time::{timeout, Duration};
use futures::StreamExt;

async fn stream_with_timeout(
    client: &RunAgentClient,
    query: &str,
    timeout_duration: Duration
) -> Result<(), Box<dyn std::error::Error>> {
    let stream = client.run_stream(&[
        ("query", json!(query))
    ]).await?;
    
    let mut stream = Box::pin(stream);
    
    loop {
        match timeout(timeout_duration, stream.next()).await {
            Ok(Some(Ok(chunk))) => {
                println!("Received: {}", chunk);
            }
            Ok(Some(Err(e))) => {
                println!("Stream error: {}", e);
                break;
            }
            Ok(None) => {
                println!("Stream ended");
                break;
            }
            Err(_) => {
                println!("Stream timed out");
                break;
            }
        }
    }
    
    Ok(())
}

WebSocket Integration

For web applications, you can bridge streams to WebSockets:

use axum::{
    extract::ws::{Message, WebSocket, WebSocketUpgrade},
    response::Response,
};
use futures::{SinkExt, StreamExt};

async fn websocket_handler(
    ws: WebSocketUpgrade,
    client: Arc<RunAgentClient>
) -> Response {
    ws.on_upgrade(move |socket| handle_socket(socket, client))
}

async fn handle_socket(socket: WebSocket, client: Arc<RunAgentClient>) {
    let (mut sender, mut receiver) = socket.split();
    
    while let Some(msg) = receiver.next().await {
        if let Ok(Message::Text(text)) = msg {
            let query = serde_json::from_str::<serde_json::Value>(&text).unwrap();
            
            if let Ok(mut stream) = client.run_stream(&[
                ("query", query)
            ]).await {
                while let Some(chunk_result) = stream.next().await {
                    match chunk_result {
                        Ok(chunk) => {
                            let msg = Message::Text(chunk.to_string());
                            if sender.send(msg).await.is_err() {
                                break;
                            }
                        }
                        Err(_) => break,
                    }
                }
            }
        }
    }
}

Performance Considerations

  1. Stream Buffering: Use buffering for high-throughput scenarios
  2. Memory Management: Process chunks immediately to avoid memory buildup
  3. Error Handling: Handle stream errors gracefully
  4. Timeouts: Set appropriate timeouts for stream operations

Testing Streams

#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    
    #[tokio::test]
    async fn test_stream_processing() {
        let client = RunAgentClient::new("test-agent", "generic_stream", true).await.unwrap();
        
        let mut stream = client.run_stream(&[
            ("query", json!("test query"))
        ]).await.unwrap();
        
        let mut count = 0;
        while let Some(chunk_result) = stream.next().await {
            assert!(chunk_result.is_ok());
            count += 1;
            if count > 5 { break; } // Limit for testing
        }
        
        assert!(count > 0);
    }
}