Overview
This guide covers everything you need to build production-ready autonomous agents on ChaosChain. From basic setup to advanced patterns, learn how to create agents that can interact, transact, and validate each other’s work.Zero Setup Required: All ERC-8004 contracts are pre-deployed. Focus on building your agent logic, not infrastructure!
Agent Architecture Patterns
Server Agent Pattern
Server agents provide services to other agents and handle incoming requests.server_agent.py
Copy
import asyncio
import os
from chaoschain_sdk import ChaosChainAgentSDK
class DataAnalysisAgent:
def __init__(self):
# Initialize as server agent
self.sdk = ChaosChainAgentSDK(
agent_name="DataAnalysisAgent",
agent_domain="analysis.mycompany.com",
agent_role="server",
network="base-sepolia"
)
# Register analysis functions
self._register_services()
def _register_services(self):
"""Register all available services."""
@self.sdk.process_integrity.register_function
async def analyze_market_data(self, data: dict) -> dict:
"""Analyze market data with integrity proof."""
# Your analysis logic here
trends = self._calculate_trends(data.get('prices', []))
predictions = self._generate_predictions(trends)
return {
"trends": trends,
"predictions": predictions,
"confidence": 0.85,
"timestamp": datetime.now().isoformat(),
"data_points": len(data.get('prices', []))
}
@self.sdk.process_integrity.register_function
async def risk_assessment(self, portfolio: dict) -> dict:
"""Assess portfolio risk."""
risk_score = self._calculate_risk(portfolio)
recommendations = self._generate_recommendations(risk_score)
return {
"risk_score": risk_score,
"risk_level": self._get_risk_level(risk_score),
"recommendations": recommendations,
"portfolio_value": portfolio.get('total_value', 0)
}
async def start_server(self):
"""Start the agent server."""
# Register on ERC-8004
try:
agent_id, tx_hash = self.sdk.register_identity()
print(f"✅ Server registered with ID: {agent_id}")
except Exception as e:
print(f"⚠️ Registration: {e}")
# Start listening for requests
print(f"🚀 Data Analysis Agent running on {self.sdk.wallet_address}")
print(f"📋 Available services: analyze_market_data, risk_assessment")
# In a real implementation, you'd listen for incoming requests
# This is a simplified example
while True:
await asyncio.sleep(10)
print("💓 Agent heartbeat - ready for requests")
# Run the server agent
if __name__ == "__main__":
agent = DataAnalysisAgent()
asyncio.run(agent.start_server())
Client Agent Pattern
Client agents consume services from server agents and handle payments.client_agent.py
Copy
import asyncio
from chaoschain_sdk import ChaosChainAgentSDK
class TradingClientAgent:
def __init__(self):
# Initialize as client agent
self.sdk = ChaosChainAgentSDK(
agent_name="TradingClient",
agent_domain="trading.mycompany.com",
agent_role="client",
network="base-sepolia"
)
async def request_analysis(self, market_data: dict, server_agent_address: str):
"""Request analysis from a server agent."""
# Create payment request for the service
payment_request = self.sdk.create_x402_payment_request(
cart_id=f"analysis_{int(time.time())}",
total_amount=5.0,
currency="USDC",
items=[{"name": "Market Data Analysis", "price": 5.0}]
)
# In a real implementation, you'd send this to the server agent
# For this example, we'll simulate the analysis request
print(f"📤 Requesting analysis from {server_agent_address}")
print(f"💰 Payment prepared: {payment_request['id']}")
# Simulate receiving analysis result
# In reality, this would come from the server agent
analysis_result = {
"trends": ["upward", "volatile"],
"predictions": {"next_week": "bullish", "confidence": 0.85},
"timestamp": "2024-01-01T00:00:00Z"
}
# Execute payment for the service
crypto_result = self.sdk.execute_x402_crypto_payment(
payment_request=payment_request,
payer_agent="TradingClient",
service_description="Market Data Analysis"
)
print(f"✅ Analysis received and paid: {crypto_result['transaction_hash']}")
# Store evidence of the transaction
evidence_cid = self.sdk.store_evidence({
"service_request": "market_data_analysis",
"analysis_result": analysis_result,
"payment_proof": crypto_result,
"server_agent": server_agent_address
})
if evidence_cid:
print(f"📦 Evidence stored: {evidence_cid}")
return analysis_result
async def run_trading_cycle(self):
"""Run a complete trading cycle."""
# Register as client
try:
agent_id, tx_hash = self.sdk.register_identity()
print(f"✅ Client registered with ID: {agent_id}")
except Exception as e:
print(f"⚠️ Registration: {e}")
# Simulate market data
market_data = {
"prices": [100, 102, 98, 105, 103],
"volume": [1000, 1200, 800, 1500, 1100],
"timestamp": "2024-01-01T00:00:00Z"
}
# Request analysis from server agent
server_address = "0x742d35Cc6634C0532925a3b8D4Cc6634C0532925" # Example
analysis = await self.request_analysis(market_data, server_address)
print(f"📊 Analysis complete: {analysis}")
return analysis
# Run the client agent
if __name__ == "__main__":
client = TradingClientAgent()
asyncio.run(client.run_trading_cycle())
Validator Agent Pattern
Validator agents validate work from other agents and build reputation.validator_agent.py
Copy
import hashlib
import json
from chaoschain_sdk import ChaosChainAgentSDK
class ValidationAgent:
def __init__(self):
# Initialize as validator agent
self.sdk = ChaosChainAgentSDK(
agent_name="ValidationAgent",
agent_domain="validator.mycompany.com",
agent_role="validator",
network="base-sepolia"
)
# Validation criteria
self.validation_criteria = {
"min_confidence": 0.7,
"required_fields": ["trends", "predictions", "timestamp"],
"max_age_hours": 24
}
def validate_analysis(self, analysis_data: dict) -> dict:
"""Validate analysis data quality."""
score = 100
issues = []
# Check required fields
for field in self.validation_criteria["required_fields"]:
if field not in analysis_data:
score -= 20
issues.append(f"Missing required field: {field}")
# Check confidence level
confidence = analysis_data.get("confidence", 0)
if confidence < self.validation_criteria["min_confidence"]:
score -= 15
issues.append(f"Low confidence: {confidence}")
# Check data freshness
timestamp = analysis_data.get("timestamp")
if timestamp:
# In a real implementation, check if data is recent
pass
else:
score -= 10
issues.append("Missing timestamp")
# Check prediction quality
predictions = analysis_data.get("predictions", {})
if not predictions:
score -= 25
issues.append("No predictions provided")
return {
"score": max(0, score),
"issues": issues,
"validation_timestamp": datetime.now().isoformat(),
"validator_id": self.sdk.get_agent_id()
}
async def process_validation_request(self, requester_id: int, data_hash: str, data: dict):
"""Process a validation request."""
print(f"🔍 Validating request from agent {requester_id}")
# Validate the data
validation_result = self.validate_analysis(data)
score = validation_result["score"]
print(f"📊 Validation score: {score}/100")
if validation_result["issues"]:
print(f"⚠️ Issues found: {validation_result['issues']}")
# Submit validation response on-chain
try:
tx_hash = self.sdk.submit_validation_response(
data_hash=data_hash,
score=score
)
print(f"✅ Validation submitted: {tx_hash}")
# Store validation evidence
evidence_cid = self.sdk.store_evidence({
"validation_type": "analysis_quality",
"requester_agent_id": requester_id,
"data_hash": data_hash,
"validation_result": validation_result,
"transaction_hash": tx_hash
})
if evidence_cid:
print(f"📦 Validation evidence stored: {evidence_cid}")
except Exception as e:
print(f"❌ Validation submission failed: {e}")
return validation_result
async def start_validator(self):
"""Start the validator agent."""
# Register as validator
try:
agent_id, tx_hash = self.sdk.register_identity()
print(f"✅ Validator registered with ID: {agent_id}")
except Exception as e:
print(f"⚠️ Registration: {e}")
print(f"🛡️ Validation Agent ready at {self.sdk.wallet_address}")
print(f"📋 Validation criteria: {self.validation_criteria}")
# In a real implementation, listen for validation requests
# This is a simplified example with mock data
mock_data = {
"trends": ["upward", "volatile"],
"predictions": {"next_week": "bullish", "confidence": 0.85},
"timestamp": "2024-01-01T00:00:00Z",
"confidence": 0.85
}
data_hash = "0x" + hashlib.sha256(json.dumps(mock_data).encode()).hexdigest()
await self.process_validation_request(123, data_hash, mock_data)
# Run the validator agent
if __name__ == "__main__":
validator = ValidationAgent()
asyncio.run(validator.start_validator())
Advanced Patterns
Multi-Service Agent
An agent that provides multiple services with different pricing.multi_service_agent.py
Copy
from dataclasses import dataclass
from typing import Dict, List
from chaoschain_sdk import ChaosChainAgentSDK
@dataclass
class ServiceConfig:
name: str
price: float
currency: str
description: str
class MultiServiceAgent:
def __init__(self):
self.sdk = ChaosChainAgentSDK(
agent_name="MultiServiceAgent",
agent_domain="services.mycompany.com",
agent_role="server",
network="base-sepolia"
)
# Define available services
self.services = {
"basic_analysis": ServiceConfig(
name="Basic Market Analysis",
price=2.0,
currency="USDC",
description="Basic trend analysis and predictions"
),
"advanced_analysis": ServiceConfig(
name="Advanced Market Analysis",
price=5.0,
currency="USDC",
description="Deep analysis with ML predictions"
),
"risk_assessment": ServiceConfig(
name="Portfolio Risk Assessment",
price=3.0,
currency="USDC",
description="Comprehensive risk analysis"
),
"custom_report": ServiceConfig(
name="Custom Analysis Report",
price=10.0,
currency="USDC",
description="Tailored analysis report"
)
}
self._register_all_services()
def _register_all_services(self):
"""Register all available services."""
@self.sdk.process_integrity.register_function
async def basic_analysis(self, data: dict) -> dict:
"""Basic market analysis service."""
return {
"service": "basic_analysis",
"trends": self._basic_trend_analysis(data),
"price_target": self._calculate_price_target(data),
"confidence": 0.75
}
@self.sdk.process_integrity.register_function
async def advanced_analysis(self, data: dict) -> dict:
"""Advanced market analysis with ML."""
return {
"service": "advanced_analysis",
"trends": self._advanced_trend_analysis(data),
"ml_predictions": self._ml_predictions(data),
"sentiment": self._sentiment_analysis(data),
"confidence": 0.90
}
@self.sdk.process_integrity.register_function
async def risk_assessment(self, portfolio: dict) -> dict:
"""Portfolio risk assessment service."""
return {
"service": "risk_assessment",
"var_95": self._calculate_var(portfolio, 0.95),
"sharpe_ratio": self._calculate_sharpe(portfolio),
"risk_factors": self._identify_risk_factors(portfolio),
"recommendations": self._risk_recommendations(portfolio)
}
def get_service_catalog(self) -> Dict[str, ServiceConfig]:
"""Get catalog of available services."""
return self.services
async def execute_service(self, service_name: str, data: dict, client_agent: str) -> dict:
"""Execute a requested service with payment."""
if service_name not in self.services:
raise ValueError(f"Unknown service: {service_name}")
service_config = self.services[service_name]
# Create payment request
payment_request = self.sdk.create_x402_payment_request(
cart_id=f"{service_name}_{int(time.time())}",
total_amount=service_config.price,
currency=service_config.currency,
items=[{
"name": service_config.name,
"price": service_config.price
}]
)
# Execute service with integrity proof
result, proof = await self.sdk.execute_with_integrity_proof(
service_name,
data
)
# Process payment
payment_result = self.sdk.execute_x402_crypto_payment(
payment_request=payment_request,
payer_agent=client_agent,
service_description=service_config.description
)
# Create comprehensive response
response = {
"service_result": result,
"integrity_proof": {
"proof_id": proof.proof_id,
"code_hash": proof.code_hash,
"ipfs_cid": proof.ipfs_cid
},
"payment_proof": payment_result,
"service_config": {
"name": service_config.name,
"price": service_config.price,
"currency": service_config.currency
}
}
# Store evidence
evidence_cid = self.sdk.store_evidence(response, "service_execution")
if evidence_cid:
response["evidence_cid"] = evidence_cid
return response
# Usage example
async def main():
agent = MultiServiceAgent()
# Register agent
agent_id, tx_hash = agent.sdk.register_identity()
print(f"✅ Multi-service agent registered: {agent_id}")
# Show service catalog
catalog = agent.get_service_catalog()
print("\n📋 Available Services:")
for service_id, config in catalog.items():
print(f" {service_id}: {config.name} - ${config.price} {config.currency}")
# Execute a service (simulated)
mock_data = {"prices": [100, 102, 98, 105]}
result = await agent.execute_service("basic_analysis", mock_data, "ClientAgent")
print(f"\n✅ Service executed: {result['service_result']}")
if __name__ == "__main__":
asyncio.run(main())
Agent Coordination Pattern
Agents that work together to complete complex tasks.agent_coordination.py
Copy
import asyncio
from typing import List, Dict
from chaoschain_sdk import ChaosChainAgentSDK
class CoordinatorAgent:
"""Coordinates multiple agents to complete complex tasks."""
def __init__(self):
self.sdk = ChaosChainAgentSDK(
agent_name="CoordinatorAgent",
agent_domain="coordinator.mycompany.com",
agent_role="server",
network="base-sepolia"
)
# Registry of available agents
self.agent_registry = {
"data_collector": "0x742d35Cc6634C0532925a3b8D4Cc6634C0532925",
"analyzer": "0x8901234567890123456789012345678901234567",
"validator": "0x9012345678901234567890123456789012345678",
"reporter": "0x0123456789012345678901234567890123456789"
}
async def execute_complex_analysis(self, request: dict) -> dict:
"""Execute a complex multi-agent analysis workflow."""
workflow_id = f"workflow_{int(time.time())}"
results = {}
print(f"🚀 Starting complex analysis workflow: {workflow_id}")
# Step 1: Data Collection
print("📊 Step 1: Collecting data...")
data_request = {
"sources": request.get("data_sources", ["market", "news"]),
"timeframe": request.get("timeframe", "1d"),
"symbols": request.get("symbols", ["BTC", "ETH"])
}
# In a real implementation, send request to data collector agent
collected_data = await self._simulate_agent_call(
"data_collector",
"collect_data",
data_request
)
results["data_collection"] = collected_data
# Step 2: Analysis
print("🔍 Step 2: Analyzing data...")
analysis_request = {
"data": collected_data,
"analysis_type": request.get("analysis_type", "comprehensive")
}
analysis_result = await self._simulate_agent_call(
"analyzer",
"analyze_data",
analysis_request
)
results["analysis"] = analysis_result
# Step 3: Validation
print("✅ Step 3: Validating results...")
validation_request = {
"analysis": analysis_result,
"data": collected_data
}
validation_result = await self._simulate_agent_call(
"validator",
"validate_analysis",
validation_request
)
results["validation"] = validation_result
# Step 4: Report Generation (if validation passes)
if validation_result.get("score", 0) >= 80:
print("📄 Step 4: Generating report...")
report_request = {
"analysis": analysis_result,
"validation": validation_result,
"format": request.get("report_format", "detailed")
}
report_result = await self._simulate_agent_call(
"reporter",
"generate_report",
report_request
)
results["report"] = report_result
else:
print("⚠️ Validation failed, skipping report generation")
results["report"] = {"status": "skipped", "reason": "validation_failed"}
# Create final workflow result
workflow_result = {
"workflow_id": workflow_id,
"status": "completed",
"results": results,
"agents_used": list(self.agent_registry.keys()),
"total_steps": 4,
"completion_time": datetime.now().isoformat()
}
# Store workflow evidence
evidence_cid = self.sdk.store_evidence(
workflow_result,
"complex_workflow"
)
if evidence_cid:
workflow_result["evidence_cid"] = evidence_cid
print(f"🎉 Workflow completed: {workflow_id}")
return workflow_result
async def _simulate_agent_call(self, agent_type: str, method: str, data: dict) -> dict:
"""Simulate calling another agent (in real implementation, use actual network calls)."""
agent_address = self.agent_registry.get(agent_type)
if not agent_address:
raise ValueError(f"Unknown agent type: {agent_type}")
print(f" 📞 Calling {agent_type} at {agent_address[:10]}...")
# Simulate processing time
await asyncio.sleep(1)
# Return mock results based on agent type
if agent_type == "data_collector":
return {
"data": {"prices": [100, 102, 98, 105], "volume": [1000, 1200, 800, 1500]},
"sources": data.get("sources", []),
"timestamp": datetime.now().isoformat()
}
elif agent_type == "analyzer":
return {
"trends": ["upward", "volatile"],
"predictions": {"next_week": "bullish", "confidence": 0.85},
"indicators": {"rsi": 65, "macd": "bullish"}
}
elif agent_type == "validator":
return {
"score": 88,
"validation_passed": True,
"checks": ["data_quality", "analysis_logic", "predictions"]
}
elif agent_type == "reporter":
return {
"report_id": f"report_{int(time.time())}",
"format": "detailed",
"sections": ["executive_summary", "analysis", "recommendations"],
"page_count": 12
}
return {"status": "unknown_agent_type"}
# Usage example
async def main():
coordinator = CoordinatorAgent()
# Register coordinator
agent_id, tx_hash = coordinator.sdk.register_identity()
print(f"✅ Coordinator registered: {agent_id}")
# Execute complex workflow
request = {
"data_sources": ["market", "news", "social"],
"symbols": ["BTC", "ETH", "ADA"],
"analysis_type": "comprehensive",
"report_format": "detailed"
}
result = await coordinator.execute_complex_analysis(request)
print(f"\n🎯 Final Result: {result['workflow_id']}")
print(f"📊 Status: {result['status']}")
print(f"📦 Evidence: {result.get('evidence_cid', 'Not stored')}")
if __name__ == "__main__":
asyncio.run(main())
Best Practices
Error Handling and Resilience
error_handling.py
Copy
import asyncio
import logging
from chaoschain_sdk import ChaosChainAgentSDK
from chaoschain_sdk.exceptions import (
AgentRegistrationError,
PaymentError,
IntegrityVerificationError
)
class ResilientAgent:
def __init__(self):
self.sdk = ChaosChainAgentSDK(
agent_name="ResilientAgent",
agent_domain="resilient.mycompany.com",
agent_role="server",
network="base-sepolia"
)
# Setup logging
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
# Retry configuration
self.max_retries = 3
self.retry_delay = 5 # seconds
async def safe_register(self) -> bool:
"""Safely register agent with retries."""
for attempt in range(self.max_retries):
try:
agent_id, tx_hash = self.sdk.register_identity()
self.logger.info(f"✅ Registered successfully: {agent_id}")
return True
except AgentRegistrationError as e:
if "insufficient funds" in str(e).lower():
self.logger.error("💰 Insufficient funds for registration")
self.logger.info("💡 Get testnet ETH from faucet:")
self.logger.info(" https://www.coinbase.com/faucets/base-ethereum-sepolia-faucet")
return False
else:
self.logger.warning(f"⚠️ Registration attempt {attempt + 1} failed: {e}")
except Exception as e:
self.logger.error(f"❌ Unexpected error: {e}")
if attempt < self.max_retries - 1:
self.logger.info(f"🔄 Retrying in {self.retry_delay} seconds...")
await asyncio.sleep(self.retry_delay)
self.logger.error("❌ Registration failed after all retries")
return False
async def safe_execute_with_payment(self, service_data: dict, client_agent: str) -> dict:
"""Safely execute service with payment handling."""
try:
# Execute service with integrity proof
result, proof = await self.sdk.execute_with_integrity_proof(
"analyze_data",
service_data
)
# Create payment request
payment_request = self.sdk.create_x402_payment_request(
cart_id=f"service_{int(time.time())}",
total_amount=5.0,
currency="USDC",
items=[{"name": "Data Analysis", "price": 5.0}]
)
# Process payment with error handling
try:
payment_result = self.sdk.execute_x402_crypto_payment(
payment_request=payment_request,
payer_agent=client_agent,
service_description="Data Analysis Service"
)
return {
"status": "success",
"result": result,
"proof": proof,
"payment": payment_result
}
except PaymentError as e:
self.logger.error(f"💳 Payment failed: {e}")
# Still return the result, but mark payment as failed
return {
"status": "partial_success",
"result": result,
"proof": proof,
"payment_error": str(e)
}
except IntegrityVerificationError as e:
self.logger.error(f"🔐 Integrity verification failed: {e}")
return {
"status": "integrity_error",
"error": str(e)
}
except Exception as e:
self.logger.error(f"❌ Service execution failed: {e}")
return {
"status": "error",
"error": str(e)
}
async def health_check(self) -> dict:
"""Perform agent health check."""
health = {
"agent_name": self.sdk.agent_name,
"wallet_address": self.sdk.wallet_address,
"network": self.sdk.network.value,
"registered": self.sdk.is_registered,
"features": {},
"timestamp": datetime.now().isoformat()
}
# Check feature availability
try:
status = self.sdk.get_sdk_status()
health["features"] = status.get("features", {})
health["payment_methods"] = len(status.get("payment_methods", []))
except Exception as e:
health["status_error"] = str(e)
# Check network connectivity
try:
network_info = self.sdk.network_info
health["network_connected"] = network_info.get("connected", False)
health["chain_id"] = network_info.get("chain_id")
except Exception as e:
health["network_error"] = str(e)
return health
# Usage example
async def main():
agent = ResilientAgent()
# Perform health check
health = await agent.health_check()
print(f"🏥 Health Check: {health}")
# Safe registration
registered = await agent.safe_register()
if not registered:
print("❌ Could not register agent")
return
# Safe service execution
service_data = {"data": [1, 2, 3, 4, 5]}
result = await agent.safe_execute_with_payment(service_data, "ClientAgent")
print(f"🎯 Service Result: {result['status']}")
if __name__ == "__main__":
asyncio.run(main())
Performance Optimization
performance_optimization.py
Copy
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from chaoschain_sdk import ChaosChainAgentSDK
class OptimizedAgent:
def __init__(self):
self.sdk = ChaosChainAgentSDK(
agent_name="OptimizedAgent",
agent_domain="optimized.mycompany.com",
agent_role="server",
network="base-sepolia"
)
# Thread pool for CPU-intensive tasks
self.executor = ThreadPoolExecutor(max_workers=4)
# Cache for frequently accessed data
self.cache = {}
self.cache_ttl = 300 # 5 minutes
def _cpu_intensive_analysis(self, data: list) -> dict:
"""CPU-intensive analysis that runs in thread pool."""
# Simulate heavy computation
time.sleep(2) # In real code, this would be actual computation
return {
"mean": sum(data) / len(data),
"max": max(data),
"min": min(data),
"variance": sum((x - sum(data)/len(data))**2 for x in data) / len(data)
}
async def optimized_analysis(self, data: dict) -> dict:
"""Optimized analysis with caching and async processing."""
# Create cache key
cache_key = f"analysis_{hash(str(data))}"
# Check cache first
if cache_key in self.cache:
cache_entry = self.cache[cache_key]
if time.time() - cache_entry["timestamp"] < self.cache_ttl:
print("📋 Returning cached result")
return cache_entry["result"]
# Run CPU-intensive work in thread pool
loop = asyncio.get_event_loop()
cpu_result = await loop.run_in_executor(
self.executor,
self._cpu_intensive_analysis,
data.get("values", [])
)
# Combine with other analysis
result = {
"statistics": cpu_result,
"trend": "upward" if cpu_result["mean"] > 50 else "downward",
"volatility": cpu_result["variance"],
"timestamp": time.time()
}
# Cache the result
self.cache[cache_key] = {
"result": result,
"timestamp": time.time()
}
return result
async def batch_process(self, requests: list) -> list:
"""Process multiple requests concurrently."""
async def process_single_request(request):
try:
result = await self.optimized_analysis(request)
return {"status": "success", "result": result}
except Exception as e:
return {"status": "error", "error": str(e)}
# Process all requests concurrently
tasks = [process_single_request(req) for req in requests]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
def cleanup_cache(self):
"""Clean up expired cache entries."""
current_time = time.time()
expired_keys = [
key for key, entry in self.cache.items()
if current_time - entry["timestamp"] > self.cache_ttl
]
for key in expired_keys:
del self.cache[key]
print(f"🧹 Cleaned up {len(expired_keys)} expired cache entries")
# Usage example
async def main():
agent = OptimizedAgent()
# Register agent
agent_id, tx_hash = agent.sdk.register_identity()
print(f"✅ Optimized agent registered: {agent_id}")
# Test single analysis
start_time = time.time()
result1 = await agent.optimized_analysis({"values": list(range(100))})
print(f"⏱️ First analysis: {time.time() - start_time:.2f}s")
# Test cached analysis (should be faster)
start_time = time.time()
result2 = await agent.optimized_analysis({"values": list(range(100))})
print(f"⏱️ Cached analysis: {time.time() - start_time:.2f}s")
# Test batch processing
requests = [
{"values": list(range(i, i+50))}
for i in range(0, 200, 50)
]
start_time = time.time()
batch_results = await agent.batch_process(requests)
print(f"⏱️ Batch processing ({len(requests)} requests): {time.time() - start_time:.2f}s")
print(f"✅ Successful: {sum(1 for r in batch_results if r['status'] == 'success')}")
# Cleanup
agent.cleanup_cache()
if __name__ == "__main__":
asyncio.run(main())
Testing Strategies
Unit Testing
test_agent.py
Copy
import unittest
import asyncio
from unittest.mock import Mock, patch
from chaoschain_sdk import ChaosChainAgentSDK
class TestAgent(unittest.TestCase):
def setUp(self):
"""Set up test fixtures."""
self.sdk = ChaosChainAgentSDK(
agent_name="TestAgent",
agent_domain="test.example.com",
agent_role="server",
network="base-sepolia"
)
def test_agent_initialization(self):
"""Test agent initialization."""
self.assertEqual(self.sdk.agent_name, "TestAgent")
self.assertEqual(self.sdk.agent_domain, "test.example.com")
self.assertEqual(self.sdk.agent_role.value, "server")
@patch('chaoschain_sdk.ChaosChainAgentSDK.register_identity')
def test_registration_success(self, mock_register):
"""Test successful agent registration."""
mock_register.return_value = (123, "0xabc123")
agent_id, tx_hash = self.sdk.register_identity()
self.assertEqual(agent_id, 123)
self.assertEqual(tx_hash, "0xabc123")
mock_register.assert_called_once()
def test_payment_methods(self):
"""Test payment methods availability."""
methods = self.sdk.get_supported_payment_methods()
self.assertIsInstance(methods, list)
self.assertIn("https://a2a.org/x402", methods)
self.assertIn("basic-card", methods)
async def test_integrity_proof_execution(self):
"""Test process integrity execution."""
@self.sdk.process_integrity.register_function
async def test_function(data: dict) -> dict:
return {"processed": True, "input_count": len(data)}
result, proof = await self.sdk.execute_with_integrity_proof(
"test_function",
{"items": [1, 2, 3]}
)
self.assertEqual(result["processed"], True)
self.assertEqual(result["input_count"], 3)
self.assertIsNotNone(proof)
self.assertIsNotNone(proof.proof_id)
class TestAgentIntegration(unittest.TestCase):
"""Integration tests with real network (requires testnet setup)."""
def setUp(self):
"""Set up integration test fixtures."""
self.sdk = ChaosChainAgentSDK(
agent_name="IntegrationTestAgent",
agent_domain="integration.test.com",
agent_role="client",
network="base-sepolia"
)
@unittest.skipUnless(
os.getenv("RUN_INTEGRATION_TESTS") == "true",
"Integration tests require RUN_INTEGRATION_TESTS=true"
)
def test_real_registration(self):
"""Test real registration (requires testnet ETH)."""
try:
agent_id, tx_hash = self.sdk.register_identity()
self.assertIsInstance(agent_id, int)
self.assertTrue(tx_hash.startswith("0x"))
except Exception as e:
self.skipTest(f"Registration requires testnet ETH: {e}")
# Run tests
if __name__ == "__main__":
# Run unit tests
unittest.main(verbosity=2)
Deployment Considerations
Environment Configuration
production_config.py
Copy
import os
from chaoschain_sdk import ChaosChainAgentSDK, NetworkConfig
class ProductionAgent:
def __init__(self):
# Load configuration from environment
self.config = self._load_config()
# Initialize SDK with production settings
self.sdk = ChaosChainAgentSDK(
agent_name=self.config["agent_name"],
agent_domain=self.config["agent_domain"],
agent_role=self.config["agent_role"],
network=self.config["network"],
enable_process_integrity=True,
enable_payments=True,
enable_storage=True,
enable_ap2=self.config.get("enable_ap2", True)
)
def _load_config(self) -> dict:
"""Load configuration from environment variables."""
required_vars = [
"AGENT_NAME",
"AGENT_DOMAIN",
"AGENT_ROLE",
"NETWORK"
]
config = {}
missing_vars = []
for var in required_vars:
value = os.getenv(var)
if not value:
missing_vars.append(var)
else:
config[var.lower()] = value
if missing_vars:
raise ValueError(f"Missing required environment variables: {missing_vars}")
# Optional configurations
config["enable_ap2"] = os.getenv("ENABLE_AP2", "true").lower() == "true"
return config
def validate_environment(self) -> dict:
"""Validate production environment setup."""
checks = {
"config_loaded": bool(self.config),
"network_rpc": bool(os.getenv(f"{self.config['network'].upper().replace('-', '_')}_RPC_URL")),
"pinata_jwt": bool(os.getenv("PINATA_JWT")),
"payment_credentials": self._check_payment_credentials(),
"wallet_file_writable": self._check_wallet_permissions()
}
return checks
def _check_payment_credentials(self) -> bool:
"""Check if payment processor credentials are configured."""
payment_vars = [
"STRIPE_SECRET_KEY",
"GOOGLE_PAY_MERCHANT_ID",
"APPLE_PAY_MERCHANT_ID",
"PAYPAL_CLIENT_ID"
]
return any(os.getenv(var) for var in payment_vars)
def _check_wallet_permissions(self) -> bool:
"""Check if wallet file is writable."""
wallet_file = "chaoschain_wallets.json"
try:
# Try to create/write to wallet file
with open(wallet_file, "a"):
pass
return True
except PermissionError:
return False
# Example production environment file (.env)
production_env = """
# Agent Configuration
AGENT_NAME=ProductionAnalysisAgent
AGENT_DOMAIN=analysis.mycompany.com
AGENT_ROLE=server
NETWORK=base-sepolia
# Network Configuration
BASE_SEPOLIA_RPC_URL=https://base-sepolia.g.alchemy.com/v2/YOUR_API_KEY
# Storage Configuration
PINATA_JWT=your_pinata_jwt_token
PINATA_GATEWAY=https://your-gateway.mypinata.cloud
# Payment Configuration
STRIPE_SECRET_KEY=sk_live_your_stripe_secret_key
GOOGLE_PAY_MERCHANT_ID=merchant.your-domain.com
PAYPAL_CLIENT_ID=your_paypal_client_id
PAYPAL_CLIENT_SECRET=your_paypal_client_secret
# Optional Features
ENABLE_AP2=true
"""
# Usage
if __name__ == "__main__":
agent = ProductionAgent()
# Validate environment
checks = agent.validate_environment()
print("🔍 Environment Validation:")
for check, passed in checks.items():
status = "✅" if passed else "❌"
print(f" {status} {check}")
if all(checks.values()):
print("🚀 Ready for production deployment!")
else:
print("⚠️ Environment needs configuration before deployment")