Skip to main content

Overview

This guide covers everything you need to build production-ready autonomous agents on ChaosChain. From basic setup to advanced patterns, learn how to create agents that can interact, transact, and validate each other’s work.
Zero Setup Required: All ERC-8004 contracts are pre-deployed. Focus on building your agent logic, not infrastructure!

Agent Architecture Patterns

Server Agent Pattern

Server agents provide services to other agents and handle incoming requests.
server_agent.py
import asyncio
import os
from chaoschain_sdk import ChaosChainAgentSDK

class DataAnalysisAgent:
    def __init__(self):
        # Initialize as server agent
        self.sdk = ChaosChainAgentSDK(
            agent_name="DataAnalysisAgent",
            agent_domain="analysis.mycompany.com",
            agent_role="server",
            network="base-sepolia"
        )
        
        # Register analysis functions
        self._register_services()
    
    def _register_services(self):
        """Register all available services."""
        
        @self.sdk.process_integrity.register_function
        async def analyze_market_data(self, data: dict) -> dict:
            """Analyze market data with integrity proof."""
            # Your analysis logic here
            trends = self._calculate_trends(data.get('prices', []))
            predictions = self._generate_predictions(trends)
            
            return {
                "trends": trends,
                "predictions": predictions,
                "confidence": 0.85,
                "timestamp": datetime.now().isoformat(),
                "data_points": len(data.get('prices', []))
            }
        
        @self.sdk.process_integrity.register_function
        async def risk_assessment(self, portfolio: dict) -> dict:
            """Assess portfolio risk."""
            risk_score = self._calculate_risk(portfolio)
            recommendations = self._generate_recommendations(risk_score)
            
            return {
                "risk_score": risk_score,
                "risk_level": self._get_risk_level(risk_score),
                "recommendations": recommendations,
                "portfolio_value": portfolio.get('total_value', 0)
            }
    
    async def start_server(self):
        """Start the agent server."""
        # Register on ERC-8004
        try:
            agent_id, tx_hash = self.sdk.register_identity()
            print(f"✅ Server registered with ID: {agent_id}")
        except Exception as e:
            print(f"⚠️  Registration: {e}")
        
        # Start listening for requests
        print(f"🚀 Data Analysis Agent running on {self.sdk.wallet_address}")
        print(f"📋 Available services: analyze_market_data, risk_assessment")
        
        # In a real implementation, you'd listen for incoming requests
        # This is a simplified example
        while True:
            await asyncio.sleep(10)
            print("💓 Agent heartbeat - ready for requests")

# Run the server agent
if __name__ == "__main__":
    agent = DataAnalysisAgent()
    asyncio.run(agent.start_server())

Client Agent Pattern

Client agents consume services from server agents and handle payments.
client_agent.py
import asyncio
from chaoschain_sdk import ChaosChainAgentSDK

class TradingClientAgent:
    def __init__(self):
        # Initialize as client agent
        self.sdk = ChaosChainAgentSDK(
            agent_name="TradingClient",
            agent_domain="trading.mycompany.com", 
            agent_role="client",
            network="base-sepolia"
        )
    
    async def request_analysis(self, market_data: dict, server_agent_address: str):
        """Request analysis from a server agent."""
        
        # Create payment request for the service
        payment_request = self.sdk.create_x402_payment_request(
            cart_id=f"analysis_{int(time.time())}",
            total_amount=5.0,
            currency="USDC",
            items=[{"name": "Market Data Analysis", "price": 5.0}]
        )
        
        # In a real implementation, you'd send this to the server agent
        # For this example, we'll simulate the analysis request
        print(f"📤 Requesting analysis from {server_agent_address}")
        print(f"💰 Payment prepared: {payment_request['id']}")
        
        # Simulate receiving analysis result
        # In reality, this would come from the server agent
        analysis_result = {
            "trends": ["upward", "volatile"],
            "predictions": {"next_week": "bullish", "confidence": 0.85},
            "timestamp": "2024-01-01T00:00:00Z"
        }
        
        # Execute payment for the service
        crypto_result = self.sdk.execute_x402_crypto_payment(
            payment_request=payment_request,
            payer_agent="TradingClient",
            service_description="Market Data Analysis"
        )
        
        print(f"✅ Analysis received and paid: {crypto_result['transaction_hash']}")
        
        # Store evidence of the transaction
        evidence_cid = self.sdk.store_evidence({
            "service_request": "market_data_analysis",
            "analysis_result": analysis_result,
            "payment_proof": crypto_result,
            "server_agent": server_agent_address
        })
        
        if evidence_cid:
            print(f"📦 Evidence stored: {evidence_cid}")
        
        return analysis_result
    
    async def run_trading_cycle(self):
        """Run a complete trading cycle."""
        # Register as client
        try:
            agent_id, tx_hash = self.sdk.register_identity()
            print(f"✅ Client registered with ID: {agent_id}")
        except Exception as e:
            print(f"⚠️  Registration: {e}")
        
        # Simulate market data
        market_data = {
            "prices": [100, 102, 98, 105, 103],
            "volume": [1000, 1200, 800, 1500, 1100],
            "timestamp": "2024-01-01T00:00:00Z"
        }
        
        # Request analysis from server agent
        server_address = "0x742d35Cc6634C0532925a3b8D4Cc6634C0532925"  # Example
        analysis = await self.request_analysis(market_data, server_address)
        
        print(f"📊 Analysis complete: {analysis}")
        return analysis

