Resources
Examples
Example agents and code snippets
Quick Examples
Ready-to-use examples for common agent patterns and use cases.
Basic Examples
Simple Q&A Agent
Copy
# agent.py
import os
from openai import OpenAI
# Module-level OpenAI client used by invoke() and stream(); the API key is
# read from the OPENAI_API_KEY environment variable when the module loads.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
def invoke(input_data: dict) -> dict:
    """Answer one question with a single chat-completion request.

    Args:
        input_data: Request payload; ``"query"`` holds the user's question
            and defaults to an empty string when absent.

    Returns:
        ``{"response": <content of the first returned choice>}``.
    """
    system_message = {"role": "system", "content": "You are a helpful assistant."}
    user_message = {"role": "user", "content": input_data.get("query", "")}

    completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[system_message, user_message],
    )

    first_choice = completion.choices[0]
    return {"response": first_choice.message.content}
Streaming Chatbot
Copy
# agent.py
def stream(input_data: dict):
    """Yield the assistant's reply to ``input_data["query"]`` as text fragments.

    Args:
        input_data: Request payload; ``"query"`` defaults to an empty string.

    Yields:
        Each non-empty ``delta.content`` string from the streamed completion,
        in arrival order.
    """
    query = input_data.get("query", "")
    # Named `completion_stream` so the local does not shadow this function.
    completion_stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": query}],
        stream=True,
    )
    for chunk in completion_stream:
        # A chunk may arrive with an empty `choices` list (for example a
        # usage-only chunk); indexing [0] on it would raise IndexError.
        if not chunk.choices:
            continue
        text = chunk.choices[0].delta.content
        if text:
            yield text
Framework Examples
LangGraph Agent
Copy
# langgraph_agent.py
from langgraph.graph import StateGraph, END
from typing import TypedDict, List
class ConversationState(TypedDict):
    """State dictionary passed between the graph's nodes."""

    # User messages; generate_response() reads the last one.
    messages: List[str]
    # Written by understand_intent().
    context: dict
    # Written by generate_response().
    response: str
def understand_intent(state):
    """Return the context update for this turn.

    Placeholder intent analysis: ``state`` is not inspected and the intent is
    always reported as ``"question"``.
    """
    detected_context = {"intent": "question"}
    return {"context": detected_context}
def generate_response(state):
    """Build the reply by echoing the most recent entry of ``state["messages"]``."""
    latest_message = state['messages'][-1]
    reply = f"You asked: {latest_message}"
    return {"response": reply}
# Build graph: two nodes wired in sequence, understand -> respond -> END.
workflow = StateGraph(ConversationState)
workflow.add_node("understand", understand_intent)
workflow.add_node("respond", generate_response)
workflow.add_edge("understand", "respond")
workflow.add_edge("respond", END)
# Execution starts at the "understand" node.
workflow.set_entry_point("understand")
# Compiled graph that invoke() calls.
app = workflow.compile()
def invoke(input_data):
    """Run the compiled graph on a single user query.

    Args:
        input_data: Request payload; ``"query"`` defaults to an empty string
            when absent, matching the other example agents, instead of
            raising ``KeyError``.

    Returns:
        ``{"response": <the "response" value from the graph result>}``.
    """
    query = input_data.get("query", "")
    result = app.invoke({"messages": [query]})
    return {"response": result["response"]}
CrewAI Research Team
Copy
# research_crew.py
from crewai import Crew, Agent, Task
# Two agents: one gathers information, the other interprets it.
researcher = Agent(
    role='Research Specialist',
    goal='Find accurate information',
    backstory='Expert researcher with access to vast knowledge',
)
analyst = Agent(
    role='Data Analyst',
    goal='Analyze and synthesize findings',
    backstory='Skilled at finding patterns and insights',
)

# Every Task declares `expected_output`; CrewAI's Task model requires it.
# `{topic}` is filled in from the inputs passed to crew.kickoff().
research_task = Task(
    description='Research {topic} thoroughly',
    expected_output='A summary of the key facts found about {topic}',
    agent=researcher,
)
analysis_task = Task(
    description='Analyze research findings and provide insights',
    expected_output='A short list of insights drawn from the research findings',
    agent=analyst,
)

