Overview
This page provides complete, working examples for common ChaosChain SDK use cases. All examples are production-ready and include proper error handling.Basic Agent Setup
Simple Server Agent
simple_server.py
Copy
import asyncio
import os
from chaoschain_sdk import ChaosChainAgentSDK
# Set up environment
os.environ['BASE_SEPOLIA_RPC_URL'] = 'https://sepolia.base.org'
async def main():
# Initialize server agent
sdk = ChaosChainAgentSDK(
agent_name="SimpleServer",
agent_domain="simple.example.com",
agent_role="server",
network="base-sepolia"
)
print(f"🚀 Server initialized: {sdk.agent_name}")
print(f"📍 Wallet: {sdk.wallet_address}")
# Register a simple service
@sdk.process_integrity.register_function
async def hello_world(name: str) -> dict:
return {
"message": f"Hello, {name}!",
"timestamp": "2024-01-01T00:00:00Z",
"agent": sdk.agent_name
}
# Register on blockchain
try:
agent_id, tx_hash = sdk.register_identity()
print(f"✅ Registered with ID: {agent_id}")
except Exception as e:
print(f"⚠️ Registration: {e}")
# Execute service with integrity proof
result, proof = await sdk.execute_with_integrity_proof(
"hello_world",
{"name": "World"}
)
print(f"📊 Result: {result}")
print(f"🔐 Proof ID: {proof.proof_id}")
if __name__ == "__main__":
asyncio.run(main())
Simple Client Agent
simple_client.py
Copy
import asyncio
import os
from chaoschain_sdk import ChaosChainAgentSDK
async def main():
# Initialize client agent
sdk = ChaosChainAgentSDK(
agent_name="SimpleClient",
agent_domain="client.example.com",
agent_role="client",
network="base-sepolia"
)
print(f"🚀 Client initialized: {sdk.agent_name}")
# Register on blockchain
try:
agent_id, tx_hash = sdk.register_identity()
print(f"✅ Registered with ID: {agent_id}")
except Exception as e:
print(f"⚠️ Registration: {e}")
# Create a payment request
payment_request = sdk.create_x402_payment_request(
cart_id="simple_service_001",
total_amount=1.0,
currency="USDC",
items=[{"name": "Hello World Service", "price": 1.0}]
)
print(f"💰 Payment request created: {payment_request['id']}")
# Execute crypto payment
crypto_result = sdk.execute_x402_crypto_payment(
payment_request=payment_request,
payer_agent="SimpleClient",
service_description="Hello World Service"
)
print(f"✅ Payment completed: {crypto_result['transaction_hash']}")
print(f"💵 Amount: {crypto_result['amount']} USDC")
if __name__ == "__main__":
asyncio.run(main())
Data Analysis Agent
Complete Analysis Service
analysis_agent.py
Copy
import asyncio
import json
import hashlib
import statistics
from datetime import datetime
from chaoschain_sdk import ChaosChainAgentSDK
class DataAnalysisAgent:
def __init__(self):
self.sdk = ChaosChainAgentSDK(
agent_name="DataAnalysisAgent",
agent_domain="analysis.mycompany.com",
agent_role="server",
network="base-sepolia"
)
self._register_services()
def _register_services(self):
"""Register all analysis services."""
@self.sdk.process_integrity.register_function
async def statistical_analysis(self, data: dict) -> dict:
"""Perform statistical analysis on numerical data."""
values = data.get('values', [])
if not values:
return {"error": "No values provided"}
try:
analysis = {
"count": len(values),
"mean": statistics.mean(values),
"median": statistics.median(values),
"std_dev": statistics.stdev(values) if len(values) > 1 else 0,
"min": min(values),
"max": max(values),
"range": max(values) - min(values),
"timestamp": datetime.now().isoformat()
}
# Add quartiles if enough data
if len(values) >= 4:
sorted_values = sorted(values)
analysis["q1"] = statistics.median(sorted_values[:len(values)//2])
analysis["q3"] = statistics.median(sorted_values[len(values)//2:])
return analysis
except Exception as e:
return {"error": f"Analysis failed: {str(e)}"}
@self.sdk.process_integrity.register_function
async def trend_analysis(self, data: dict) -> dict:
"""Analyze trends in time series data."""
values = data.get('values', [])
timestamps = data.get('timestamps', [])
if len(values) < 2:
return {"error": "Need at least 2 data points for trend analysis"}
# Calculate simple trend
differences = [values[i] - values[i-1] for i in range(1, len(values))]
avg_change = sum(differences) / len(differences)
# Determine trend direction
if avg_change > 0:
trend = "upward"
elif avg_change < 0:
trend = "downward"
else:
trend = "stable"
# Calculate volatility
volatility = statistics.stdev(differences) if len(differences) > 1 else 0
return {
"trend_direction": trend,
"average_change": avg_change,
"volatility": volatility,
"total_change": values[-1] - values[0],
"change_percentage": ((values[-1] - values[0]) / values[0] * 100) if values[0] != 0 else 0,
"data_points": len(values),
"timestamp": datetime.now().isoformat()
}
@self.sdk.process_integrity.register_function
async def correlation_analysis(self, data: dict) -> dict:
"""Analyze correlation between two data series."""
series_a = data.get('series_a', [])
series_b = data.get('series_b', [])
if len(series_a) != len(series_b) or len(series_a) < 2:
return {"error": "Series must have equal length and at least 2 points"}
try:
correlation = statistics.correlation(series_a, series_b)
# Interpret correlation strength
abs_corr = abs(correlation)
if abs_corr >= 0.8:
strength = "very strong"
elif abs_corr >= 0.6:
strength = "strong"
elif abs_corr >= 0.4:
strength = "moderate"
elif abs_corr >= 0.2:
strength = "weak"
else:
strength = "very weak"
return {
"correlation_coefficient": correlation,
"correlation_strength": strength,
"correlation_direction": "positive" if correlation > 0 else "negative",
"data_points": len(series_a),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {"error": f"Correlation analysis failed: {str(e)}"}
async def process_analysis_request(self, service_type: str, data: dict, client_agent: str) -> dict:
"""Process an analysis request with payment."""
# Service pricing
pricing = {
"statistical_analysis": 2.0,
"trend_analysis": 3.0,
"correlation_analysis": 4.0
}
if service_type not in pricing:
return {"error": f"Unknown service type: {service_type}"}
price = pricing[service_type]
# Create payment request
payment_request = self.sdk.create_x402_payment_request(
cart_id=f"{service_type}_{int(datetime.now().timestamp())}",
total_amount=price,
currency="USDC",
items=[{"name": service_type.replace('_', ' ').title(), "price": price}]
)
# Execute analysis with integrity proof
result, proof = await self.sdk.execute_with_integrity_proof(
service_type,
data
)
# Process payment
payment_result = self.sdk.execute_x402_crypto_payment(
payment_request=payment_request,
payer_agent=client_agent,
service_description=f"{service_type.replace('_', ' ').title()} Service"
)
# Create comprehensive response
response = {
"service_type": service_type,
"analysis_result": result,
"integrity_proof": {
"proof_id": proof.proof_id,
"code_hash": proof.code_hash
},
"payment_confirmation": {
"transaction_hash": payment_result["transaction_hash"],
"amount": payment_result["amount"],
"currency": "USDC"
},
"timestamp": datetime.now().isoformat()
}
# Store evidence
evidence_cid = self.sdk.store_evidence(response, "analysis_service")
if evidence_cid:
response["evidence_cid"] = evidence_cid
return response
async def start_service(self):
"""Start the analysis service."""
# Register agent
try:
agent_id, tx_hash = self.sdk.register_identity()
print(f"✅ Analysis Agent registered with ID: {agent_id}")
except Exception as e:
print(f"⚠️ Registration: {e}")
print(f"🔬 Data Analysis Agent running at {self.sdk.wallet_address}")
print("📋 Available services:")
print(" - statistical_analysis ($2.00 USDC)")
print(" - trend_analysis ($3.00 USDC)")
print(" - correlation_analysis ($4.00 USDC)")
# Example usage
sample_data = {
"values": [10, 12, 11, 15, 14, 18, 16, 20, 19, 22]
}
result = await self.process_analysis_request(
"statistical_analysis",
sample_data,
"DemoClient"
)
print(f"\n📊 Sample Analysis Result:")
print(f" Mean: {result['analysis_result']['mean']:.2f}")
print(f" Std Dev: {result['analysis_result']['std_dev']:.2f}")
print(f" Payment: {result['payment_confirmation']['transaction_hash']}")
# Run the analysis agent
if __name__ == "__main__":
agent = DataAnalysisAgent()
asyncio.run(agent.start_service())
Payment Processing Examples
Multi-Payment Agent
payment_agent.py
Copy
import asyncio
import os
from chaoschain_sdk import ChaosChainAgentSDK
# Setup payment credentials
os.environ.update({
'BASE_SEPOLIA_RPC_URL': 'https://sepolia.base.org',
'STRIPE_SECRET_KEY': 'sk_test_your_stripe_key', # Use test key for demo
'GOOGLE_PAY_MERCHANT_ID': 'merchant.your-domain.com',
'PAYPAL_CLIENT_ID': 'your_paypal_client_id'
})
class PaymentAgent:
def __init__(self):
self.sdk = ChaosChainAgentSDK(
agent_name="PaymentAgent",
agent_domain="payments.example.com",
agent_role="server",
network="base-sepolia",
enable_payments=True
)
async def demonstrate_crypto_payment(self):
"""Demonstrate A2A-x402 crypto payment."""
print("💰 Demonstrating crypto payment...")
# Create payment request
payment_request = self.sdk.create_x402_payment_request(
cart_id="crypto_demo_001",
total_amount=5.0,
currency="USDC",
items=[
{"name": "Premium Analysis", "price": 4.0},
{"name": "Report Generation", "price": 1.0}
]
)
print(f"📄 Payment Request: {payment_request['id']}")
print(f"💵 Total: {payment_request['total_amount']} {payment_request['currency']}")
# Execute crypto payment
crypto_result = self.sdk.execute_x402_crypto_payment(
payment_request=payment_request,
payer_agent="DemoClient",
service_description="Premium Analysis with Report"
)
print(f"✅ Crypto Payment Success!")
print(f" Transaction: {crypto_result['transaction_hash']}")
print(f" Amount: {crypto_result['amount']} USDC")
print(f" Protocol Fee: {crypto_result['protocol_fee']} USDC")
return crypto_result
async def demonstrate_card_payment(self):
"""Demonstrate traditional card payment."""
print("\n💳 Demonstrating card payment...")
# Demo card payment (uses Stripe test mode)
card_result = self.sdk.execute_traditional_payment(
payment_method="basic-card",
amount=25.99,
currency="USD",
payment_data={
"cardNumber": "4111111111111111", # Stripe test card
"cardType": "visa",
"expiryMonth": "12",
"expiryYear": "2025",
"cvv": "123",
"billingAddress": {
"line1": "123 Test St",
"city": "Test City",
"state": "CA",
"postal_code": "12345",
"country": "US"
}
}
)
print(f"✅ Card Payment Success!")
print(f" Status: {card_result['status']}")
print(f" Charge ID: {card_result.get('charge_id', 'N/A')}")
return card_result
async def demonstrate_google_pay(self):
"""Demonstrate Google Pay payment."""
print("\n📱 Demonstrating Google Pay...")
# Demo Google Pay payment
gpay_result = self.sdk.execute_traditional_payment(
payment_method="https://google.com/pay",
amount=15.50,
currency="USD",
payment_data={
"googleTransactionId": f"gpay_demo_{int(asyncio.get_event_loop().time())}",
"paymentMethodType": "CARD",
"paymentToken": "demo_encrypted_token_from_google",
"billingAddress": {
"name": "Demo User",
"address1": "123 Demo St",
"locality": "Demo City",
"administrativeArea": "CA",
"postalCode": "12345",
"countryCode": "US"
}
}
)
print(f"✅ Google Pay Success!")
print(f" Status: {gpay_result['status']}")
print(f" Transaction ID: {gpay_result.get('transaction_id', 'N/A')}")
return gpay_result
async def run_payment_demos(self):
"""Run all payment demonstrations."""
# Register agent
try:
agent_id, tx_hash = self.sdk.register_identity()
print(f"✅ Payment Agent registered with ID: {agent_id}")
except Exception as e:
print(f"⚠️ Registration: {e}")
# Show supported payment methods
methods = self.sdk.get_supported_payment_methods()
print(f"\n💳 Supported Payment Methods ({len(methods)}):")
for method in methods:
print(f" - {method}")
# Run demonstrations
results = {}
try:
results['crypto'] = await self.demonstrate_crypto_payment()
except Exception as e:
print(f"❌ Crypto payment failed: {e}")
try:
results['card'] = await self.demonstrate_card_payment()
except Exception as e:
print(f"❌ Card payment failed: {e}")
try:
results['google_pay'] = await self.demonstrate_google_pay()
except Exception as e:
print(f"❌ Google Pay failed: {e}")
# Store evidence of all payments
if results:
evidence_cid = self.sdk.store_evidence({
"demo_type": "payment_methods",
"results": results,
"agent": self.sdk.agent_name,
"timestamp": "2024-01-01T00:00:00Z"
})
if evidence_cid:
print(f"\n📦 Payment evidence stored: {evidence_cid}")
print(f"\n🎉 Payment demonstrations completed!")
return results
# Run the payment agent
if __name__ == "__main__":
agent = PaymentAgent()
asyncio.run(agent.run_payment_demos())
Validation Agent Example
Quality Validation Service
validation_agent.py
Copy
import asyncio
import hashlib
import json
from datetime import datetime, timedelta
from chaoschain_sdk import ChaosChainAgentSDK
class QualityValidator:
def __init__(self):
self.sdk = ChaosChainAgentSDK(
agent_name="QualityValidator",
agent_domain="validator.example.com",
agent_role="validator",
network="base-sepolia"
)
# Validation criteria
self.criteria = {
"data_analysis": {
"required_fields": ["mean", "median", "std_dev", "count"],
"min_data_points": 5,
"max_age_hours": 24
},
"trend_analysis": {
"required_fields": ["trend_direction", "average_change", "volatility"],
"min_data_points": 3,
"max_age_hours": 12
}
}
def validate_data_analysis(self, analysis: dict) -> dict:
"""Validate data analysis quality."""
score = 100
issues = []
criteria = self.criteria["data_analysis"]
# Check required fields
for field in criteria["required_fields"]:
if field not in analysis:
score -= 20
issues.append(f"Missing required field: {field}")
# Check data point count
data_count = analysis.get("count", 0)
if data_count < criteria["min_data_points"]:
score -= 15
issues.append(f"Insufficient data points: {data_count} < {criteria['min_data_points']}")
# Check timestamp freshness
timestamp_str = analysis.get("timestamp")
if timestamp_str:
try:
timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
age = datetime.now() - timestamp.replace(tzinfo=None)
if age > timedelta(hours=criteria["max_age_hours"]):
score -= 10
issues.append(f"Data too old: {age.total_seconds()/3600:.1f} hours")
except ValueError:
score -= 5
issues.append("Invalid timestamp format")
else:
score -= 10
issues.append("Missing timestamp")
# Check statistical validity
if "mean" in analysis and "std_dev" in analysis:
mean = analysis["mean"]
std_dev = analysis["std_dev"]
# Check for reasonable values
if std_dev < 0:
score -= 25
issues.append("Invalid standard deviation (negative)")
if "min" in analysis and "max" in analysis:
min_val = analysis["min"]
max_val = analysis["max"]
if min_val > mean or max_val < mean:
score -= 20
issues.append("Mean not between min and max values")
return {
"score": max(0, score),
"issues": issues,
"validation_type": "data_analysis",
"validator": self.sdk.agent_name,
"timestamp": datetime.now().isoformat()
}
def validate_trend_analysis(self, analysis: dict) -> dict:
"""Validate trend analysis quality."""
score = 100
issues = []
criteria = self.criteria["trend_analysis"]
# Check required fields
for field in criteria["required_fields"]:
if field not in analysis:
score -= 20
issues.append(f"Missing required field: {field}")
# Validate trend direction
trend_direction = analysis.get("trend_direction")
if trend_direction not in ["upward", "downward", "stable"]:
score -= 15
issues.append(f"Invalid trend direction: {trend_direction}")
# Check data consistency
avg_change = analysis.get("average_change", 0)
if trend_direction == "upward" and avg_change <= 0:
score -= 20
issues.append("Trend direction inconsistent with average change")
elif trend_direction == "downward" and avg_change >= 0:
score -= 20
issues.append("Trend direction inconsistent with average change")
# Check volatility reasonableness
volatility = analysis.get("volatility", 0)
if volatility < 0:
score -= 25
issues.append("Invalid volatility (negative)")
return {
"score": max(0, score),
"issues": issues,
"validation_type": "trend_analysis",
"validator": self.sdk.agent_name,
"timestamp": datetime.now().isoformat()
}
async def process_validation_request(self, requester_id: int, data_hash: str, analysis_data: dict, analysis_type: str) -> dict:
"""Process a validation request."""
print(f"🔍 Validating {analysis_type} from agent {requester_id}")
print(f"📄 Data hash: {data_hash}")
# Perform validation based on type
if analysis_type == "data_analysis":
validation_result = self.validate_data_analysis(analysis_data)
elif analysis_type == "trend_analysis":
validation_result = self.validate_trend_analysis(analysis_data)
else:
validation_result = {
"score": 0,
"issues": [f"Unknown analysis type: {analysis_type}"],
"validation_type": "unknown",
"validator": self.sdk.agent_name,
"timestamp": datetime.now().isoformat()
}
score = validation_result["score"]
print(f"📊 Validation Score: {score}/100")
if validation_result["issues"]:
print("⚠️ Issues found:")
for issue in validation_result["issues"]:
print(f" - {issue}")
else:
print("✅ No issues found")
# Submit validation response on-chain
try:
tx_hash = self.sdk.submit_validation_response(
data_hash=data_hash,
score=score
)
print(f"✅ Validation submitted on-chain: {tx_hash}")
validation_result["transaction_hash"] = tx_hash
except Exception as e:
print(f"❌ Failed to submit validation: {e}")
validation_result["submission_error"] = str(e)
# Store validation evidence
evidence_data = {
"validation_request": {
"requester_id": requester_id,
"data_hash": data_hash,
"analysis_type": analysis_type
},
"analysis_data": analysis_data,
"validation_result": validation_result
}
evidence_cid = self.sdk.store_evidence(evidence_data, "validation_service")
if evidence_cid:
print(f"📦 Validation evidence stored: {evidence_cid}")
validation_result["evidence_cid"] = evidence_cid
return validation_result
async def start_validator(self):
"""Start the validation service."""
# Register as validator
try:
agent_id, tx_hash = self.sdk.register_identity()
print(f"✅ Quality Validator registered with ID: {agent_id}")
except Exception as e:
print(f"⚠️ Registration: {e}")
print(f"🛡️ Quality Validator running at {self.sdk.wallet_address}")
print("📋 Validation Services:")
print(" - Data Analysis Validation")
print(" - Trend Analysis Validation")
# Demo validation requests
demo_requests = [
{
"requester_id": 123,
"analysis_type": "data_analysis",
"data": {
"count": 10,
"mean": 15.5,
"median": 15.0,
"std_dev": 2.3,
"min": 12.0,
"max": 19.0,
"timestamp": datetime.now().isoformat()
}
},
{
"requester_id": 456,
"analysis_type": "trend_analysis",
"data": {
"trend_direction": "upward",
"average_change": 1.2,
"volatility": 0.8,
"data_points": 8,
"timestamp": datetime.now().isoformat()
}
}
]
# Process demo requests
for request in demo_requests:
# Generate data hash
data_str = json.dumps(request["data"], sort_keys=True)
data_hash = "0x" + hashlib.sha256(data_str.encode()).hexdigest()
result = await self.process_validation_request(
request["requester_id"],
data_hash,
request["data"],
request["analysis_type"]
)
print(f"\n🎯 Validation completed for {request['analysis_type']}")
print(f" Score: {result['score']}/100")
print(f" Issues: {len(result['issues'])}")
print("-" * 50)
# Run the validator
if __name__ == "__main__":
validator = QualityValidator()
asyncio.run(validator.start_validator())
Error Handling Examples
Robust Agent Implementation
robust_agent.py
Copy
import asyncio
import logging
from chaoschain_sdk import ChaosChainAgentSDK
from chaoschain_sdk.exceptions import (
ChaosChainSDKError,
AgentRegistrationError,
PaymentError,
IntegrityVerificationError,
ConfigurationError
)
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class RobustAgent:
def __init__(self):
try:
self.sdk = ChaosChainAgentSDK(
agent_name="RobustAgent",
agent_domain="robust.example.com",
agent_role="server",
network="base-sepolia"
)
logger.info("✅ SDK initialized successfully")
except ConfigurationError as e:
logger.error(f"❌ Configuration error: {e}")
raise
except Exception as e:
logger.error(f"❌ Unexpected initialization error: {e}")
raise
async def safe_register(self, max_retries: int = 3) -> bool:
"""Safely register agent with retries and proper error handling."""
for attempt in range(max_retries):
try:
logger.info(f"🔄 Registration attempt {attempt + 1}/{max_retries}")
agent_id, tx_hash = self.sdk.register_identity()
logger.info(f"✅ Successfully registered with ID: {agent_id}")
logger.info(f"📄 Transaction hash: {tx_hash}")
return True
except AgentRegistrationError as e:
error_msg = str(e).lower()
if "insufficient funds" in error_msg:
logger.error("💰 Insufficient funds for registration")
logger.info("💡 Solutions:")
logger.info(" 1. Get testnet ETH from: https://www.coinbase.com/faucets/base-ethereum-sepolia-faucet")
logger.info(f" 2. Send ETH to: {self.sdk.wallet_address}")
return False
elif "already registered" in error_msg:
logger.warning("⚠️ Agent already registered")
return True
else:
logger.warning(f"⚠️ Registration failed: {e}")
except Exception as e:
logger.error(f"❌ Unexpected registration error: {e}")
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Exponential backoff
logger.info(f"⏳ Waiting {wait_time} seconds before retry...")
await asyncio.sleep(wait_time)
logger.error("❌ Registration failed after all retries")
return False
async def safe_execute_service(self, service_name: str, data: dict) -> dict:
"""Safely execute service with comprehensive error handling."""
try:
logger.info(f"🚀 Executing service: {service_name}")
# Execute with integrity proof
result, proof = await self.sdk.execute_with_integrity_proof(
service_name,
data
)
logger.info(f"✅ Service executed successfully")
logger.info(f"🔐 Proof ID: {proof.proof_id}")
return {
"status": "success",
"result": result,
"proof": {
"proof_id": proof.proof_id,
"code_hash": proof.code_hash,
"ipfs_cid": proof.ipfs_cid
}
}
except IntegrityVerificationError as e:
logger.error(f"🔐 Integrity verification failed: {e}")
return {
"status": "integrity_error",
"error": str(e),
"error_type": "integrity_verification"
}
except KeyError as e:
logger.error(f"🔑 Missing required data field: {e}")
return {
"status": "data_error",
"error": f"Missing required field: {e}",
"error_type": "missing_data"
}
except ValueError as e:
logger.error(f"📊 Invalid data value: {e}")
return {
"status": "validation_error",
"error": str(e),
"error_type": "invalid_data"
}
except Exception as e:
logger.error(f"❌ Unexpected service error: {e}")
return {
"status": "unexpected_error",
"error": str(e),
"error_type": "unknown"
}
async def safe_process_payment(self, amount: float, currency: str, client_agent: str) -> dict:
"""Safely process payment with error handling."""
try:
logger.info(f"💰 Processing payment: {amount} {currency}")
# Create payment request
payment_request = self.sdk.create_x402_payment_request(
cart_id=f"service_{int(asyncio.get_event_loop().time())}",
total_amount=amount,
currency=currency,
items=[{"name": "Service Payment", "price": amount}]
)
# Execute payment
payment_result = self.sdk.execute_x402_crypto_payment(
payment_request=payment_request,
payer_agent=client_agent,
service_description="Robust Agent Service"
)
logger.info(f"✅ Payment processed successfully")
logger.info(f"📄 Transaction: {payment_result['transaction_hash']}")
return {
"status": "success",
"payment_result": payment_result
}
except PaymentError as e:
error_msg = str(e).lower()
if "insufficient balance" in error_msg:
logger.error("💳 Insufficient balance for payment")
return {
"status": "insufficient_balance",
"error": str(e),
"suggestion": "Add more USDC to wallet"
}
elif "invalid payment data" in error_msg:
logger.error("📝 Invalid payment data")
return {
"status": "invalid_payment_data",
"error": str(e),
"suggestion": "Check payment request format"
}
else:
logger.error(f"💳 Payment processing failed: {e}")
return {
"status": "payment_error",
"error": str(e)
}
except Exception as e:
logger.error(f"❌ Unexpected payment error: {e}")
return {
"status": "unexpected_payment_error",
"error": str(e)
}
async def health_check(self) -> dict:
"""Perform comprehensive health check."""
logger.info("🏥 Performing health check...")
health = {
"agent_name": self.sdk.agent_name,
"wallet_address": self.sdk.wallet_address,
"timestamp": asyncio.get_event_loop().time(),
"checks": {}
}
# Check SDK status
try:
status = self.sdk.get_sdk_status()
health["checks"]["sdk_status"] = {
"status": "healthy",
"features": status.get("features", {}),
"payment_methods": len(status.get("payment_methods", []))
}
except Exception as e:
health["checks"]["sdk_status"] = {
"status": "error",
"error": str(e)
}
# Check network connectivity
try:
network_info = self.sdk.network_info
health["checks"]["network"] = {
"status": "connected" if network_info.get("connected") else "disconnected",
"chain_id": network_info.get("chain_id"),
"network": network_info.get("network")
}
except Exception as e:
health["checks"]["network"] = {
"status": "error",
"error": str(e)
}
# Check registration status
try:
is_registered = self.sdk.is_registered
agent_id = self.sdk.get_agent_id()
health["checks"]["registration"] = {
"status": "registered" if is_registered else "not_registered",
"agent_id": agent_id
}
except Exception as e:
health["checks"]["registration"] = {
"status": "error",
"error": str(e)
}
# Overall health
all_healthy = all(
check.get("status") in ["healthy", "connected", "registered"]
for check in health["checks"].values()
)
health["overall_status"] = "healthy" if all_healthy else "degraded"
logger.info(f"🏥 Health check completed: {health['overall_status']}")
return health
async def run_demo(self):
"""Run a complete demo with error handling."""
logger.info("🚀 Starting Robust Agent Demo")
# Health check
health = await self.health_check()
if health["overall_status"] != "healthy":
logger.warning("⚠️ Health check shows issues, continuing anyway...")
# Safe registration
registered = await self.safe_register()
if not registered:
logger.error("❌ Could not register agent, stopping demo")
return
# Register a demo service
@self.sdk.process_integrity.register_function
async def demo_service(data: dict) -> dict:
if "values" not in data:
raise ValueError("Missing 'values' field in data")
values = data["values"]
if not isinstance(values, list) or len(values) == 0:
raise ValueError("'values' must be a non-empty list")
return {
"sum": sum(values),
"count": len(values),
"average": sum(values) / len(values),
"processed": True
}
# Test service execution
test_cases = [
{"values": [1, 2, 3, 4, 5]}, # Valid data
{"invalid": "data"}, # Missing required field
{"values": []}, # Empty values
{"values": "not_a_list"} # Invalid data type
]
for i, test_data in enumerate(test_cases):
logger.info(f"\n📋 Test case {i + 1}: {test_data}")
result = await self.safe_execute_service("demo_service", test_data)
logger.info(f" Result: {result['status']}")
if result["status"] == "success":
logger.info(f" Output: {result['result']}")
else:
logger.info(f" Error: {result['error']}")
# Test payment processing
logger.info(f"\n💰 Testing payment processing...")
payment_result = await self.safe_process_payment(2.0, "USDC", "DemoClient")
logger.info(f" Payment status: {payment_result['status']}")
logger.info("\n🎉 Robust Agent Demo completed!")
# Run the robust agent
if __name__ == "__main__":
agent = RobustAgent()
asyncio.run(agent.run_demo())