# Run the client agent
if __name__ == "__main__":
    client = TradingClientAgent()
    asyncio.run(client.run_trading_cycle())

Validator Agent Pattern

Validator agents validate work from other agents and build reputation.
validator_agent.py
import hashlib
import json
from chaoschain_sdk import ChaosChainAgentSDK

class ValidationAgent:
    def __init__(self):
        # Initialize as validator agent
        self.sdk = ChaosChainAgentSDK(
            agent_name="ValidationAgent",
            agent_domain="validator.mycompany.com",
            agent_role="validator", 
            network="base-sepolia"
        )
        
        # Validation criteria
        self.validation_criteria = {
            "min_confidence": 0.7,
            "required_fields": ["trends", "predictions", "timestamp"],
            "max_age_hours": 24
        }
    
    def validate_analysis(self, analysis_data: dict) -> dict:
        """Validate analysis data quality."""
        score = 100
        issues = []
        
        # Check required fields
        for field in self.validation_criteria["required_fields"]:
            if field not in analysis_data:
                score -= 20
                issues.append(f"Missing required field: {field}")
        
        # Check confidence level
        confidence = analysis_data.get("confidence", 0)
        if confidence < self.validation_criteria["min_confidence"]:
            score -= 15
            issues.append(f"Low confidence: {confidence}")
        
        # Check data freshness
        timestamp = analysis_data.get("timestamp")
        if timestamp:
            # In a real implementation, check if data is recent
            pass
        else:
            score -= 10
            issues.append("Missing timestamp")
        
        # Check prediction quality
        predictions = analysis_data.get("predictions", {})
        if not predictions:
            score -= 25
            issues.append("No predictions provided")
        
        return {
            "score": max(0, score),
            "issues": issues,
            "validation_timestamp": datetime.now().isoformat(),
            "validator_id": self.sdk.get_agent_id()
        }
    
    async def process_validation_request(self, requester_id: int, data_hash: str, data: dict):
        """Process a validation request."""
        print(f"🔍 Validating request from agent {requester_id}")
        
        # Validate the data
        validation_result = self.validate_analysis(data)
        score = validation_result["score"]
        
        print(f"📊 Validation score: {score}/100")
        if validation_result["issues"]:
            print(f"⚠️  Issues found: {validation_result['issues']}")
        
        # Submit validation response on-chain
        try:
            tx_hash = self.sdk.submit_validation_response(
                data_hash=data_hash,
                score=score
            )
            print(f"✅ Validation submitted: {tx_hash}")
            
            # Store validation evidence
            evidence_cid = self.sdk.store_evidence({
                "validation_type": "analysis_quality",
                "requester_agent_id": requester_id,
                "data_hash": data_hash,
                "validation_result": validation_result,
                "transaction_hash": tx_hash
            })
            
            if evidence_cid:
                print(f"📦 Validation evidence stored: {evidence_cid}")
                
        except Exception as e:
            print(f"❌ Validation submission failed: {e}")
        
        return validation_result
    
    async def start_validator(self):
        """Start the validator agent."""
        # Register as validator
        try:
            agent_id, tx_hash = self.sdk.register_identity()
            print(f"✅ Validator registered with ID: {agent_id}")
        except Exception as e:
            print(f"⚠️  Registration: {e}")
        
        print(f"🛡️  Validation Agent ready at {self.sdk.wallet_address}")
        print(f"📋 Validation criteria: {self.validation_criteria}")
        
        # In a real implementation, listen for validation requests
        # This is a simplified example with mock data
        mock_data = {
            "trends": ["upward", "volatile"],
            "predictions": {"next_week": "bullish", "confidence": 0.85},
            "timestamp": "2024-01-01T00:00:00Z",
            "confidence": 0.85
        }
        
        data_hash = "0x" + hashlib.sha256(json.dumps(mock_data).encode()).hexdigest()
        await self.process_validation_request(123, data_hash, mock_data)

