The Rust SDK provides efficient streaming capabilities for real-time agent interactions using async streams.
use futures::StreamExt;
use runagent::client::RunAgentClient;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = RunAgentClient::new("agent-id", "generic_stream", true).await?;

    let mut stream = client.run_stream(&[
        ("query", json!("Tell me a long story"))
    ]).await?;

    while let Some(chunk_result) = stream.next().await {
        match chunk_result {
            Ok(chunk) => print!("{}", chunk),
            Err(e) => {
                println!("Error: {}", e);
                break;
            }
        }
    }

    Ok(())
}
use futures::StreamExt;
use runagent::client::RunAgentClient;
use serde_json::json;

/// Collect an entire streamed response into a single String.
async fn collect_stream_response(
    client: &RunAgentClient,
    query: &str
) -> Result<String, Box<dyn std::error::Error>> {
    let mut stream = client.run_stream(&[
        ("query", json!(query))
    ]).await?;

    let mut collected = String::new();
    while let Some(chunk_result) = stream.next().await {
        match chunk_result {
            Ok(chunk) => {
                // Append only the textual "content" field of each chunk
                if let Some(text) = chunk.get("content").and_then(|v| v.as_str()) {
                    collected.push_str(text);
                }
            }
            Err(e) => return Err(e.into()),
        }
    }

    Ok(collected)
}
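The helper can then be called like any other async function; a minimal sketch, reusing the same client setup as the basic example (the agent id and query are placeholders):
// Hypothetical usage of collect_stream_response
let client = RunAgentClient::new("agent-id", "generic_stream", true).await?;
let full_text = collect_stream_response(&client, "Summarize async Rust in one paragraph").await?;
println!("Full response ({} chars): {}", full_text.len(), full_text);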
use futures::{Stream, StreamExt};
use runagent::client::RunAgentClient;
use serde_json::json;

/// Map a raw chunk stream into a stream of upper-cased content strings.
/// Errors from individual chunks are propagated through the returned stream;
/// the initial run_stream call is unwrapped here for brevity.
async fn transform_stream(
    client: &RunAgentClient,
    query: &str
) -> impl Stream<Item = Result<String, Box<dyn std::error::Error>>> {
    let stream = client.run_stream(&[
        ("query", json!(query))
    ]).await.unwrap();

    stream.map(|chunk_result| -> Result<String, Box<dyn std::error::Error>> {
        let chunk = chunk_result?;
        chunk.get("content")
            .and_then(|v| v.as_str())
            .map(|s| s.to_uppercase())
            .ok_or_else(|| "No content in chunk".into())
    })
}
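The transformed stream is consumed like any other; a small sketch under the same client setup as above (pinning the stream so next() can be called on it):
// Hypothetical usage of transform_stream
let mut upper = Box::pin(transform_stream(&client, "Tell me a short story").await);
while let Some(item) = upper.next().await {
    match item {
        Ok(text) => print!("{}", text),
        Err(e) => eprintln!("Stream error: {}", e),
    }
}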
let client = RunAgentClient::new("langchain-agent", "generic_stream", true).await?;
let mut stream = client.run_stream(&[
("messages", json!([
{"role": "user", "content": "Explain machine learning"}
]))
]).await?;
while let Some(chunk_result) = stream.next().await {
match chunk_result {
Ok(chunk) => {
if let Some(content) = chunk.get("content") {
print!("{}", content.as_str().unwrap_or(""));
}
}
Err(e) => break,
}
}
let client = RunAgentClient::new("autogen-agent", "autogen_token_stream", true).await?;
let mut stream = client.run_stream(&[
("task", json!("Write a brief summary"))
]).await?;
while let Some(chunk_result) = stream.next().await {
match chunk_result {
Ok(chunk) => {
if chunk.get("type").and_then(|v| v.as_str()) == Some("ModelClientStreamingChunkEvent") {
if let Some(delta) = chunk.get("delta").and_then(|v| v.as_str()) {
print!("{}", delta);
}
}
}
Err(e) => break,
}
}
use futures::{Stream, StreamExt};
use runagent::RunAgentError; // adjust this path if RunAgentError lives elsewhere in your SDK version
use serde_json::Value;
use std::collections::VecDeque;

/// Buffers streamed content and prints it in batches instead of chunk-by-chunk.
struct BufferedStreamer {
    buffer: VecDeque<String>,
    buffer_size: usize,
}

impl BufferedStreamer {
    fn new(buffer_size: usize) -> Self {
        Self {
            buffer: VecDeque::new(),
            buffer_size,
        }
    }

    async fn process_stream(
        &mut self,
        mut stream: impl Stream<Item = Result<Value, RunAgentError>> + Unpin
    ) -> Result<(), Box<dyn std::error::Error>> {
        while let Some(chunk_result) = stream.next().await {
            match chunk_result {
                Ok(chunk) => {
                    if let Some(content) = chunk.get("content").and_then(|v| v.as_str()) {
                        self.buffer.push_back(content.to_string());
                        if self.buffer.len() >= self.buffer_size {
                            self.flush_buffer();
                        }
                    }
                }
                Err(e) => return Err(e.into()),
            }
        }

        // Flush whatever remains after the stream ends
        self.flush_buffer();
        Ok(())
    }

    fn flush_buffer(&mut self) {
        if self.buffer.is_empty() {
            return;
        }
        let combined: String = self.buffer.drain(..).collect();
        println!("{}", combined);
    }
}
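Driving BufferedStreamer with a client stream might look like this; a sketch assuming run_stream yields Result<Value, RunAgentError> items, matching the signature above:
// Hypothetical usage: flush output every 5 chunks instead of per chunk
let client = RunAgentClient::new("agent-id", "generic_stream", true).await?;
let stream = client.run_stream(&[("query", json!("Tell me a long story"))]).await?;

let mut streamer = BufferedStreamer::new(5);
streamer.process_stream(Box::pin(stream)).await?;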
use futures::StreamExt;
use runagent::client::RunAgentClient;
use serde_json::json;
use tokio::time::{timeout, Duration};

/// Read a stream, aborting if no chunk arrives within `timeout_duration`.
async fn stream_with_timeout(
    client: &RunAgentClient,
    query: &str,
    timeout_duration: Duration
) -> Result<(), Box<dyn std::error::Error>> {
    let stream = client.run_stream(&[
        ("query", json!(query))
    ]).await?;
    let mut stream = Box::pin(stream);

    loop {
        // The timeout applies to the wait for each individual chunk, not the whole stream
        match timeout(timeout_duration, stream.next()).await {
            Ok(Some(Ok(chunk))) => {
                println!("Received: {}", chunk);
            }
            Ok(Some(Err(e))) => {
                println!("Stream error: {}", e);
                break;
            }
            Ok(None) => {
                println!("Stream ended");
                break;
            }
            Err(_) => {
                println!("Stream timed out");
                break;
            }
        }
    }

    Ok(())
}
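For example, a 30-second per-chunk timeout, with the same client setup as the earlier examples:
// Hypothetical call site: give up if the agent is silent for 30 seconds
let client = RunAgentClient::new("agent-id", "generic_stream", true).await?;
stream_with_timeout(&client, "Tell me a long story", Duration::from_secs(30)).await?;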
For web applications, you can bridge streams to WebSockets:
use axum::{
    extract::ws::{Message, WebSocket, WebSocketUpgrade},
    extract::State,
    response::Response,
};
use futures::{SinkExt, StreamExt};
use runagent::client::RunAgentClient;
use std::sync::Arc;

async fn websocket_handler(
    ws: WebSocketUpgrade,
    // The shared client comes from axum router state (see the wiring sketch below)
    State(client): State<Arc<RunAgentClient>>
) -> Response {
    ws.on_upgrade(move |socket| handle_socket(socket, client))
}

async fn handle_socket(socket: WebSocket, client: Arc<RunAgentClient>) {
    let (mut sender, mut receiver) = socket.split();

    while let Some(msg) = receiver.next().await {
        if let Ok(Message::Text(text)) = msg {
            // Skip messages that are not valid JSON instead of panicking
            let Ok(query) = serde_json::from_str::<serde_json::Value>(&text) else {
                continue;
            };

            if let Ok(mut stream) = client.run_stream(&[
                ("query", query)
            ]).await {
                // Forward each chunk to the WebSocket client as it arrives
                while let Some(chunk_result) = stream.next().await {
                    match chunk_result {
                        Ok(chunk) => {
                            let msg = Message::Text(chunk.to_string());
                            if sender.send(msg).await.is_err() {
                                break;
                            }
                        }
                        Err(_) => break,
                    }
                }
            }
        }
    }
}
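Wiring the handler into an application could look like the following; a minimal sketch assuming axum 0.7, with the route path, bind address, and agent id as placeholders:
use axum::{routing::get, Router};

// Hypothetical app setup: share one RunAgentClient across WebSocket connections
let client = Arc::new(RunAgentClient::new("agent-id", "generic_stream", true).await?);

let app: Router = Router::new()
    .route("/ws", get(websocket_handler))
    .with_state(client);

let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
axum::serve(listener, app).await?;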
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;

    // Integration-style test: requires a reachable agent named "test-agent"
    #[tokio::test]
    async fn test_stream_processing() {
        let client = RunAgentClient::new("test-agent", "generic_stream", true).await.unwrap();

        let mut stream = client.run_stream(&[
            ("query", json!("test query"))
        ]).await.unwrap();

        let mut count = 0;
        while let Some(chunk_result) = stream.next().await {
            assert!(chunk_result.is_ok());
            count += 1;
            if count > 5 { break; } // Limit the number of chunks checked
        }

        assert!(count > 0);
    }
}