Quick Examples

Ready-to-use examples for common agent patterns and use cases.

Basic Examples

Simple Q&A Agent

# agent.py
import os
from openai import OpenAI

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def invoke(input_data: dict) -> dict:
    query = input_data.get("query", "")
    
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": query}
        ]
    )
    
    return {"response": response.choices[0].message.content}

Streaming Chatbot

# agent.py
def stream(input_data: dict):
    query = input_data.get("query", "")
    
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": query}],
        stream=True
    )
    
    for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

Framework Examples

LangGraph Agent

# langgraph_agent.py
from langgraph.graph import StateGraph, END
from typing import TypedDict, List

class ConversationState(TypedDict):
    messages: List[str]
    context: dict
    response: str

def understand_intent(state):
    # Analyze user intent
    return {"context": {"intent": "question"}}

def generate_response(state):
    # Generate appropriate response
    return {"response": f"You asked: {state['messages'][-1]}"}

# Build graph
workflow = StateGraph(ConversationState)
workflow.add_node("understand", understand_intent)
workflow.add_node("respond", generate_response)
workflow.add_edge("understand", "respond")
workflow.add_edge("respond", END)
workflow.set_entry_point("understand")

app = workflow.compile()

def invoke(input_data):
    result = app.invoke({"messages": [input_data["query"]]})
    return {"response": result["response"]}

CrewAI Research Team

# research_crew.py
from crewai import Crew, Agent, Task

researcher = Agent(
    role='Research Specialist',
    goal='Find accurate information',
    backstory='Expert researcher with access to vast knowledge'
)

analyst = Agent(
    role='Data Analyst',
    goal='Analyze and synthesize findings',
    backstory='Skilled at finding patterns and insights'
)

research_task = Task(
    description='Research {topic} thoroughly',
    agent=researcher
)

analysis_task = Task(
    description='Analyze research findings and provide insights',
    agent=analyst
)

crew = Crew(
    agents=[researcher, analyst],
    tasks=[research_task, analysis_task]
)

def invoke(input_data):
    topic = input_data.get("topic", "AI trends")
    result = crew.kickoff({"topic": topic})
    return {"research_results": result}

Use Case Examples

Customer Support Agent

# support_agent.py
import json
from datetime import datetime

# Mock knowledge base
KNOWLEDGE_BASE = {
    "shipping": "Standard shipping takes 5-7 business days...",
    "returns": "Items can be returned within 30 days...",
    "warranty": "All products come with a 1-year warranty..."
}

def find_relevant_info(query: str) -> str:
    query_lower = query.lower()
    for topic, info in KNOWLEDGE_BASE.items():
        if topic in query_lower:
            return info
    return "I'll connect you with a human agent for this query."

def invoke(input_data: dict) -> dict:
    query = input_data.get("query", "")
    user_id = input_data.get("user_id", "anonymous")
    
    # Log interaction
    log_entry = {
        "timestamp": datetime.utcnow().isoformat(),
        "user_id": user_id,
        "query": query
    }
    
    # Find relevant information
    response = find_relevant_info(query)
    
    # Check if escalation needed
    needs_human = "human agent" in response
    
    return {
        "response": response,
        "needs_escalation": needs_human,
        "log": log_entry
    }

Data Analysis Agent

# data_analyst.py
import pandas as pd
import json
from io import StringIO

def analyze_csv_data(csv_content: str) -> dict:
    df = pd.read_csv(StringIO(csv_content))
    
    analysis = {
        "rows": len(df),
        "columns": list(df.columns),
        "summary": df.describe().to_dict(),
        "missing_values": df.isnull().sum().to_dict()
    }
    
    return analysis

def invoke(input_data: dict) -> dict:
    action = input_data.get("action", "analyze")
    
    if action == "analyze":
        csv_data = input_data.get("csv_data", "")
        if not csv_data:
            return {"error": "No CSV data provided"}
        
        try:
            analysis = analyze_csv_data(csv_data)
            return {"analysis": analysis}
        except Exception as e:
            return {"error": f"Analysis failed: {str(e)}"}
    
    return {"error": "Unknown action"}

Code Assistant

# code_assistant.py
import ast
import black