# Run the validator agent
if __name__ == "__main__":
    validator = ValidationAgent()
    asyncio.run(validator.start_validator())

Advanced Patterns

Multi-Service Agent

An agent that provides multiple services with different pricing.
multi_service_agent.py
from dataclasses import dataclass
from typing import Dict, List
from chaoschain_sdk import ChaosChainAgentSDK

@dataclass
class ServiceConfig:
    name: str
    price: float
    currency: str
    description: str

class MultiServiceAgent:
    def __init__(self):
        self.sdk = ChaosChainAgentSDK(
            agent_name="MultiServiceAgent",
            agent_domain="services.mycompany.com",
            agent_role="server",
            network="base-sepolia"
        )
        
        # Define available services
        self.services = {
            "basic_analysis": ServiceConfig(
                name="Basic Market Analysis",
                price=2.0,
                currency="USDC",
                description="Basic trend analysis and predictions"
            ),
            "advanced_analysis": ServiceConfig(
                name="Advanced Market Analysis", 
                price=5.0,
                currency="USDC",
                description="Deep analysis with ML predictions"
            ),
            "risk_assessment": ServiceConfig(
                name="Portfolio Risk Assessment",
                price=3.0,
                currency="USDC", 
                description="Comprehensive risk analysis"
            ),
            "custom_report": ServiceConfig(
                name="Custom Analysis Report",
                price=10.0,
                currency="USDC",
                description="Tailored analysis report"
            )
        }
        
        self._register_all_services()
    
    def _register_all_services(self):
        """Register all available services."""
        
        @self.sdk.process_integrity.register_function
        async def basic_analysis(self, data: dict) -> dict:
            """Basic market analysis service."""
            return {
                "service": "basic_analysis",
                "trends": self._basic_trend_analysis(data),
                "price_target": self._calculate_price_target(data),
                "confidence": 0.75
            }
        
        @self.sdk.process_integrity.register_function
        async def advanced_analysis(self, data: dict) -> dict:
            """Advanced market analysis with ML."""
            return {
                "service": "advanced_analysis",
                "trends": self._advanced_trend_analysis(data),
                "ml_predictions": self._ml_predictions(data),
                "sentiment": self._sentiment_analysis(data),
                "confidence": 0.90
            }
        
        @self.sdk.process_integrity.register_function
        async def risk_assessment(self, portfolio: dict) -> dict:
            """Portfolio risk assessment service."""
            return {
                "service": "risk_assessment",
                "var_95": self._calculate_var(portfolio, 0.95),
                "sharpe_ratio": self._calculate_sharpe(portfolio),
                "risk_factors": self._identify_risk_factors(portfolio),
                "recommendations": self._risk_recommendations(portfolio)
            }
    
    def get_service_catalog(self) -> Dict[str, ServiceConfig]:
        """Get catalog of available services."""
        return self.services
    
    async def execute_service(self, service_name: str, data: dict, client_agent: str) -> dict:
        """Execute a requested service with payment."""
        
        if service_name not in self.services:
            raise ValueError(f"Unknown service: {service_name}")
        
        service_config = self.services[service_name]
        
        # Create payment request
        payment_request = self.sdk.create_x402_payment_request(
            cart_id=f"{service_name}_{int(time.time())}",
            total_amount=service_config.price,
            currency=service_config.currency,
            items=[{
                "name": service_config.name,
                "price": service_config.price
            }]
        )
        
        # Execute service with integrity proof
        result, proof = await self.sdk.execute_with_integrity_proof(
            service_name,
            data
        )
        
        # Process payment
        payment_result = self.sdk.execute_x402_crypto_payment(
            payment_request=payment_request,
            payer_agent=client_agent,
            service_description=service_config.description
        )
        
        # Create comprehensive response
        response = {
            "service_result": result,
            "integrity_proof": {
                "proof_id": proof.proof_id,
                "code_hash": proof.code_hash,
                "ipfs_cid": proof.ipfs_cid
            },
            "payment_proof": payment_result,
            "service_config": {
                "name": service_config.name,
                "price": service_config.price,
                "currency": service_config.currency
            }
        }
        
        # Store evidence
        evidence_cid = self.sdk.store_evidence(response, "service_execution")
        if evidence_cid:
            response["evidence_cid"] = evidence_cid
        
        return response

