Prerequisites: Completed the Deploy Your First Agent tutorial

What You’ll Build

In this tutorial, you’ll create a Python agent, then call that same agent from:
  • Python (native SDK)
  • JavaScript (Node.js and browser)
  • Rust (high-performance systems)
  • Go (concurrent applications)
This showcases RunAgent’s core value proposition: write once in Python, use everywhere.

The Multi-Language Challenge

Traditional AI agent deployment requires:
  • A separate implementation (or hand-written binding) for each target language
  • API design and maintenance across multiple interfaces
  • Keeping behavior consistent across language bindings
  • Real-time streaming support in each language
RunAgent eliminates all of this complexity by automatically generating native-feeling SDKs for every supported language.

Step 1: Create Your Multi-Language Agent

Let’s build a data analysis agent that can be called from any language:
runagent init data-analyzer --framework custom
cd data-analyzer

Step 2: Build the Data Analysis Agent

Replace main.py with a comprehensive data analysis agent:
main.py
from typing import Iterator, Dict, Any, List
import statistics
from datetime import datetime

class DataAnalyzer:
    def __init__(self):
        self.supported_formats = ["json", "csv", "xml"]
        self.analysis_types = ["summary", "trends", "outliers", "correlations"]
    
    def analyze_data(self, data: List[Dict], analysis_type: str = "summary") -> Dict[str, Any]:
        """Analyze a dataset and return insights"""
        if not data:
            return {"error": "No data provided for analysis"}
        
        # Extract numeric fields
        numeric_fields = {}
        for item in data:
            for key, value in item.items():
                if isinstance(value, (int, float)):
                    if key not in numeric_fields:
                        numeric_fields[key] = []
                    numeric_fields[key].append(value)
        
        result = {
            "analysis_type": analysis_type,
            "total_records": len(data),
            "numeric_fields": list(numeric_fields.keys()),
            "timestamp": datetime.now().isoformat()
        }
        
        if analysis_type == "summary":
            result["summary"] = self._generate_summary(numeric_fields)
        elif analysis_type == "trends":
            result["trends"] = self._analyze_trends(numeric_fields)
        elif analysis_type == "outliers":
            result["outliers"] = self._find_outliers(numeric_fields)
        elif analysis_type == "correlations":
            result["correlations"] = self._find_correlations(numeric_fields)
        
        return result
    
    def _generate_summary(self, numeric_fields: Dict[str, List[float]]) -> Dict[str, Any]:
        """Generate statistical summary for numeric fields"""
        summary = {}
        for field, values in numeric_fields.items():
            summary[field] = {
                "count": len(values),
                "mean": round(statistics.mean(values), 2),
                "median": round(statistics.median(values), 2),
                "min": min(values),
                "max": max(values),
                "std_dev": round(statistics.stdev(values) if len(values) > 1 else 0, 2)
            }
        return summary
    
    def _analyze_trends(self, numeric_fields: Dict[str, List[float]]) -> Dict[str, str]:
        """Analyze trends in numeric data"""
        trends = {}
        for field, values in numeric_fields.items():
            if len(values) < 2:
                trends[field] = "Insufficient data for trend analysis"
                continue
            
            # Simple trend analysis
            first_half = values[:len(values)//2]
            second_half = values[len(values)//2:]
            
            first_avg = statistics.mean(first_half)
            second_avg = statistics.mean(second_half)
            
            if second_avg > first_avg * 1.1:
                trends[field] = "Increasing trend"
            elif second_avg < first_avg * 0.9:
                trends[field] = "Decreasing trend"
            else:
                trends[field] = "Stable trend"
        
        return trends
    
    def _find_outliers(self, numeric_fields: Dict[str, List[float]]) -> Dict[str, List[float]]:
        """Find outliers using IQR method"""
        outliers = {}
        for field, values in numeric_fields.items():
            if len(values) < 4:
                outliers[field] = []
                continue
            
            q1, _, q3 = statistics.quantiles(values, n=4)
            iqr = q3 - q1
            
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            
            field_outliers = [v for v in values if v < lower_bound or v > upper_bound]
            outliers[field] = field_outliers
        
        return outliers
    
    def _find_correlations(self, numeric_fields: Dict[str, List[float]]) -> Dict[str, float]:
        """Find correlations between numeric fields"""
        correlations = {}
        field_names = list(numeric_fields.keys())
        
        for i, field1 in enumerate(field_names):
            for field2 in field_names[i+1:]:
                values1 = numeric_fields[field1]
                values2 = numeric_fields[field2]
                
                if len(values1) == len(values2) and len(values1) > 1:
                    try:
                        # statistics.correlation requires Python 3.10+
                        corr = statistics.correlation(values1, values2)
                        correlations[f"{field1}_vs_{field2}"] = round(corr, 3)
                    except statistics.StatisticsError:
                        # correlation is undefined for constant inputs
                        correlations[f"{field1}_vs_{field2}"] = 0.0
        
        return correlations

def analyze_data_sync(data: List[Dict], analysis_type: str = "summary") -> Dict[str, Any]:
    """Synchronous data analysis"""
    analyzer = DataAnalyzer()
    return analyzer.analyze_data(data, analysis_type)

def analyze_data_stream(data: List[Dict], analysis_type: str = "summary") -> Iterator[str]:
    """Streaming data analysis with progress updates"""
    analyzer = DataAnalyzer()
    
    yield f"🔍 Starting {analysis_type} analysis of {len(data)} records...\n\n"
    
    if analysis_type == "summary":
        yield "📊 Generating statistical summary...\n"
        result = analyzer.analyze_data(data, analysis_type)
        
        yield f"📈 Found {len(result['numeric_fields'])} numeric fields: {', '.join(result['numeric_fields'])}\n\n"
        
        for field, stats in result['summary'].items():
            yield f"**{field}**:\n"
            yield f"  • Count: {stats['count']}\n"
            yield f"  • Mean: {stats['mean']}\n"
            yield f"  • Median: {stats['median']}\n"
            yield f"  • Range: {stats['min']} - {stats['max']}\n"
            yield f"  • Std Dev: {stats['std_dev']}\n\n"
    
    elif analysis_type == "trends":
        yield "📈 Analyzing trends...\n"
        result = analyzer.analyze_data(data, analysis_type)
        
        for field, trend in result['trends'].items():
            yield f"**{field}**: {trend}\n"
    
    elif analysis_type == "outliers":
        yield "🎯 Detecting outliers...\n"
        result = analyzer.analyze_data(data, analysis_type)
        
        for field, outliers in result['outliers'].items():
            if outliers:
                yield f"**{field}**: Found {len(outliers)} outliers: {outliers}\n"
            else:
                yield f"**{field}**: No outliers detected\n"
    
    elif analysis_type == "correlations":
        yield "🔗 Finding correlations...\n"
        result = analyzer.analyze_data(data, analysis_type)
        
        for pair, corr in result['correlations'].items():
            yield f"**{pair}**: {corr}\n"
    
    yield f"\n✅ Analysis complete! Processed {len(data)} records.\n"

def get_analysis_capabilities() -> Dict[str, Any]:
    """Get information about analysis capabilities"""
    analyzer = DataAnalyzer()
    return {
        "supported_formats": analyzer.supported_formats,
        "analysis_types": analyzer.analysis_types,
        "description": "Advanced data analysis agent with statistical insights",
        "version": "1.0.0"
    }
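The `_find_outliers` method above uses the standard 1.5×IQR rule. A standalone sketch of that same logic, runnable on its own, to sanity-check the behavior:

```python
import statistics

values = [10, 12, 11, 13, 12, 11, 10, 12, 100]

# quantiles(n=4) returns the three quartile cut points [q1, q2, q3]
q1, _, q3 = statistics.quantiles(values, n=4)
iqr = q3 - q1

# anything beyond 1.5 * IQR from the quartiles is flagged as an outlier
outliers = [v for v in values if v < q1 - 1.5 * iqr or v > q3 + 1.5 * iqr]
print(outliers)  # [100]
```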

Step 3: Configure Your Agent

Update runagent.config.json:
{
  "agent_name": "data-analyzer",
  "description": "Multi-language data analysis agent",
  "framework": "custom",
  "agent_architecture": {
    "entrypoints": [
      {
        "file": "main.py",
        "module": "analyze_data_sync",
        "tag": "analyze"
      },
      {
        "file": "main.py",
        "module": "analyze_data_stream", 
        "tag": "analyze_stream"
      },
      {
        "file": "main.py",
        "module": "get_analysis_capabilities",
        "tag": "capabilities"
      }
    ]
  }
}
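Clients select an entrypoint by its tag, so every tag in the config must be unique. A quick sanity check you can run on the config before serving (a helper sketch, not part of the RunAgent CLI; the config is embedded here so the snippet is self-contained):

```python
import json

config = json.loads("""
{
  "agent_name": "data-analyzer",
  "agent_architecture": {
    "entrypoints": [
      {"file": "main.py", "module": "analyze_data_sync", "tag": "analyze"},
      {"file": "main.py", "module": "analyze_data_stream", "tag": "analyze_stream"},
      {"file": "main.py", "module": "get_analysis_capabilities", "tag": "capabilities"}
    ]
  }
}
""")

entrypoints = config["agent_architecture"]["entrypoints"]
tags = [e["tag"] for e in entrypoints]

# each entrypoint needs file, module, and a unique tag
assert all({"file", "module", "tag"} <= set(e) for e in entrypoints)
assert len(tags) == len(set(tags)), "entrypoint tags must be unique"
print(tags)
```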

Step 4: Deploy Your Agent

Start your data analysis agent:
runagent serve .
Copy the agent ID from the serve output; every client in the following steps needs it.

Step 5: Call from Python (Native)

Create a Python client script:
python_client.py
from runagent import RunAgentClient
import json

# Sample data for analysis
sample_data = [
    {"sales": 1000, "profit": 200, "region": "North"},
    {"sales": 1500, "profit": 300, "region": "South"},
    {"sales": 1200, "profit": 250, "region": "East"},
    {"sales": 1800, "profit": 400, "region": "West"},
    {"sales": 900, "profit": 150, "region": "North"}
]

# Connect to your agent
client = RunAgentClient(
    agent_id="your_agent_id_here",  # Replace with actual ID
    entrypoint_tag="analyze",
    local=True
)

# Test different analysis types
analysis_types = ["summary", "trends", "outliers", "correlations"]

for analysis_type in analysis_types:
    print(f"\n=== {analysis_type.upper()} ANALYSIS ===")
    result = client.run(data=sample_data, analysis_type=analysis_type)
    print(json.dumps(result, indent=2))

# Test streaming
print("\n=== STREAMING ANALYSIS ===")
stream_client = RunAgentClient(
    agent_id="your_agent_id_here",
    entrypoint_tag="analyze_stream", 
    local=True
)

for chunk in stream_client.run(data=sample_data, analysis_type="summary"):
    print(chunk, end="", flush=True)

Step 6: Call from JavaScript (Node.js)

Create a JavaScript client:
javascript_client.js
// First install: npm install runagent
const { RunAgentClient } = require('runagent');

const sampleData = [
    { sales: 1000, profit: 200, region: "North" },
    { sales: 1500, profit: 300, region: "South" },
    { sales: 1200, profit: 250, region: "East" },
    { sales: 1800, profit: 400, region: "West" },
    { sales: 900, profit: 150, region: "North" }
];

async function testAnalysis() {
    // Connect to your agent
    const client = new RunAgentClient({
        agentId: "your_agent_id_here", // Replace with actual ID
        entrypointTag: "analyze",
        local: true
    });

    await client.initialize();

    // Test different analysis types
    const analysisTypes = ["summary", "trends", "outliers", "correlations"];

    for (const analysisType of analysisTypes) {
        console.log(`\n=== ${analysisType.toUpperCase()} ANALYSIS ===`);
        const result = await client.run({
            data: sampleData,
            analysis_type: analysisType
        });
        console.log(JSON.stringify(result, null, 2));
    }

    // Test streaming
    console.log("\n=== STREAMING ANALYSIS ===");
    const streamClient = new RunAgentClient({
        agentId: "your_agent_id_here",
        entrypointTag: "analyze_stream",
        local: true
    });

    await streamClient.initialize();
    
    const stream = await streamClient.run({
        data: sampleData,
        analysis_type: "summary"
    });

    for await (const chunk of stream) {
        process.stdout.write(chunk);
    }
}

testAnalysis().catch(console.error);

Step 7: Call from Rust

Create a Rust client:
rust_client.rs
use runagent::client::RunAgentClient;
use serde_json::json;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let sample_data = json!([
        {"sales": 1000, "profit": 200, "region": "North"},
        {"sales": 1500, "profit": 300, "region": "South"},
        {"sales": 1200, "profit": 250, "region": "East"},
        {"sales": 1800, "profit": 400, "region": "West"},
        {"sales": 900, "profit": 150, "region": "North"}
    ]);

    // Connect to your agent
    let client = RunAgentClient::new("your_agent_id_here", "analyze", true).await?;

    // Test different analysis types
    let analysis_types = vec!["summary", "trends", "outliers", "correlations"];

    for analysis_type in analysis_types {
        println!("\n=== {} ANALYSIS ===", analysis_type.to_uppercase());
        
        let result = client.run(&[
            ("data", sample_data.clone()),
            ("analysis_type", json!(analysis_type))
        ]).await?;
        
        println!("{}", serde_json::to_string_pretty(&result)?);
    }

    // Test streaming
    println!("\n=== STREAMING ANALYSIS ===");
    let stream_client = RunAgentClient::new("your_agent_id_here", "analyze_stream", true).await?;
    
    let mut stream = stream_client.run_stream(&[
        ("data", sample_data),
        ("analysis_type", json!("summary"))
    ]).await?;

    while let Some(chunk) = stream.next().await {
        print!("{}", chunk?);
    }

    Ok(())
}

Step 8: Call from Go

Create a Go client:
go_client.go
package main

import (
    "context"
    "fmt"
    "log"
    "time"
    "github.com/runagent-dev/runagent-go/pkg/client"
)

func main() {
    sampleData := []map[string]interface{}{
        {"sales": 1000, "profit": 200, "region": "North"},
        {"sales": 1500, "profit": 300, "region": "South"},
        {"sales": 1200, "profit": 250, "region": "East"},
        {"sales": 1800, "profit": 400, "region": "West"},
        {"sales": 900, "profit": 150, "region": "North"},
    }

    // Connect to your agent
    c, err := client.NewWithAddress(
        "your_agent_id_here", // Replace with actual ID
        "analyze",
        true,
        "localhost",
        8451,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
    defer cancel()

    // Test different analysis types
    analysisTypes := []string{"summary", "trends", "outliers", "correlations"}

    for _, analysisType := range analysisTypes {
        fmt.Printf("\n=== %s ANALYSIS ===\n", analysisType)
        
        result, err := c.Run(ctx, map[string]interface{}{
            "data":          sampleData,
            "analysis_type": analysisType,
        })
        if err != nil {
            log.Printf("Error: %v", err)
            continue
        }
        
        fmt.Printf("%+v\n", result)
    }

    // Test streaming
    fmt.Println("\n=== STREAMING ANALYSIS ===")
    streamClient, err := client.NewWithAddress(
        "your_agent_id_here",
        "analyze_stream",
        true,
        "localhost",
        8451,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer streamClient.Close()

    s, err := streamClient.RunStream(ctx, map[string]interface{}{
        "data":          sampleData,
        "analysis_type": "summary",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer s.Close()

    for {
        data, hasMore, err := s.Next(ctx)
        if err != nil {
            log.Fatal(err)
        }
        if !hasMore {
            break
        }
        fmt.Print(data)
    }
}

Step 9: Test All Languages

Run each client to see the same Python agent working across all languages:
# Python
python python_client.py

# JavaScript (after npm install runagent)
node javascript_client.js

# Rust (after cargo add runagent serde_json futures && cargo add tokio --features full)
cargo run

# Go (after go mod init demo && go get github.com/runagent-dev/runagent-go)
go run go_client.go

What You’ve Accomplished

You’ve demonstrated RunAgent’s core value proposition:

🐍 Write Once in Python

Built a sophisticated data analysis agent in Python

🌐 Use Everywhere

Called the same agent from 4 different programming languages

⚡ Native Performance

Each language gets native-feeling APIs and performance characteristics

🔄 Real-Time Streaming

Streaming works seamlessly across all language boundaries

Key Insights

1. Consistent API Design

Every language calls the same entrypoint with the same parameters:
  • data: list of records (dictionaries/objects) to analyze
  • analysis_type: string selecting the analysis ("summary", "trends", "outliers", or "correlations")

2. Language-Specific Idioms

Each SDK adapts to language conventions:
  • Python: Dictionary parameters, native iteration
  • JavaScript: Object parameters, async/await
  • Rust: Array parameters, futures streams
  • Go: Map parameters, context-aware operations

3. Automatic Serialization

RunAgent handles all the complex serialization/deserialization automatically.
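A practical consequence: everything you pass to the client must survive a serialization round trip (JSON is assumed here, as the config and payloads suggest). A quick way to check a payload before sending it:

```python
import json

payload = {
    "data": [{"sales": 1000, "profit": 200, "region": "North"}],
    "analysis_type": "summary",
}

# a JSON round trip mirrors what the SDK does on the wire;
# if this raises or changes the value, the payload isn't wire-safe
restored = json.loads(json.dumps(payload))
assert restored == payload
print("payload is JSON-safe")
```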

4. Streaming Everywhere

The same streaming function works identically across all languages.
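On the Python side, any function that returns `Iterator[str]` (like `analyze_data_stream` above) can serve as a streaming entrypoint; each yielded string becomes one chunk on the client side, whatever the client language. The minimal shape of such a function:

```python
from typing import Iterator

def progress_stream(n: int) -> Iterator[str]:
    # each yield becomes one chunk delivered to the caller
    for i in range(1, n + 1):
        yield f"step {i}/{n}\n"
    yield "done\n"

chunks = list(progress_stream(3))
print("".join(chunks), end="")
```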

Production Considerations

Error Handling

Each SDK provides language-appropriate error handling:
# Python
try:
    result = client.run(data=data, analysis_type="summary")
except RunAgentError as e:
    print(f"Analysis failed: {e}")

// JavaScript
try {
    const result = await client.run({data, analysis_type: "summary"});
} catch (error) {
    console.error("Analysis failed:", error.message);
}

Performance Optimization

  • Python: Use async client for concurrent requests
  • JavaScript: Use connection pooling for multiple requests
  • Rust: Leverage zero-copy deserialization
  • Go: Use goroutines for concurrent processing
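For the Python case, one pattern that needs no extra SDK surface is fanning blocking `client.run` calls out to worker threads with `asyncio.to_thread`. The sketch below uses a stand-in function in place of the real `client.run` from Step 5, so it runs without a live agent:

```python
import asyncio

def run_analysis(analysis_type: str) -> dict:
    # stand-in for the blocking client.run(data=..., analysis_type=...) call
    return {"analysis_type": analysis_type, "status": "ok"}

async def main() -> list:
    # run the four analyses concurrently on threads and await them together
    tasks = [asyncio.to_thread(run_analysis, t)
             for t in ("summary", "trends", "outliers", "correlations")]
    return await asyncio.gather(*tasks)

results = asyncio.run(main())
print([r["analysis_type"] for r in results])
```

`asyncio.gather` preserves submission order, so results line up with the analysis types regardless of which thread finishes first.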

Monitoring and Logging

Add logging to track usage across languages:
import logging

logger = logging.getLogger("data-analyzer")

def analyze_data_sync(data: List[Dict], analysis_type: str = "summary") -> Dict[str, Any]:
    logger.info("Analysis request: %s, %d records", analysis_type, len(data))

    analyzer = DataAnalyzer()
    result = analyzer.analyze_data(data, analysis_type)

    # use .get(): the result carries "error" instead of "total_records" on empty input
    logger.info("Analysis complete: %d records processed", result.get("total_records", 0))
    return result

Next Steps

🎉 Congratulations! You’ve successfully demonstrated RunAgent’s core value: write sophisticated AI agents in Python and use them from any programming language with native-feeling APIs. This is the future of AI development!