# Tasks are listed in the order research -> analysis.
crew = Crew(
    agents=[researcher, analyst],
    tasks=[research_task, analysis_task],
)
def invoke(input_data):
    """Kick off the crew for the requested topic.

    Args:
        input_data: Request payload; ``"topic"`` defaults to ``"AI trends"``.

    Returns:
        ``{"research_results": <whatever crew.kickoff() returns>}``.
    """
    kickoff_inputs = {"topic": input_data.get("topic", "AI trends")}
    return {"research_results": crew.kickoff(kickoff_inputs)}
Use Case Examples
Customer Support Agent
Copy
# support_agent.py
import json
from datetime import datetime
# Mock knowledge base: topic keyword -> canned answer.
# find_relevant_info() checks the keywords in this insertion order.
KNOWLEDGE_BASE = {
    "shipping": "Standard shipping takes 5-7 business days...",
    "returns": "Items can be returned within 30 days...",
    "warranty": "All products come with a 1-year warranty...",
}
def find_relevant_info(query: str) -> str:
    """Return the answer for the first knowledge-base topic named in ``query``.

    Matching is a case-insensitive substring test of each topic keyword
    against the query. When no topic matches, a hand-off message is returned.
    """
    normalized = query.lower()
    matching_answers = (
        answer
        for keyword, answer in KNOWLEDGE_BASE.items()
        if keyword in normalized
    )
    return next(
        matching_answers,
        "I'll connect you with a human agent for this query.",
    )
def invoke(input_data: dict) -> dict:
    """Answer a support query from the knowledge base and flag escalations.

    Args:
        input_data: Request payload; ``"query"`` defaults to ``""`` and
            ``"user_id"`` defaults to ``"anonymous"``.

    Returns:
        A dict with ``"response"`` (the answer text), ``"needs_escalation"``
        (True when the answer is the human hand-off message) and ``"log"``
        (timestamp, user id and query for this interaction).
    """
    # Local import: the file-level import only brings in `datetime`.
    from datetime import timezone

    query = input_data.get("query", "")
    user_id = input_data.get("user_id", "anonymous")

    # Log interaction. datetime.utcnow() is deprecated and yields a naive
    # value; an aware UTC timestamp serialises with an explicit "+00:00".
    log_entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user_id": user_id,
        "query": query,
    }

    # Find relevant information
    response = find_relevant_info(query)

    # Escalation is detected from the wording of the fallback answer.
    needs_human = "human agent" in response

    return {
        "response": response,
        "needs_escalation": needs_human,
        "log": log_entry,
    }
Data Analysis Agent
Copy
# data_analyst.py
import pandas as pd
import json
from io import StringIO
def analyze_csv_data(csv_content: str) -> dict:
    """Parse CSV text and summarise it.

    Args:
        csv_content: The CSV document as a string.

    Returns:
        A dict with ``"rows"`` (row count), ``"columns"`` (column names),
        ``"summary"`` (``DataFrame.describe()`` as a dict) and
        ``"missing_values"`` (null count per column).
    """
    frame = pd.read_csv(StringIO(csv_content))

    row_count = len(frame)
    column_names = list(frame.columns)
    describe_table = frame.describe().to_dict()
    null_counts = frame.isnull().sum().to_dict()

    return {
        "rows": row_count,
        "columns": column_names,
        "summary": describe_table,
        "missing_values": null_counts,
    }
def invoke(input_data: dict) -> dict:
    """Dispatch a data-analysis request.

    Only the ``"analyze"`` action (the default) is supported. It expects CSV
    text under ``"csv_data"`` and returns ``{"analysis": ...}``; every failure
    is reported as ``{"error": <message>}``.
    """
    if input_data.get("action", "analyze") != "analyze":
        return {"error": "Unknown action"}

    csv_data = input_data.get("csv_data", "")
    if not csv_data:
        return {"error": "No CSV data provided"}

    try:
        return {"analysis": analyze_csv_data(csv_data)}
    except Exception as exc:
        return {"error": f"Analysis failed: {str(exc)}"}