# Usage example
async def main():
    agent = MultiServiceAgent()
    
    # Register agent
    agent_id, tx_hash = agent.sdk.register_identity()
    print(f"✅ Multi-service agent registered: {agent_id}")
    
    # Show service catalog
    catalog = agent.get_service_catalog()
    print("\n📋 Available Services:")
    for service_id, config in catalog.items():
        print(f"  {service_id}: {config.name} - ${config.price} {config.currency}")
    
    # Execute a service (simulated)
    mock_data = {"prices": [100, 102, 98, 105]}
    result = await agent.execute_service("basic_analysis", mock_data, "ClientAgent")
    print(f"\n✅ Service executed: {result['service_result']}")

if __name__ == "__main__":
    asyncio.run(main())

Agent Coordination Pattern

Agents that work together to complete complex tasks.
agent_coordination.py
import asyncio
from typing import List, Dict
from chaoschain_sdk import ChaosChainAgentSDK

class CoordinatorAgent:
    """Coordinates multiple agents to complete complex tasks."""
    
    def __init__(self):
        self.sdk = ChaosChainAgentSDK(
            agent_name="CoordinatorAgent",
            agent_domain="coordinator.mycompany.com",
            agent_role="server",
            network="base-sepolia"
        )
        
        # Registry of available agents
        self.agent_registry = {
            "data_collector": "0x742d35Cc6634C0532925a3b8D4Cc6634C0532925",
            "analyzer": "0x8901234567890123456789012345678901234567",
            "validator": "0x9012345678901234567890123456789012345678",
            "reporter": "0x0123456789012345678901234567890123456789"
        }
    
    async def execute_complex_analysis(self, request: dict) -> dict:
        """Execute a complex multi-agent analysis workflow."""
        
        workflow_id = f"workflow_{int(time.time())}"
        results = {}
        
        print(f"🚀 Starting complex analysis workflow: {workflow_id}")
        
        # Step 1: Data Collection
        print("📊 Step 1: Collecting data...")
        data_request = {
            "sources": request.get("data_sources", ["market", "news"]),
            "timeframe": request.get("timeframe", "1d"),
            "symbols": request.get("symbols", ["BTC", "ETH"])
        }
        
        # In a real implementation, send request to data collector agent
        collected_data = await self._simulate_agent_call(
            "data_collector", 
            "collect_data", 
            data_request
        )
        results["data_collection"] = collected_data
        
        # Step 2: Analysis
        print("🔍 Step 2: Analyzing data...")
        analysis_request = {
            "data": collected_data,
            "analysis_type": request.get("analysis_type", "comprehensive")
        }
        
        analysis_result = await self._simulate_agent_call(
            "analyzer",
            "analyze_data",
            analysis_request
        )
        results["analysis"] = analysis_result
        
        # Step 3: Validation
        print("✅ Step 3: Validating results...")
        validation_request = {
            "analysis": analysis_result,
            "data": collected_data
        }
        
        validation_result = await self._simulate_agent_call(
            "validator",
            "validate_analysis", 
            validation_request
        )
        results["validation"] = validation_result
        
        # Step 4: Report Generation (if validation passes)
        if validation_result.get("score", 0) >= 80:
            print("📄 Step 4: Generating report...")
            report_request = {
                "analysis": analysis_result,
                "validation": validation_result,
                "format": request.get("report_format", "detailed")
            }
            
            report_result = await self._simulate_agent_call(
                "reporter",
                "generate_report",
                report_request
            )
            results["report"] = report_result
        else:
            print("⚠️  Validation failed, skipping report generation")
            results["report"] = {"status": "skipped", "reason": "validation_failed"}
        
        # Create final workflow result
        workflow_result = {
            "workflow_id": workflow_id,
            "status": "completed",
            "results": results,
            "agents_used": list(self.agent_registry.keys()),
            "total_steps": 4,
            "completion_time": datetime.now().isoformat()
        }
        
        # Store workflow evidence
        evidence_cid = self.sdk.store_evidence(
            workflow_result, 
            "complex_workflow"
        )
        
        if evidence_cid:
            workflow_result["evidence_cid"] = evidence_cid
        
        print(f"🎉 Workflow completed: {workflow_id}")
        return workflow_result
    
    async def _simulate_agent_call(self, agent_type: str, method: str, data: dict) -> dict:
        """Simulate calling another agent (in real implementation, use actual network calls)."""
        
        agent_address = self.agent_registry.get(agent_type)
        if not agent_address:
            raise ValueError(f"Unknown agent type: {agent_type}")
        
        print(f"  📞 Calling {agent_type} at {agent_address[:10]}...")
        
        # Simulate processing time
        await asyncio.sleep(1)
        
        # Return mock results based on agent type
        if agent_type == "data_collector":
            return {
                "data": {"prices": [100, 102, 98, 105], "volume": [1000, 1200, 800, 1500]},
                "sources": data.get("sources", []),
                "timestamp": datetime.now().isoformat()
            }
        elif agent_type == "analyzer":
            return {
                "trends": ["upward", "volatile"],
                "predictions": {"next_week": "bullish", "confidence": 0.85},
                "indicators": {"rsi": 65, "macd": "bullish"}
            }
        elif agent_type == "validator":
            return {
                "score": 88,
                "validation_passed": True,
                "checks": ["data_quality", "analysis_logic", "predictions"]
            }
        elif agent_type == "reporter":
            return {
                "report_id": f"report_{int(time.time())}",
                "format": "detailed",
                "sections": ["executive_summary", "analysis", "recommendations"],
                "page_count": 12
            }
        
        return {"status": "unknown_agent_type"}