def analyze_python_code(code: str) -> dict:
    try:
        # Parse code
        tree = ast.parse(code)
        
        # Extract information
        functions = [node.name for node in ast.walk(tree) 
                    if isinstance(node, ast.FunctionDef)]
        classes = [node.name for node in ast.walk(tree) 
                  if isinstance(node, ast.ClassDef)]
        
        # Format code
        try:
            formatted = black.format_str(code, mode=black.Mode())
        except:
            formatted = code
        
        return {
            "valid": True,
            "functions": functions,
            "classes": classes,
            "formatted_code": formatted
        }
    except SyntaxError as e:
        return {
            "valid": False,
            "error": str(e),
            "line": e.lineno
        }

def invoke(input_data: dict) -> dict:
    code = input_data.get("code", "")
    task = input_data.get("task", "analyze")
    
    if task == "analyze":
        return analyze_python_code(code)
    elif task == "improve":
        # Add suggestions for improvement
        analysis = analyze_python_code(code)
        if analysis["valid"]:
            analysis["suggestions"] = [
                "Add type hints",
                "Include docstrings",
                "Consider error handling"
            ]
        return analysis
    
    return {"error": "Unknown task"}

Integration Examples

Slack Bot Integration

# slack_bot_agent.py
import os
import requests

SLACK_WEBHOOK = os.getenv("SLACK_WEBHOOK_URL")

def send_to_slack(message: str, channel: str = "#general"):
    payload = {
        "channel": channel,
        "text": message,
        "username": "RunAgent Bot"
    }
    
    requests.post(SLACK_WEBHOOK, json=payload)

def invoke(input_data: dict) -> dict:
    command = input_data.get("command", "")
    user = input_data.get("user", "")
    
    if command.startswith("/ask"):
        question = command.replace("/ask", "").strip()
        # Process question
        answer = f"Answer to '{question}' coming soon!"
        
        send_to_slack(f"@{user} asked: {question}\nAnswer: {answer}")
        
        return {"response": answer}
    
    return {"response": "Unknown command"}

Database Query Agent

# db_agent.py
import os
import psycopg2
from psycopg2.extras import RealDictCursor

def get_db_connection():
    return psycopg2.connect(
        os.getenv("DATABASE_URL"),
        cursor_factory=RealDictCursor
    )

def safe_query(query: str) -> list:
    # Basic SQL injection prevention
    forbidden = ["DROP", "DELETE", "UPDATE", "INSERT"]
    if any(word in query.upper() for word in forbidden):
        raise ValueError("Unsafe query detected")
    
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(query)
            return cursor.fetchall()

def invoke(input_data: dict) -> dict:
    query_type = input_data.get("type", "select")
    
    if query_type == "stats":
        # Get database statistics
        stats = {
            "users": safe_query("SELECT COUNT(*) FROM users")[0]["count"],
            "orders": safe_query("SELECT COUNT(*) FROM orders")[0]["count"],
            "revenue": safe_query("SELECT SUM(amount) FROM orders")[0]["sum"]
        }
        return {"stats": stats}
    
    elif query_type == "custom":
        query = input_data.get("query", "")
        try:
            results = safe_query(query)
            return {"results": results}
        except Exception as e:
            return {"error": str(e)}
    
    return {"error": "Unknown query type"}

Advanced Patterns

Multi-Step Processing Pipeline

# pipeline_agent.py
from typing import Dict, Any
import asyncio

class ProcessingPipeline:
    def __init__(self):
        self.steps = []
    
    def add_step(self, func, name):
        self.steps.append((func, name))
    
    async def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        result = {"input": data, "steps": []}
        
        for func, name in self.steps:
            try:
                if asyncio.iscoroutinefunction(func):
                    step_result = await func(result)
                else:
                    step_result = func(result)
                
                result["steps"].append({
                    "name": name,
                    "status": "success",
                    "output": step_result
                })
                result.update(step_result)
                
            except Exception as e:
                result["steps"].append({
                    "name": name,
                    "status": "error",
                    "error": str(e)
                })
                break
        
        return result

# Define pipeline steps
def validate_input(data):
    if "text" not in data["input"]:
        raise ValueError("Missing 'text' field")
    return {"validated": True}

async def analyze_sentiment(data):
    # Simulate async API call
    await asyncio.sleep(0.1)
    return {"sentiment": "positive", "score": 0.8}