Code Assistant
Copy
# code_assistant.py
import ast
import black
def analyze_python_code(code: str) -> dict:
    """Parse Python source and report its functions, classes and formatted text.

    Args:
        code: Python source code to inspect.

    Returns:
        For code that parses: ``{"valid": True, "functions": [...],
        "classes": [...], "formatted_code": str}``. ``functions`` includes
        both ``def`` and ``async def`` definitions at any nesting level.
        For code that does not parse: ``{"valid": False, "error": str,
        "line": int | None}``.
    """
    # Parse code; only the parse step can raise the SyntaxError reported below.
    try:
        tree = ast.parse(code)
    except SyntaxError as e:
        return {
            "valid": False,
            "error": str(e),
            "line": e.lineno,
        }

    # Extract information in a single walk of the tree.
    function_nodes = (ast.FunctionDef, ast.AsyncFunctionDef)
    functions = []
    classes = []
    for node in ast.walk(tree):
        if isinstance(node, function_nodes):
            functions.append(node.name)
        elif isinstance(node, ast.ClassDef):
            classes.append(node.name)

    # Format code. Formatting is best-effort: on any formatter error the
    # source is returned unchanged. `except Exception` (not a bare except)
    # lets KeyboardInterrupt / SystemExit propagate.
    try:
        formatted = black.format_str(code, mode=black.Mode())
    except Exception:
        formatted = code

    return {
        "valid": True,
        "functions": functions,
        "classes": classes,
        "formatted_code": formatted,
    }
def invoke(input_data: dict) -> dict:
    """Run the requested code-assistant task on ``input_data["code"]``.

    ``"task"`` may be ``"analyze"`` (the default) or ``"improve"``. Both
    return the analysis dict; ``"improve"`` additionally attaches a fixed
    ``"suggestions"`` list when the code is valid. Any other task yields
    ``{"error": "Unknown task"}``.
    """
    source = input_data.get("code", "")
    requested_task = input_data.get("task", "analyze")

    if requested_task != "analyze" and requested_task != "improve":
        return {"error": "Unknown task"}

    report = analyze_python_code(source)
    if requested_task == "improve" and report["valid"]:
        # Add suggestions for improvement
        report["suggestions"] = [
            "Add type hints",
            "Include docstrings",
            "Consider error handling",
        ]
    return report
Integration Examples
Slack Bot Integration
Copy
# slack_bot_agent.py
import os
import requests
# Webhook URL that send_to_slack() posts to; read from the environment when
# the module loads, and None if SLACK_WEBHOOK_URL is not set.
SLACK_WEBHOOK = os.getenv("SLACK_WEBHOOK_URL")
def send_to_slack(message: str, channel: str = "#general"):
    """POST ``message`` to the configured Slack webhook.

    Args:
        message: Text of the Slack message.
        channel: Target channel sent in the payload.

    The HTTP response is not inspected; delivery is best-effort.
    """
    payload = {
        "channel": channel,
        "text": message,
        "username": "RunAgent Bot",
    }
    # requests has no default timeout; without one an unresponsive endpoint
    # would block this call indefinitely.
    requests.post(SLACK_WEBHOOK, json=payload, timeout=10)
def invoke(input_data: dict) -> dict:
    """Handle a slash command; only ``/ask <question>`` is recognised.

    Args:
        input_data: Request payload with ``"command"`` (the raw command text)
            and ``"user"`` (name used in the Slack mention); both default to
            an empty string.

    Returns:
        ``{"response": <answer>}`` for ``/ask`` commands, otherwise
        ``{"response": "Unknown command"}``.
    """
    command = input_data.get("command", "")
    user = input_data.get("user", "")

    prefix = "/ask"
    if not command.startswith(prefix):
        return {"response": "Unknown command"}

    # Strip only the leading prefix. str.replace would also delete any later
    # "/ask" that appears inside the question itself.
    question = command[len(prefix):].strip()

    # Process question
    answer = f"Answer to '{question}' coming soon!"
    send_to_slack(f"@{user} asked: {question}\nAnswer: {answer}")
    return {"response": answer}
Database Query Agent
Copy
# db_agent.py
import os
import psycopg2
from psycopg2.extras import RealDictCursor
def get_db_connection():
    """Open a new connection using the DATABASE_URL environment variable.

    Cursors created from the connection use ``RealDictCursor``.
    """
    dsn = os.getenv("DATABASE_URL")
    return psycopg2.connect(dsn, cursor_factory=RealDictCursor)