# Usage example
async def main():
    coordinator = CoordinatorAgent()
    
    # Register coordinator
    agent_id, tx_hash = coordinator.sdk.register_identity()
    print(f"✅ Coordinator registered: {agent_id}")
    
    # Execute complex workflow
    request = {
        "data_sources": ["market", "news", "social"],
        "symbols": ["BTC", "ETH", "ADA"],
        "analysis_type": "comprehensive",
        "report_format": "detailed"
    }
    
    result = await coordinator.execute_complex_analysis(request)
    print(f"\n🎯 Final Result: {result['workflow_id']}")
    print(f"📊 Status: {result['status']}")
    print(f"📦 Evidence: {result.get('evidence_cid', 'Not stored')}")

if __name__ == "__main__":
    asyncio.run(main())

Best Practices

Error Handling and Resilience

error_handling.py
import asyncio
import logging
from chaoschain_sdk import ChaosChainAgentSDK
from chaoschain_sdk.exceptions import (
    AgentRegistrationError,
    PaymentError,
    IntegrityVerificationError
)

class ResilientAgent:
    def __init__(self):
        self.sdk = ChaosChainAgentSDK(
            agent_name="ResilientAgent",
            agent_domain="resilient.mycompany.com",
            agent_role="server",
            network="base-sepolia"
        )
        
        # Setup logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        
        # Retry configuration
        self.max_retries = 3
        self.retry_delay = 5  # seconds
    
    async def safe_register(self) -> bool:
        """Safely register agent with retries."""
        for attempt in range(self.max_retries):
            try:
                agent_id, tx_hash = self.sdk.register_identity()
                self.logger.info(f"✅ Registered successfully: {agent_id}")
                return True
                
            except AgentRegistrationError as e:
                if "insufficient funds" in str(e).lower():
                    self.logger.error("💰 Insufficient funds for registration")
                    self.logger.info("💡 Get testnet ETH from faucet:")
                    self.logger.info("   https://www.coinbase.com/faucets/base-ethereum-sepolia-faucet")
                    return False
                else:
                    self.logger.warning(f"⚠️  Registration attempt {attempt + 1} failed: {e}")
                    
            except Exception as e:
                self.logger.error(f"❌ Unexpected error: {e}")
            
            if attempt < self.max_retries - 1:
                self.logger.info(f"🔄 Retrying in {self.retry_delay} seconds...")
                await asyncio.sleep(self.retry_delay)
        
        self.logger.error("❌ Registration failed after all retries")
        return False
    
    async def safe_execute_with_payment(self, service_data: dict, client_agent: str) -> dict:
        """Safely execute service with payment handling."""
        try:
            # Execute service with integrity proof
            result, proof = await self.sdk.execute_with_integrity_proof(
                "analyze_data",
                service_data
            )
            
            # Create payment request
            payment_request = self.sdk.create_x402_payment_request(
                cart_id=f"service_{int(time.time())}",
                total_amount=5.0,
                currency="USDC",
                items=[{"name": "Data Analysis", "price": 5.0}]
            )
            
            # Process payment with error handling
            try:
                payment_result = self.sdk.execute_x402_crypto_payment(
                    payment_request=payment_request,
                    payer_agent=client_agent,
                    service_description="Data Analysis Service"
                )
                
                return {
                    "status": "success",
                    "result": result,
                    "proof": proof,
                    "payment": payment_result
                }
                
            except PaymentError as e:
                self.logger.error(f"💳 Payment failed: {e}")
                # Still return the result, but mark payment as failed
                return {
                    "status": "partial_success",
                    "result": result,
                    "proof": proof,
                    "payment_error": str(e)
                }
                
        except IntegrityVerificationError as e:
            self.logger.error(f"🔐 Integrity verification failed: {e}")
            return {
                "status": "integrity_error",
                "error": str(e)
            }
            
        except Exception as e:
            self.logger.error(f"❌ Service execution failed: {e}")
            return {
                "status": "error",
                "error": str(e)
            }
    
    async def health_check(self) -> dict:
        """Perform agent health check."""
        health = {
            "agent_name": self.sdk.agent_name,
            "wallet_address": self.sdk.wallet_address,
            "network": self.sdk.network.value,
            "registered": self.sdk.is_registered,
            "features": {},
            "timestamp": datetime.now().isoformat()
        }
        
        # Check feature availability
        try:
            status = self.sdk.get_sdk_status()
            health["features"] = status.get("features", {})
            health["payment_methods"] = len(status.get("payment_methods", []))
        except Exception as e:
            health["status_error"] = str(e)
        
        # Check network connectivity
        try:
            network_info = self.sdk.network_info
            health["network_connected"] = network_info.get("connected", False)
            health["chain_id"] = network_info.get("chain_id")
        except Exception as e:
            health["network_error"] = str(e)
        
        return health