def generate_response(data):
    sentiment = data.get("sentiment", "neutral")
    responses = {
        "positive": "That's wonderful to hear!",
        "negative": "I understand your concerns.",
        "neutral": "Thank you for sharing."
    }
    return {"response": responses.get(sentiment, "Thanks!")}

# Create pipeline
pipeline = ProcessingPipeline()
pipeline.add_step(validate_input, "validation")
pipeline.add_step(analyze_sentiment, "sentiment_analysis")
pipeline.add_step(generate_response, "response_generation")

def invoke(input_data: dict) -> dict:
    # Run async pipeline in sync context
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    result = loop.run_until_complete(pipeline.process(input_data))
    return result

Agent with Memory

# memory_agent.py
import json
import os
from datetime import datetime
from typing import Dict, List

class MemoryStore:
    def __init__(self, filepath="memory.json"):
        self.filepath = filepath
        self.memory = self._load_memory()
    
    def _load_memory(self) -> Dict:
        if os.path.exists(self.filepath):
            with open(self.filepath, 'r') as f:
                return json.load(f)
        return {"conversations": {}, "facts": {}}
    
    def save_memory(self):
        with open(self.filepath, 'w') as f:
            json.dump(self.memory, f)
    
    def remember_conversation(self, user_id: str, exchange: Dict):
        if user_id not in self.memory["conversations"]:
            self.memory["conversations"][user_id] = []
        
        self.memory["conversations"][user_id].append({
            "timestamp": datetime.utcnow().isoformat(),
            "exchange": exchange
        })
        
        # Keep only last 10 exchanges
        self.memory["conversations"][user_id] = \
            self.memory["conversations"][user_id][-10:]
    
    def get_conversation_history(self, user_id: str) -> List[Dict]:
        return self.memory["conversations"].get(user_id, [])
    
    def remember_fact(self, key: str, value: Any):
        self.memory["facts"][key] = {
            "value": value,
            "timestamp": datetime.utcnow().isoformat()
        }
    
    def recall_fact(self, key: str) -> Any:
        fact = self.memory["facts"].get(key, {})
        return fact.get("value")

# Initialize memory
memory = MemoryStore()

def invoke(input_data: dict) -> dict:
    user_id = input_data.get("user_id", "anonymous")
    query = input_data.get("query", "")
    
    # Get conversation history
    history = memory.get_conversation_history(user_id)
    
    # Check for memory-related queries
    if query.lower().startswith("remember that"):
        fact = query.replace("remember that", "").strip()
        memory.remember_fact(f"user_{user_id}_fact", fact)
        response = f"I'll remember that: {fact}"
    
    elif query.lower() == "what do you remember?":
        fact = memory.recall_fact(f"user_{user_id}_fact")
        if fact:
            response = f"I remember you told me: {fact}"
        else:
            response = "I don't have any specific memories yet."
    
    else:
        # Normal processing
        response = f"You said: {query}"
        if history:
            response += f"\n(We've had {len(history)} previous exchanges)"
    
    # Save exchange
    memory.remember_conversation(user_id, {
        "query": query,
        "response": response
    })
    memory.save_memory()
    
    return {
        "response": response,
        "conversation_length": len(history) + 1
    }

Testing Your Examples

Unit Test Template

# test_agent.py
import pytest
from agent import invoke, stream

class TestAgent:
    def test_basic_invocation(self):
        result = invoke({"query": "Hello"})
        assert "response" in result
        assert isinstance(result["response"], str)
    
    def test_streaming(self):
        chunks = list(stream({"query": "Tell a story"}))
        assert len(chunks) > 0
        assert all(isinstance(chunk, str) for chunk in chunks)
    
    def test_error_handling(self):
        result = invoke({})  # Missing query
        assert "error" in result or "response" in result
    
    @pytest.mark.parametrize("input,expected_key", [
        ({"query": "test"}, "response"),
        ({"action": "analyze"}, "analysis"),
        ({"command": "help"}, "help_text")
    ])
    def test_various_inputs(self, input, expected_key):
        result = invoke(input)
        assert expected_key in result

Next Steps

Contributing Examples

Have a great example? Share it with the community:

  1. Fork the examples repository
  2. Add your example with documentation
  3. Include tests and requirements
  4. Submit a pull request

We feature the best community examples in our documentation!