def safe_query(query: str) -> list:
    """Execute ``query`` and return all rows, rejecting obvious write statements.

    Args:
        query: SQL text to execute.

    Returns:
        The rows from ``cursor.fetchall()``.

    Raises:
        ValueError: If the query contains DROP, DELETE, UPDATE or INSERT as a
            whole word (case-insensitive).
    """
    import re

    # Basic keyword filter. NOTE(review): a keyword blacklist is not real
    # SQL-injection protection; prefer a read-only database role for this
    # connection.
    forbidden = ("DROP", "DELETE", "UPDATE", "INSERT")
    # Whole-word match so identifiers such as `updated_at` are not rejected.
    pattern = r"\b(?:" + "|".join(forbidden) + r")\b"
    if re.search(pattern, query.upper()):
        raise ValueError("Unsafe query detected")

    conn = get_db_connection()
    try:
        # `with conn` only commits or rolls back the transaction; it does not
        # close the connection, hence the explicit close() below.
        with conn:
            with conn.cursor() as cursor:
                cursor.execute(query)
                return cursor.fetchall()
    finally:
        conn.close()
def invoke(input_data: dict) -> dict:
    """Serve a database request selected by ``input_data["type"]``.

    * ``"stats"``: returns ``{"stats": {"users", "orders", "revenue"}}``.
    * ``"custom"``: runs ``input_data["query"]`` through ``safe_query`` and
      returns ``{"results": rows}``.
    * anything else (including the default ``"select"``): returns
      ``{"error": "Unknown query type"}``.

    Any exception raised while querying is reported as ``{"error": <message>}``
    for both request types.
    """
    query_type = input_data.get("type", "select")

    try:
        if query_type == "stats":
            # Get database statistics
            stats = {
                "users": safe_query("SELECT COUNT(*) FROM users")[0]["count"],
                "orders": safe_query("SELECT COUNT(*) FROM orders")[0]["count"],
                "revenue": safe_query("SELECT SUM(amount) FROM orders")[0]["sum"],
            }
            return {"stats": stats}

        if query_type == "custom":
            query = input_data.get("query", "")
            return {"results": safe_query(query)}
    except Exception as e:
        # Same error shape for both branches; previously only "custom" was
        # guarded and a failing "stats" query escaped as an exception.
        return {"error": str(e)}

    return {"error": "Unknown query type"}