# Usage example
async def main():
    agent = ResilientAgent()
    
    # Perform health check
    health = await agent.health_check()
    print(f"🏥 Health Check: {health}")
    
    # Safe registration
    registered = await agent.safe_register()
    if not registered:
        print("❌ Could not register agent")
        return
    
    # Safe service execution
    service_data = {"data": [1, 2, 3, 4, 5]}
    result = await agent.safe_execute_with_payment(service_data, "ClientAgent")
    print(f"🎯 Service Result: {result['status']}")

if __name__ == "__main__":
    asyncio.run(main())

Performance Optimization

performance_optimization.py
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from chaoschain_sdk import ChaosChainAgentSDK

class OptimizedAgent:
    def __init__(self):
        self.sdk = ChaosChainAgentSDK(
            agent_name="OptimizedAgent",
            agent_domain="optimized.mycompany.com",
            agent_role="server",
            network="base-sepolia"
        )
        
        # Thread pool for CPU-intensive tasks
        self.executor = ThreadPoolExecutor(max_workers=4)
        
        # Cache for frequently accessed data
        self.cache = {}
        self.cache_ttl = 300  # 5 minutes
    
    def _cpu_intensive_analysis(self, data: list) -> dict:
        """CPU-intensive analysis that runs in thread pool."""
        # Simulate heavy computation
        time.sleep(2)  # In real code, this would be actual computation
        
        return {
            "mean": sum(data) / len(data),
            "max": max(data),
            "min": min(data),
            "variance": sum((x - sum(data)/len(data))**2 for x in data) / len(data)
        }
    
    async def optimized_analysis(self, data: dict) -> dict:
        """Optimized analysis with caching and async processing."""
        
        # Create cache key
        cache_key = f"analysis_{hash(str(data))}"
        
        # Check cache first
        if cache_key in self.cache:
            cache_entry = self.cache[cache_key]
            if time.time() - cache_entry["timestamp"] < self.cache_ttl:
                print("📋 Returning cached result")
                return cache_entry["result"]
        
        # Run CPU-intensive work in thread pool
        loop = asyncio.get_event_loop()
        cpu_result = await loop.run_in_executor(
            self.executor,
            self._cpu_intensive_analysis,
            data.get("values", [])
        )
        
        # Combine with other analysis
        result = {
            "statistics": cpu_result,
            "trend": "upward" if cpu_result["mean"] > 50 else "downward",
            "volatility": cpu_result["variance"],
            "timestamp": time.time()
        }
        
        # Cache the result
        self.cache[cache_key] = {
            "result": result,
            "timestamp": time.time()
        }
        
        return result
    
    async def batch_process(self, requests: list) -> list:
        """Process multiple requests concurrently."""
        
        async def process_single_request(request):
            try:
                result = await self.optimized_analysis(request)
                return {"status": "success", "result": result}
            except Exception as e:
                return {"status": "error", "error": str(e)}
        
        # Process all requests concurrently
        tasks = [process_single_request(req) for req in requests]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        return results
    
    def cleanup_cache(self):
        """Clean up expired cache entries."""
        current_time = time.time()
        expired_keys = [
            key for key, entry in self.cache.items()
            if current_time - entry["timestamp"] > self.cache_ttl
        ]
        
        for key in expired_keys:
            del self.cache[key]
        
        print(f"🧹 Cleaned up {len(expired_keys)} expired cache entries")

