Replace the contents of main.py with a customer support agent:
main.py
Copy
from typing import Iterator, Dict, Anyimport jsonclass CustomerSupportAgent: def __init__(self): # Knowledge base - in production, this would be a database self.knowledge_base = { "pricing": { "basic": "$9/month - Up to 1000 requests", "pro": "$29/month - Up to 10,000 requests", "enterprise": "Custom pricing - Unlimited requests" }, "features": [ "Multi-language SDKs (Python, JavaScript, Rust, Go)", "Real-time streaming responses", "Auto-scaling infrastructure", "Framework integrations (LangGraph, CrewAI, etc.)" ], "troubleshooting": { "connection": "Check your API key and agent ID", "timeout": "Increase timeout settings or check network", "streaming": "Ensure your entrypoint tag ends with '_stream'" } } def categorize_query(self, message: str) -> str: """Categorize the customer query""" message_lower = message.lower() if any(word in message_lower for word in ["price", "cost", "pricing", "plan"]): return "pricing" elif any(word in message_lower for word in ["feature", "capability", "what can"]): return "features" elif any(word in message_lower for word in ["help", "problem", "error", "issue", "troubleshoot"]): return "troubleshooting" else: return "general"def customer_support_sync(message: str, user_id: str = "anonymous") -> Dict[str, Any]: """Synchronous customer support response""" agent = CustomerSupportAgent() category = agent.categorize_query(message) response = { "message": "", "category": category, "user_id": user_id, "timestamp": "2024-01-01T00:00:00Z" # In production, use actual timestamp } if category == "pricing": response["message"] = f"""Here are our current pricing plans:**Basic Plan**: {agent.knowledge_base['pricing']['basic']}- Perfect for small projects and testing**Pro Plan**: {agent.knowledge_base['pricing']['pro']}- Great for growing applications**Enterprise Plan**: {agent.knowledge_base['pricing']['enterprise']}- Custom solutions for large organizationsWould you like to know more about any specific plan?""" elif category == "features": features_list = "\n".join([f"• {feature}" for feature in agent.knowledge_base['features']]) response["message"] = f"""RunAgent offers these key features:{features_list}Is there a specific feature you'd like to learn more about?""" elif category == "troubleshooting": response["message"] = """I'd be happy to help troubleshoot! Here are some common solutions:• **Connection issues**: Check your API key and agent ID• **Timeout errors**: Increase timeout settings or check your network• **Streaming problems**: Ensure your entrypoint tag ends with '_stream'Can you describe the specific issue you're experiencing?""" else: response["message"] = """Hello! I'm here to help with any questions about RunAgent. I can help you with:• Pricing and plans• Features and capabilities • Technical troubleshooting• Getting started guidesWhat would you like to know?""" return responsedef customer_support_stream(message: str, user_id: str = "anonymous") -> Iterator[str]: """Streaming customer support response""" agent = CustomerSupportAgent() category = agent.categorize_query(message) # Stream the response with typing effect if category == "pricing": yield "Let me check our current pricing for you...\n\n" yield "**Basic Plan**: $9/month - Up to 1000 requests\n" yield "Perfect for small projects and testing\n\n" yield "**Pro Plan**: $29/month - Up to 10,000 requests\n" yield "Great for growing applications\n\n" yield "**Enterprise Plan**: Custom pricing - Unlimited requests\n" yield "Custom solutions for large organizations\n\n" yield "Would you like to know more about any specific plan?" elif category == "features": yield "Here are RunAgent's key features:\n\n" for feature in agent.knowledge_base['features']: yield f"• {feature}\n" yield "\nIs there a specific feature you'd like to learn more about?" elif category == "troubleshooting": yield "I'd be happy to help troubleshoot!\n\n" yield "Here are some common solutions:\n\n" yield "• **Connection issues**: Check your API key and agent ID\n" yield "• **Timeout errors**: Increase timeout settings or check your network\n" yield "• **Streaming problems**: Ensure your entrypoint tag ends with '_stream'\n\n" yield "Can you describe the specific issue you're experiencing?" else: yield "Hello! I'm here to help with any questions about RunAgent.\n\n" yield "I can help you with:\n" yield "• Pricing and plans\n" yield "• Features and capabilities\n" yield "• Technical troubleshooting\n" yield "• Getting started guides\n\n" yield "What would you like to know?"
🤖 Agent Details:- Agent ID: support_agent_123- Host: 127.0.0.1- Port: 8451- Framework: custom- Status: ready🌐 Server running at: http://127.0.0.1:8451📖 API Documentation: http://127.0.0.1:8451/docs📍 Available endpoints:- POST /api/v1/agents/.../execute/support - Run your agent- POST /api/v1/agents/.../execute/support_stream - Stream your agent
from runagent import RunAgentClient# Connect to your support agentsupport = RunAgentClient( agent_id="support_agent_123", entrypoint_tag="support", local=True)# Test different types of queriesqueries = [ "What are your pricing plans?", "What features does RunAgent offer?", "I'm having connection issues", "Hello, I need help"]for query in queries: print(f"\nQuery: {query}") response = support.run(message=query, user_id="test_user") print(f"Response: {response['message']}") print(f"Category: {response['category']}")
# Test streaming for better user experiencesupport_stream = RunAgentClient( agent_id="support_agent_123", entrypoint_tag="support_stream", local=True)print("Streaming response:")for chunk in support_stream.run(message="What are your pricing plans?", user_id="test_user"): print(chunk, end="", flush=True)
🎉 Great job! You’ve built a production-ready customer support agent that can be integrated into any application. This is just the beginning of what you can create with RunAgent!