Advanced Patterns
Multi-Step Processing Pipeline
Copy
# pipeline_agent.py
from typing import Dict, Any
import asyncio
class ProcessingPipeline:
    """Run named steps in order, merging each step's output into one result.

    Each step is called with the accumulated result dict and must return a
    mapping; its items are merged into the top level of the result.
    """

    def __init__(self):
        # Ordered list of (callable, name) pairs.
        self.steps = []

    def add_step(self, func, name):
        """Append a step; ``func`` may be a plain or an async callable."""
        self.steps.append((func, name))

    async def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the steps against ``data`` and return the accumulated result.

        Args:
            data: The caller's input, stored under ``"input"``.

        Returns:
            A dict holding ``"input"``, a ``"steps"`` log with one entry per
            executed step (``name``, ``status`` and either ``output`` or
            ``error``), plus every key returned by the successful steps.
            Processing stops at the first failing step.
        """
        import inspect

        result = {"input": data, "steps": []}
        for func, name in self.steps:
            try:
                step_result = func(result)
                # Await whatever the call produced if it is awaitable; this
                # covers `async def` steps as well as wrappers around them.
                if inspect.isawaitable(step_result):
                    step_result = await step_result
                # Validate the output BEFORE logging success, so a step that
                # returns a non-mapping is recorded once, as an error.
                updates = dict(step_result)
            except Exception as e:
                result["steps"].append({
                    "name": name,
                    "status": "error",
                    "error": str(e),
                })
                break
            result["steps"].append({
                "name": name,
                "status": "success",
                "output": step_result,
            })
            result.update(updates)
        return result
# Define pipeline steps
def validate_input(data):
    """Pipeline step: require a ``"text"`` field in the original input.

    Raises:
        ValueError: If ``data["input"]`` has no ``"text"`` key.
    """
    original_input = data["input"]
    if "text" in original_input:
        return {"validated": True}
    raise ValueError("Missing 'text' field")
async def analyze_sentiment(data):
    """Pipeline step: stand-in sentiment analysis.

    Sleeps briefly to simulate an async API call, then returns a fixed
    positive result; ``data`` is not inspected.
    """
    simulated_latency = 0.1
    await asyncio.sleep(simulated_latency)
    return dict(sentiment="positive", score=0.8)
def generate_response(data):
    """Pipeline step: pick a canned reply for the sentiment found in ``data``.

    A missing ``"sentiment"`` key is treated as ``"neutral"``; a sentiment
    with no canned reply yields ``"Thanks!"``.
    """
    canned_replies = {
        "positive": "That's wonderful to hear!",
        "negative": "I understand your concerns.",
        "neutral": "Thank you for sharing.",
    }
    mood = data.get("sentiment", "neutral")
    reply = canned_replies.get(mood, "Thanks!")
    return {"response": reply}
# Create pipeline. Steps run in the order they are added:
# validation -> sentiment analysis -> response generation.
pipeline = ProcessingPipeline()
pipeline.add_step(validate_input, "validation")
pipeline.add_step(analyze_sentiment, "sentiment_analysis")
pipeline.add_step(generate_response, "response_generation")
def invoke(input_data: dict) -> dict:
    """Run the async pipeline from synchronous code and return its result.

    Args:
        input_data: Payload handed to ``pipeline.process``.

    Returns:
        The result dict produced by the pipeline.
    """
    # asyncio.run creates a fresh event loop, runs the coroutine and closes
    # the loop afterwards. Creating a loop by hand on every call left each
    # one open.
    return asyncio.run(pipeline.process(input_data))
Agent with Memory
Copy
# memory_agent.py
import json
import os
from datetime import datetime, timezone
from typing import Any, Dict, List
class MemoryStore:
    """JSON-file-backed store for per-user conversation history and keyed facts.

    The in-memory state is a dict with two top-level keys:
    ``"conversations"`` (user id -> list of timestamped exchanges) and
    ``"facts"`` (key -> timestamped value).
    """

    def __init__(self, filepath="memory.json"):
        self.filepath = filepath
        self.memory = self._load_memory()

    @staticmethod
    def _timestamp() -> str:
        """Return the current UTC time as a timezone-aware ISO-8601 string."""
        # datetime.utcnow() is deprecated and returns a naive value.
        return datetime.now(timezone.utc).isoformat()

    def _load_memory(self) -> Dict:
        """Load state from ``self.filepath``; start empty if the file is absent."""
        try:
            with open(self.filepath, 'r', encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return {"conversations": {}, "facts": {}}

    def save_memory(self):
        """Write the whole in-memory state to ``self.filepath`` as JSON."""
        with open(self.filepath, 'w', encoding="utf-8") as f:
            json.dump(self.memory, f)

    def remember_conversation(self, user_id: str, exchange: Dict):
        """Append ``exchange`` to the user's history, keeping the last 10."""
        history = self.memory["conversations"].setdefault(user_id, [])
        history.append({
            "timestamp": self._timestamp(),
            "exchange": exchange,
        })
        # Keep only last 10 exchanges
        del history[:-10]

    def get_conversation_history(self, user_id: str) -> List[Dict]:
        """Return the user's stored exchanges, oldest first (empty if none).

        A copy is returned so that later writes to the store do not change a
        list the caller is still holding.
        """
        return list(self.memory["conversations"].get(user_id, []))

    def remember_fact(self, key: str, value: Any):
        """Store ``value`` under ``key``, replacing any earlier value."""
        self.memory["facts"][key] = {
            "value": value,
            "timestamp": self._timestamp(),
        }

    def recall_fact(self, key: str) -> Any:
        """Return the value stored under ``key``, or None if there is none."""
        fact = self.memory["facts"].get(key, {})
        return fact.get("value")