# Usage example
async def main():
    agent = OptimizedAgent()
    
    # Register agent
    agent_id, tx_hash = agent.sdk.register_identity()
    print(f"✅ Optimized agent registered: {agent_id}")
    
    # Test single analysis
    start_time = time.time()
    result1 = await agent.optimized_analysis({"values": list(range(100))})
    print(f"⏱️  First analysis: {time.time() - start_time:.2f}s")
    
    # Test cached analysis (should be faster)
    start_time = time.time()
    result2 = await agent.optimized_analysis({"values": list(range(100))})
    print(f"⏱️  Cached analysis: {time.time() - start_time:.2f}s")
    
    # Test batch processing
    requests = [
        {"values": list(range(i, i+50))} 
        for i in range(0, 200, 50)
    ]
    
    start_time = time.time()
    batch_results = await agent.batch_process(requests)
    print(f"⏱️  Batch processing ({len(requests)} requests): {time.time() - start_time:.2f}s")
    print(f"✅ Successful: {sum(1 for r in batch_results if r['status'] == 'success')}")
    
    # Cleanup
    agent.cleanup_cache()

if __name__ == "__main__":
    asyncio.run(main())

Testing Strategies

Unit Testing

test_agent.py
import unittest
import asyncio
from unittest.mock import Mock, patch
from chaoschain_sdk import ChaosChainAgentSDK

class TestAgent(unittest.TestCase):
    def setUp(self):
        """Set up test fixtures."""
        self.sdk = ChaosChainAgentSDK(
            agent_name="TestAgent",
            agent_domain="test.example.com",
            agent_role="server",
            network="base-sepolia"
        )
    
    def test_agent_initialization(self):
        """Test agent initialization."""
        self.assertEqual(self.sdk.agent_name, "TestAgent")
        self.assertEqual(self.sdk.agent_domain, "test.example.com")
        self.assertEqual(self.sdk.agent_role.value, "server")
    
    @patch('chaoschain_sdk.ChaosChainAgentSDK.register_identity')
    def test_registration_success(self, mock_register):
        """Test successful agent registration."""
        mock_register.return_value = (123, "0xabc123")
        
        agent_id, tx_hash = self.sdk.register_identity()
        
        self.assertEqual(agent_id, 123)
        self.assertEqual(tx_hash, "0xabc123")
        mock_register.assert_called_once()
    
    def test_payment_methods(self):
        """Test payment methods availability."""
        methods = self.sdk.get_supported_payment_methods()
        
        self.assertIsInstance(methods, list)
        self.assertIn("https://a2a.org/x402", methods)
        self.assertIn("basic-card", methods)
    
    async def test_integrity_proof_execution(self):
        """Test process integrity execution."""
        
        @self.sdk.process_integrity.register_function
        async def test_function(data: dict) -> dict:
            return {"processed": True, "input_count": len(data)}
        
        result, proof = await self.sdk.execute_with_integrity_proof(
            "test_function",
            {"items": [1, 2, 3]}
        )
        
        self.assertEqual(result["processed"], True)
        self.assertEqual(result["input_count"], 3)
        self.assertIsNotNone(proof)
        self.assertIsNotNone(proof.proof_id)