# Initialize memory: one module-level store used by invoke(). Constructing it
# reads memory.json (the default filepath) if that file exists.
memory = MemoryStore()
def invoke(input_data: dict) -> dict:
    """Reply to a query while recording the exchange in persistent memory.

    Recognised queries (case-insensitive):
      * ``"remember that <fact>"`` stores ``<fact>`` for the user.
      * ``"what do you remember?"`` recalls the stored fact.
    Anything else is echoed back, with a note on prior exchanges if any.

    Args:
        input_data: Request payload; ``"user_id"`` defaults to
            ``"anonymous"`` and ``"query"`` to ``""``.

    Returns:
        ``{"response": str, "conversation_length": int}``, where
        ``conversation_length`` is the number of exchanges stored for the
        user before this call, plus one.
    """
    user_id = input_data.get("user_id", "anonymous")
    query = input_data.get("query", "")

    # Take the count up front. The list returned by the store may be the very
    # list that remember_conversation() appends to, so measuring it after
    # saving would count this exchange twice.
    previous_exchanges = len(memory.get_conversation_history(user_id))

    remember_prefix = "remember that"
    lowered = query.lower()

    # Check for memory-related queries
    if lowered.startswith(remember_prefix):
        # Slice off the prefix by length so stripping is case-insensitive,
        # like the match, and later occurrences in the fact are kept.
        fact = query[len(remember_prefix):].strip()
        memory.remember_fact(f"user_{user_id}_fact", fact)
        response = f"I'll remember that: {fact}"
    elif lowered == "what do you remember?":
        fact = memory.recall_fact(f"user_{user_id}_fact")
        if fact:
            response = f"I remember you told me: {fact}"
        else:
            response = "I don't have any specific memories yet."
    else:
        # Normal processing
        response = f"You said: {query}"
        if previous_exchanges:
            response += f"\n(We've had {previous_exchanges} previous exchanges)"

    # Save exchange
    memory.remember_conversation(user_id, {
        "query": query,
        "response": response,
    })
    memory.save_memory()

    return {
        "response": response,
        "conversation_length": previous_exchanges + 1,
    }
Testing Your Examples
Unit Test Template
Copy
# test_agent.py
import pytest
from agent import invoke, stream
class TestAgent:
    """Template tests for an agent module exposing ``invoke`` and ``stream``."""

    def test_basic_invocation(self):
        """invoke() returns a dict with a string under "response"."""
        result = invoke({"query": "Hello"})
        assert "response" in result
        assert isinstance(result["response"], str)

    def test_streaming(self):
        """stream() yields at least one chunk and every chunk is a string."""
        chunks = list(stream({"query": "Tell a story"}))
        assert len(chunks) > 0
        assert all(isinstance(chunk, str) for chunk in chunks)

    def test_error_handling(self):
        """A payload without "query" yields an error or a normal response."""
        result = invoke({})  # Missing query
        assert "error" in result or "response" in result

    # NOTE(review): the "analysis" and "help_text" cases presumably target
    # other agents' invoke(); adapt the table to the agent under test.
    @pytest.mark.parametrize("payload,expected_key", [
        ({"query": "test"}, "response"),
        ({"action": "analyze"}, "analysis"),
        ({"command": "help"}, "help_text"),
    ])
    def test_various_inputs(self, payload, expected_key):
        """Each payload produces a result containing the expected key."""
        # The argument is named `payload` to avoid shadowing builtin `input`.
        result = invoke(payload)
        assert expected_key in result
Next Steps
- Browse our GitHub repository for more examples
- Join our Discord community to share your agents
- Check out templates for ready-to-use starting points
Contributing Examples
Have a great example? Share it with the community:
- Fork the examples repository
- Add your example with documentation
- Include tests and requirements
- Submit a pull request
We feature the best community examples in our documentation!
On this page
- Quick Examples
- Basic Examples
- Simple Q&A Agent
- Streaming Chatbot
- Framework Examples
- LangGraph Agent
- CrewAI Research Team
- Use Case Examples
- Customer Support Agent
- Data Analysis Agent
- Code Assistant
- Integration Examples
- Slack Bot Integration
- Database Query Agent
- Advanced Patterns
- Multi-Step Processing Pipeline
- Agent with Memory
- Testing Your Examples
- Unit Test Template
- Next Steps
- Contributing Examples
Assistant
Responses are generated using AI and may contain mistakes.