class TestAgentIntegration(unittest.TestCase):
    """Integration tests with real network (requires testnet setup)."""
    
    def setUp(self):
        """Set up integration test fixtures."""
        self.sdk = ChaosChainAgentSDK(
            agent_name="IntegrationTestAgent",
            agent_domain="integration.test.com",
            agent_role="client",
            network="base-sepolia"
        )
    
    @unittest.skipUnless(
        os.getenv("RUN_INTEGRATION_TESTS") == "true",
        "Integration tests require RUN_INTEGRATION_TESTS=true"
    )
    def test_real_registration(self):
        """Test real registration (requires testnet ETH)."""
        try:
            agent_id, tx_hash = self.sdk.register_identity()
            self.assertIsInstance(agent_id, int)
            self.assertTrue(tx_hash.startswith("0x"))
        except Exception as e:
            self.skipTest(f"Registration requires testnet ETH: {e}")

# Run tests
if __name__ == "__main__":
    # Run unit tests
    unittest.main(verbosity=2)

Deployment Considerations

Environment Configuration

production_config.py
import os
from chaoschain_sdk import ChaosChainAgentSDK, NetworkConfig

class ProductionAgent:
    def __init__(self):
        # Load configuration from environment
        self.config = self._load_config()
        
        # Initialize SDK with production settings
        self.sdk = ChaosChainAgentSDK(
            agent_name=self.config["agent_name"],
            agent_domain=self.config["agent_domain"],
            agent_role=self.config["agent_role"],
            network=self.config["network"],
            enable_process_integrity=True,
            enable_payments=True,
            enable_storage=True,
            enable_ap2=self.config.get("enable_ap2", True)
        )
    
    def _load_config(self) -> dict:
        """Load configuration from environment variables."""
        required_vars = [
            "AGENT_NAME",
            "AGENT_DOMAIN", 
            "AGENT_ROLE",
            "NETWORK"
        ]
        
        config = {}
        missing_vars = []
        
        for var in required_vars:
            value = os.getenv(var)
            if not value:
                missing_vars.append(var)
            else:
                config[var.lower()] = value
        
        if missing_vars:
            raise ValueError(f"Missing required environment variables: {missing_vars}")
        
        # Optional configurations
        config["enable_ap2"] = os.getenv("ENABLE_AP2", "true").lower() == "true"
        
        return config
    
    def validate_environment(self) -> dict:
        """Validate production environment setup."""
        checks = {
            "config_loaded": bool(self.config),
            "network_rpc": bool(os.getenv(f"{self.config['network'].upper().replace('-', '_')}_RPC_URL")),
            "pinata_jwt": bool(os.getenv("PINATA_JWT")),
            "payment_credentials": self._check_payment_credentials(),
            "wallet_file_writable": self._check_wallet_permissions()
        }
        
        return checks
    
    def _check_payment_credentials(self) -> bool:
        """Check if payment processor credentials are configured."""
        payment_vars = [
            "STRIPE_SECRET_KEY",
            "GOOGLE_PAY_MERCHANT_ID", 
            "APPLE_PAY_MERCHANT_ID",
            "PAYPAL_CLIENT_ID"
        ]
        
        return any(os.getenv(var) for var in payment_vars)
    
    def _check_wallet_permissions(self) -> bool:
        """Check if wallet file is writable."""
        wallet_file = "chaoschain_wallets.json"
        try:
            # Try to create/write to wallet file
            with open(wallet_file, "a"):
                pass
            return True
        except PermissionError:
            return False

# Example production environment file (.env)
production_env = """
# Agent Configuration
AGENT_NAME=ProductionAnalysisAgent
AGENT_DOMAIN=analysis.mycompany.com
AGENT_ROLE=server
NETWORK=base-sepolia

# Network Configuration
BASE_SEPOLIA_RPC_URL=https://base-sepolia.g.alchemy.com/v2/YOUR_API_KEY

# Storage Configuration
PINATA_JWT=your_pinata_jwt_token
PINATA_GATEWAY=https://your-gateway.mypinata.cloud

# Payment Configuration
STRIPE_SECRET_KEY=sk_live_your_stripe_secret_key
GOOGLE_PAY_MERCHANT_ID=merchant.your-domain.com
PAYPAL_CLIENT_ID=your_paypal_client_id
PAYPAL_CLIENT_SECRET=your_paypal_client_secret

# Optional Features
ENABLE_AP2=true
"""

# Usage
if __name__ == "__main__":
    agent = ProductionAgent()
    
    # Validate environment
    checks = agent.validate_environment()
    print("🔍 Environment Validation:")
    for check, passed in checks.items():
        status = "✅" if passed else "❌"
        print(f"  {status} {check}")
    
    if all(checks.values()):
        print("🚀 Ready for production deployment!")
    else:
        print("⚠️  Environment needs configuration before deployment")

Next Steps

I