diff --git a/examples/cookbooks/Industry_Templates/README.md b/examples/cookbooks/Industry_Templates/README.md new file mode 100644 index 000000000..7180e08b8 --- /dev/null +++ b/examples/cookbooks/Industry_Templates/README.md @@ -0,0 +1,216 @@ +# Industry Templates for PraisonAI + +This directory contains industry-specific agent templates based on the SRAO Framework, providing pre-built workflows for rapid deployment of agent workforces across different industries. + +## Overview + +These templates demonstrate how PraisonAI can be used to create specialized agent teams for various industries, with **70% code reuse** across different sectors through shared patterns and abstractions. + +## Available Templates + +### 1. Manufacturing (`manufacturing_template.py`) +**Key Agents:** +- **ParseOrder**: Extracts and structures order information from various formats +- **CheckInventory**: Validates material availability and supply chain status +- **OptimizeSchedule**: Optimizes production scheduling based on constraints +- **DefectDetect**: Quality control and defect detection + +**Features:** +- Order processing with multiple input formats +- Real-time inventory management +- Production optimization algorithms +- Quality inspection with defect detection +- Fallback strategies for each critical step + +### 2. Energy (`energy_template.py`) +**Key Agents:** +- **SCADAReader**: Processes SCADA telemetry data from wind turbines +- **VibrationAnalyzer**: Analyzes vibration patterns for predictive maintenance +- **PowerForecaster**: Predicts power generation based on weather and history +- **MaintenanceScheduler**: Schedules predictive maintenance + +**Features:** +- Real-time wind farm monitoring +- Vibration-based fault detection +- Power generation forecasting +- Grid integration and load balancing +- Predictive maintenance scheduling + +### 3. Healthcare (`healthcare_template.py`) +**Key Agents:** +- **VitalSignsCapture**: Processes patient vital signs from monitoring devices +- **EMRRetrieval**: Retrieves electronic medical records with HIPAA compliance +- **TriageRecommendation**: Provides ESI-based triage recommendations +- **ResourceAllocator**: Manages hospital resource allocation + +**Features:** +- Emergency triage workflow +- HIPAA-compliant data handling +- Patient safety protocols +- Resource optimization +- Cross-department coordination + +### 4. Agriculture (`agriculture_template.py`) +**Key Agents:** +- **MultispectralAnalyzer**: Analyzes multispectral imagery for crop health +- **DiseaseIdentifier**: Identifies crop diseases and pests using AI +- **SprayRecommender**: Recommends targeted spraying strategies +- **YieldPredictor**: Predicts crop yield based on conditions + +**Features:** +- Precision agriculture with drone/satellite imagery +- Pest and disease early warning system +- Optimized chemical application +- Yield forecasting +- Sustainable farming patterns + +### 5. Transportation (`transportation_template.py`) +**Key Agents:** +- **LiDARFusion**: Processes LiDAR point cloud data for infrastructure +- **DisplacementCalculator**: Calculates structural displacement +- **HeatmapGenerator**: Generates safety visualization heatmaps +- **MaintenancePlanner**: Plans infrastructure maintenance + +**Features:** +- Tunnel/bridge safety monitoring +- LiDAR-based structural analysis +- Real-time safety assessment +- Traffic flow optimization +- Predictive maintenance scheduling + +## Key Features Across All Templates + +### 1. I/O Schemas +Each template includes Pydantic models for structured data handling: +- Input validation +- Type safety +- Clear data contracts between agents + +### 2. SLA Requirements +Every agent has defined Service Level Agreements: +- Processing time requirements +- Response time guarantees +- Performance benchmarks + +### 3. Fallback Strategies +Robust error handling with fallback mechanisms: +- Graceful degradation +- Manual intervention queues +- Conservative estimates when systems fail + +### 4. Cross-Industry Patterns (70% Reuse) +Reusable base patterns that work across industries: +- Data parsing agents +- Resource availability checkers +- Optimization engines +- Inspection/quality control agents + +## Usage Example + +```python +from manufacturing_template import ( + order_parser, + inventory_checker, + schedule_optimizer, + quality_inspector, + manufacturing_workflow +) + +# Process a manufacturing order +order_text = "Customer ABC needs 100 units of Product-XYZ by March 15th, high priority" +result = manufacturing_workflow(order_text) +print(result) +``` + +## Adapting Templates for Your Industry + +### Using Base Patterns +Each template includes reusable pattern classes that can be adapted: + +```python +from manufacturing_template import IndustryAgentPattern + +# Create a custom data parser for any domain +custom_parser = IndustryAgentPattern.create_data_parser( + name="CustomParser", + domain="retail", + sla_seconds=20 +) + +# Create a resource checker for any resource type +resource_checker = IndustryAgentPattern.create_resource_checker( + name="StaffAvailability", + resource_type="customer_service_staff", + sla_seconds=3 +) +``` + +### Creating Custom Workflows +Combine agents from different templates or create new ones: + +```python +from praisonaiagents import Agent, tool + +# Define custom tools +@tool +def custom_analysis(data: str) -> Dict: + """Your custom analysis logic""" + return {"result": "analyzed"} + +# Create custom agent +custom_agent = Agent( + name="CustomAnalyzer", + instructions="Your specific instructions", + tools=[custom_analysis] +) +``` + +## Integration with PraisonAI Core + +These templates are built on top of PraisonAI's core Agent framework: + +```python +from praisonaiagents import Agent, tool, AgentTeam, Task + +# Combine multiple industry agents into a team +team = AgentTeam( + agents=[order_parser, inventory_checker, schedule_optimizer], + tasks=[ + Task(name="parse", agent=order_parser), + Task(name="check", agent=inventory_checker), + Task(name="optimize", agent=schedule_optimizer) + ] +) +``` + +## Best Practices + +1. **Start with Templates**: Use these as starting points and customize for your specific needs +2. **Maintain SLAs**: Define clear performance requirements for each agent +3. **Implement Fallbacks**: Always have fallback strategies for critical operations +4. **Use Type Safety**: Leverage Pydantic models for data validation +5. **Monitor Performance**: Track agent performance against SLAs +6. **Iterate and Improve**: Continuously refine based on real-world usage + +## Contributing + +To add a new industry template: + +1. Follow the existing template structure +2. Include all key components: + - I/O Schemas (Pydantic models) + - Agent definitions with SLAs + - Tools for agent capabilities + - Complete workflow function + - Fallback strategies + - Cross-industry patterns +3. Add comprehensive documentation +4. Include usage examples + +## License + +These templates are part of the PraisonAI project and follow the same licensing terms. + +## References + +Based on the [SRAO Framework](https://github.com/beixuan577/SRAO-Framework) (MIT License) \ No newline at end of file diff --git a/examples/cookbooks/Industry_Templates/__init__.py b/examples/cookbooks/Industry_Templates/__init__.py new file mode 100644 index 000000000..b8bd27f7d --- /dev/null +++ b/examples/cookbooks/Industry_Templates/__init__.py @@ -0,0 +1,38 @@ +""" +Industry Templates for PraisonAI +================================ + +Pre-built agent templates for rapid deployment across different industries. +Based on SRAO Framework with 70% cross-industry code reuse. + +Available templates: +- Manufacturing: Order processing, inventory, scheduling, quality control +- Energy: Wind farm monitoring, predictive maintenance, power forecasting +- Healthcare: Emergency triage, EMR retrieval, resource allocation +- Agriculture: Precision farming, pest detection, yield prediction +- Transportation: Infrastructure monitoring, safety assessment, maintenance + +Example usage: + from Industry_Templates.manufacturing_template import manufacturing_workflow + + result = manufacturing_workflow("Customer order text") +""" + +__version__ = "1.0.0" +__author__ = "PraisonAI Team" + +# Import the modules so they're accessible via the package +from . import manufacturing_template +from . import energy_template +from . import healthcare_template +from . import agriculture_template +from . import transportation_template + +# Export commonly used items for convenience +__all__ = [ + "manufacturing_template", + "energy_template", + "healthcare_template", + "agriculture_template", + "transportation_template" +] \ No newline at end of file diff --git a/examples/cookbooks/Industry_Templates/agriculture_template.py b/examples/cookbooks/Industry_Templates/agriculture_template.py new file mode 100644 index 000000000..eeea85a8c --- /dev/null +++ b/examples/cookbooks/Industry_Templates/agriculture_template.py @@ -0,0 +1,551 @@ +""" +Agriculture Industry Template +============================= +Precision agriculture and pest management workflow +Based on SRAO Framework with IoT sensor integration + +Key agents: +- MultispectralAnalyzer: Analyzes multispectral imagery +- DiseaseIdentifier: Identifies crop diseases and pests +- SprayRecommender: Recommends targeted spraying strategies +- YieldPredictor: Predicts crop yield based on conditions +""" + +from praisonaiagents import Agent, tool +from typing import Dict, List, Any, Tuple, Optional +from pydantic import BaseModel +from datetime import datetime, timedelta +from enum import Enum + + +# Crop health levels +class CropHealthLevel(str, Enum): + EXCELLENT = "excellent" + GOOD = "good" + FAIR = "fair" + POOR = "poor" + CRITICAL = "critical" + + +# Pest/Disease severity +class SeverityLevel(str, Enum): + NONE = "none" + LOW = "low" + MODERATE = "moderate" + HIGH = "high" + SEVERE = "severe" + + +# I/O Schemas +class MultispectralData(BaseModel): + """Multispectral imagery analysis data""" + field_id: str + capture_time: str + ndvi_index: float # Normalized Difference Vegetation Index + ndre_index: float # Normalized Difference Red Edge + ccci_index: float # Canopy Chlorophyll Content Index + moisture_level: float # Soil moisture percentage + temperature: float # Celsius + affected_area: float # Hectares + gps_coordinates: Tuple[float, float] + + +class PestDiseaseReport(BaseModel): + """Pest and disease identification report""" + report_id: str + field_id: str + detection_time: str + pest_type: Optional[str] + disease_type: Optional[str] + severity: SeverityLevel + affected_crops: List[str] + spread_rate: float # % per day + environmental_factors: Dict[str, Any] + confidence_score: float + + +class SprayRecommendation(BaseModel): + """Targeted spraying recommendation""" + recommendation_id: str + field_id: str + treatment_type: str # pesticide, fungicide, herbicide, fertilizer + product_name: str + application_rate: float # L/hectare + target_zones: List[Dict[str, Any]] # GPS zones + optimal_time: str + weather_window: Dict[str, Any] + cost_estimate: float + environmental_impact: str # low, medium, high + + +class YieldForecast(BaseModel): + """Crop yield prediction""" + forecast_id: str + field_id: str + crop_type: str + predicted_yield: float # tons/hectare + confidence_interval: Tuple[float, float] + harvest_window: Tuple[str, str] + quality_grade: str # A, B, C + market_price_estimate: float + risk_factors: List[str] + + +# Agriculture-specific tools +@tool +def analyze_multispectral_imagery(field_id: str, image_data: Dict) -> Dict: + """Analyze multispectral drone/satellite imagery for crop health""" + # Simulate multispectral analysis + ndvi = 0.75 # Healthy vegetation typically 0.6-0.9 + + # Determine health based on NDVI + if ndvi > 0.8: + health = CropHealthLevel.EXCELLENT + affected_area = 0.0 + elif ndvi > 0.6: + health = CropHealthLevel.GOOD + affected_area = 2.5 + elif ndvi > 0.4: + health = CropHealthLevel.FAIR + affected_area = 5.0 + else: + health = CropHealthLevel.POOR + affected_area = 10.0 + + return { + "field_id": field_id, + "capture_time": datetime.now().isoformat(), + "ndvi_index": ndvi, + "ndre_index": 0.68, + "ccci_index": 0.72, + "moisture_level": 35.0, + "temperature": 24.5, + "affected_area": affected_area, + "gps_coordinates": (40.7128, -74.0060), + "health_level": health.value + } + + +@tool +def identify_pest_disease(spectral_data: Dict, historical_data: List) -> Dict: + """Identify pests and diseases using AI image recognition""" + # Simulate pest/disease detection + ndvi = spectral_data.get("ndvi_index", 0.75) + affected_area = spectral_data.get("affected_area", 0) + + if affected_area > 8.0: + pest_type = "aphids" + disease_type = "powdery_mildew" + severity = SeverityLevel.HIGH + spread_rate = 2.5 + elif affected_area > 4.0: + pest_type = "whiteflies" + disease_type = "leaf_spot" + severity = SeverityLevel.MODERATE + spread_rate = 1.5 + elif affected_area > 0: + pest_type = None + disease_type = "early_blight" + severity = SeverityLevel.LOW + spread_rate = 0.5 + else: + pest_type = None + disease_type = None + severity = SeverityLevel.NONE + spread_rate = 0.0 + + return { + "report_id": f"PDR-{datetime.now().strftime('%Y%m%d%H%M')}", + "field_id": spectral_data["field_id"], + "detection_time": datetime.now().isoformat(), + "pest_type": pest_type, + "disease_type": disease_type, + "severity": severity.value, + "affected_crops": ["wheat", "corn"], + "spread_rate": spread_rate, + "environmental_factors": { + "humidity": 65, + "temperature": spectral_data.get("temperature", 25), + "rainfall_last_week": 15 # mm + }, + "confidence_score": 0.85 + } + + +@tool +def calculate_spray_recommendation(pest_report: Dict, field_data: Dict, weather: Dict) -> Dict: + """Calculate optimal spraying strategy for pest/disease control""" + severity = pest_report.get("severity", "none") + + if severity == "high" or severity == "severe": + treatment = "pesticide" + product = "Imidacloprid" + rate = 0.5 + impact = "medium" + elif severity == "moderate": + treatment = "fungicide" + product = "Azoxystrobin" + rate = 0.3 + impact = "low" + elif severity == "low": + treatment = "organic_treatment" + product = "Neem Oil" + rate = 0.2 + impact = "minimal" + else: + treatment = "fertilizer" + product = "NPK 20-20-20" + rate = 0.4 + impact = "positive" + + # Calculate target zones based on affected area + affected_area = field_data.get("affected_area", 0) + if affected_area > 0: + target_zones = [ + { + "zone_id": "A1", + "gps_boundary": [(40.71, -74.00), (40.72, -74.01)], + "priority": "high" + } + ] + else: + target_zones = [] + + return { + "recommendation_id": f"SPR-{datetime.now().strftime('%Y%m%d%H%M')}", + "field_id": field_data["field_id"], + "treatment_type": treatment, + "product_name": product, + "application_rate": rate, + "affected_area": affected_area, + "target_zones": target_zones, + "optimal_time": (datetime.now() + timedelta(days=1)).isoformat(), + "weather_window": { + "wind_speed": weather.get("wind_speed", 5), + "precipitation": weather.get("precipitation", 0), + "suitable": True + }, + "cost_estimate": rate * affected_area * 50, # $/hectare + "environmental_impact": impact + } + + +@tool +def predict_crop_yield(field_id: str, crop_type: str, current_health: Dict, weather_forecast: Dict) -> Dict: + """Predict crop yield based on current conditions and forecasts""" + # Simulate yield prediction + health_level = current_health.get("health_level", "good") + + base_yield = { + "wheat": 3.5, + "corn": 9.5, + "soybeans": 2.8, + "rice": 6.0 + }.get(crop_type, 5.0) + + # Adjust based on health + health_multiplier = { + "excellent": 1.2, + "good": 1.0, + "fair": 0.8, + "poor": 0.6, + "critical": 0.4 + }.get(health_level, 1.0) + + predicted_yield = base_yield * health_multiplier + + # Quality grade based on conditions + if health_level in ["excellent", "good"]: + quality = "A" + price_multiplier = 1.2 + elif health_level == "fair": + quality = "B" + price_multiplier = 1.0 + else: + quality = "C" + price_multiplier = 0.8 + + return { + "forecast_id": f"YLD-{datetime.now().strftime('%Y%m%d%H%M')}", + "field_id": field_id, + "crop_type": crop_type, + "predicted_yield": predicted_yield, + "confidence_interval": (predicted_yield * 0.9, predicted_yield * 1.1), + "harvest_window": ( + (datetime.now() + timedelta(days=60)).isoformat(), + (datetime.now() + timedelta(days=75)).isoformat() + ), + "quality_grade": quality, + "market_price_estimate": predicted_yield * 250 * price_multiplier, # $/ton + "risk_factors": ["weather_variability", "pest_pressure", "market_volatility"] + } + + +# Agriculture agent definitions +multispectral_agent = Agent( + name="MultispectralAnalyzer", + instructions="""You are a remote sensing specialist for precision agriculture. + Analyze multispectral imagery from drones and satellites. + Calculate vegetation indices (NDVI, NDRE, CCCI) to assess crop health. + Identify stress zones and anomalies in field conditions. + SLA: Process imagery within 2 minutes per field.""", + tools=[analyze_multispectral_imagery] +) + +disease_agent = Agent( + name="DiseaseIdentifier", + instructions="""You are a plant pathology expert with AI vision capabilities. + Identify crop diseases, pests, and nutrient deficiencies. + Assess severity levels and predict spread patterns. + Consider environmental factors affecting disease development. + SLA: Complete identification within 30 seconds.""", + tools=[identify_pest_disease] +) + +spray_agent = Agent( + name="SprayRecommender", + instructions="""You are a precision application specialist. + Recommend targeted spraying strategies for pest and disease control. + Optimize chemical usage to minimize environmental impact. + Consider weather windows and application regulations. + SLA: Generate recommendations within 1 minute.""", + tools=[calculate_spray_recommendation] +) + +yield_agent = Agent( + name="YieldPredictor", + instructions="""You are a crop yield forecasting specialist. + Predict harvest yields based on current conditions and historical data. + Assess crop quality grades and market value estimates. + Identify risk factors that could impact final yield. + SLA: Generate forecast within 45 seconds.""", + tools=[predict_crop_yield] +) + + +# Precision agriculture workflow +def precision_agriculture_workflow(field_ids: List[str], crop_type: str, weather_data: Dict): + """ + Complete precision agriculture workflow from monitoring to treatment + Includes early warning system and sustainable farming practices + """ + + workflow_results = { + "timestamp": datetime.now().isoformat(), + "fields_monitored": len(field_ids), + "alerts": [], + "treatments_needed": [], + "yield_forecasts": [], + "sustainability_score": 0.0 + } + + for field_id in field_ids: + try: + # Step 1: Multispectral analysis + spectral_analysis = multispectral_agent.start( + f"Analyze multispectral imagery for field {field_id}" + ) + + # Step 2: Disease and pest detection + pest_report = disease_agent.start( + f"Identify pests/diseases from spectral data: {spectral_analysis}" + ) + + # Check for alerts + if pest_report.get("severity") in ["high", "severe"]: + workflow_results["alerts"].append({ + "field_id": field_id, + "type": "pest_disease_alert", + "severity": pest_report["severity"], + "pest": pest_report.get("pest_type"), + "disease": pest_report.get("disease_type"), + "action": "immediate_treatment_required" + }) + + # Step 3: Generate spray recommendation + spray_recommendation = spray_agent.start( + f"Calculate spray strategy for pest report: {pest_report}, " + f"field: {spectral_analysis}, weather: {weather_data}" + ) + workflow_results["treatments_needed"].append(spray_recommendation) + + # Step 4: Yield prediction + yield_forecast = yield_agent.start( + f"Predict yield for field {field_id}, crop {crop_type}, " + f"health: {spectral_analysis}, weather: {weather_data}" + ) + workflow_results["yield_forecasts"].append(yield_forecast) + + except Exception as e: + # Fallback: Schedule manual inspection + workflow_results["alerts"].append({ + "field_id": field_id, + "type": "monitoring_failure", + "error": str(e), + "action": "schedule_manual_inspection" + }) + + # Calculate sustainability score + total_chemical = sum( + t.get("application_rate", 0) * t.get("affected_area", 0) + for t in workflow_results["treatments_needed"] + ) + workflow_results["sustainability_score"] = max(0, 100 - (total_chemical * 10)) + + return workflow_results + + +# Sustainable farming patterns +class SustainableFarmingPatterns: + """ + Patterns for sustainable and regenerative agriculture + """ + + @staticmethod + def integrated_pest_management(pest_level: str) -> Dict: + """IPM strategy based on pest pressure""" + strategies = { + "low": { + "method": "biological_control", + "actions": ["release_beneficial_insects", "plant_trap_crops"], + "chemical_use": 0 + }, + "moderate": { + "method": "targeted_intervention", + "actions": ["spot_treatment", "pheromone_traps", "minimal_spray"], + "chemical_use": 30 + }, + "high": { + "method": "integrated_approach", + "actions": ["systematic_spray", "crop_rotation", "resistant_varieties"], + "chemical_use": 70 + } + } + return strategies.get(pest_level, strategies["moderate"]) + + @staticmethod + def precision_irrigation(moisture_level: float, crop_stage: str) -> Dict: + """Calculate optimal irrigation based on soil moisture and crop stage""" + # Water requirements by growth stage (mm/day) + stage_requirements = { + "germination": 3.0, + "vegetative": 5.0, + "flowering": 7.0, + "grain_filling": 6.0, + "maturation": 2.0 + } + + required = stage_requirements.get(crop_stage, 5.0) + deficit = max(0, 40 - moisture_level) # Target 40% moisture + + return { + "irrigation_needed": deficit > 10, + "amount_mm": deficit * 2, + "method": "drip" if deficit < 20 else "sprinkler", + "schedule": "night_irrigation" if deficit > 15 else "morning_irrigation", + "water_saved": (100 - deficit) / 100 * 1000 # Liters per hectare + } + + @staticmethod + def crop_rotation_optimizer(current_crop: str, soil_data: Dict) -> str: + """Recommend next crop for rotation based on soil health""" + rotations = { + "wheat": ["soybeans", "canola"], # Nitrogen fixation + "corn": ["soybeans", "alfalfa"], # Break disease cycle + "soybeans": ["wheat", "corn"], # Utilize fixed nitrogen + "rice": ["legumes", "vegetables"], # Soil structure improvement + } + + return rotations.get(current_crop, ["cover_crops"])[0] + + +# IoT sensor integration patterns +class IoTSensorPatterns: + """ + Patterns for integrating IoT sensors in smart farming + """ + + @staticmethod + def create_sensor_network_agent(sensor_types: List[str]) -> Agent: + """Create agent for managing IoT sensor networks""" + return Agent( + name="SensorNetworkManager", + instructions=f"""Manage IoT sensor network with {', '.join(sensor_types)}. + Collect real-time data from field sensors. + Detect sensor anomalies and maintenance needs. + Aggregate data for precision agriculture decisions. + SLA: Data collection every 15 minutes.""" + ) + + @staticmethod + def process_sensor_data(sensor_readings: List[Dict]) -> Dict: + """Process and aggregate IoT sensor data""" + aggregated = { + "timestamp": datetime.now().isoformat(), + "sensor_count": len(sensor_readings), + "averages": {}, + "alerts": [] + } + + # Calculate averages + for reading in sensor_readings: + for key, value in reading.items(): + if isinstance(value, (int, float)): + if key not in aggregated["averages"]: + aggregated["averages"][key] = [] + aggregated["averages"][key].append(value) + + # Convert to means + for key in aggregated["averages"]: + values = aggregated["averages"][key] + aggregated["averages"][key] = sum(values) / len(values) + + # Check for anomalies + if key == "soil_ph" and (aggregated["averages"][key] < 6.0 or aggregated["averages"][key] > 7.5): + aggregated["alerts"].append({ + "type": "soil_ph_anomaly", + "value": aggregated["averages"][key], + "action": "adjust_soil_treatment" + }) + + return aggregated + + +# Example usage +if __name__ == "__main__": + # Monitor multiple fields + fields = ["FIELD-001", "FIELD-002", "FIELD-003"] + weather = { + "temperature": 25, + "humidity": 60, + "wind_speed": 8, + "precipitation": 0, + "forecast": "partly_cloudy" + } + + result = precision_agriculture_workflow(fields, "wheat", weather) + print("Agriculture workflow result:", result) + + # Sustainable farming example + ipm_strategy = SustainableFarmingPatterns.integrated_pest_management("moderate") + print("IPM Strategy:", ipm_strategy) + + irrigation_plan = SustainableFarmingPatterns.precision_irrigation(25.0, "flowering") + print("Irrigation Plan:", irrigation_plan) + + next_crop = SustainableFarmingPatterns.crop_rotation_optimizer("wheat", {}) + print("Recommended next crop:", next_crop) + + # IoT integration example + sensor_agent = IoTSensorPatterns.create_sensor_network_agent( + ["soil_moisture", "temperature", "humidity", "light", "ph"] + ) + + sensor_data = [ + {"sensor_id": "S001", "soil_moisture": 35, "temperature": 24, "soil_ph": 6.8}, + {"sensor_id": "S002", "soil_moisture": 32, "temperature": 25, "soil_ph": 7.2}, + {"sensor_id": "S003", "soil_moisture": 38, "temperature": 23, "soil_ph": 5.8} + ] + + processed_data = IoTSensorPatterns.process_sensor_data(sensor_data) + print("Processed sensor data:", processed_data) \ No newline at end of file diff --git a/examples/cookbooks/Industry_Templates/energy_template.py b/examples/cookbooks/Industry_Templates/energy_template.py new file mode 100644 index 000000000..cc3c8e0fd --- /dev/null +++ b/examples/cookbooks/Industry_Templates/energy_template.py @@ -0,0 +1,372 @@ +""" +Energy Industry Template +======================== +Wind farm monitoring and power optimization workflow +Based on SRAO Framework patterns with predictive maintenance + +Key agents: +- SCADAReader: Processes SCADA telemetry data +- VibrationAnalyzer: Analyzes turbine vibration patterns +- PowerForecaster: Predicts power generation +- MaintenanceScheduler: Schedules predictive maintenance +""" + +from praisonaiagents import Agent, tool +from typing import Dict, List, Any, Optional +from pydantic import BaseModel +from datetime import datetime, timedelta + + +# I/O Schemas +class TurbineData(BaseModel): + """Wind turbine telemetry data""" + turbine_id: str + timestamp: str + wind_speed: float # m/s + power_output: float # MW + rotor_speed: float # RPM + blade_pitch: float # degrees + nacelle_temperature: float # Celsius + vibration_level: float # mm/s + operational_status: str # running, stopped, maintenance + + +class VibrationAnalysis(BaseModel): + """Vibration analysis results""" + turbine_id: str + analysis_time: str + vibration_rms: float + frequency_peaks: List[float] + anomaly_detected: bool + failure_probability: float + recommended_action: str + + +class PowerForecast(BaseModel): + """Power generation forecast""" + forecast_id: str + start_time: str + end_time: str + predicted_output: List[float] # MW per hour + confidence_interval: float + weather_factors: Dict[str, Any] + grid_demand: float + + +class MaintenancePlan(BaseModel): + """Maintenance scheduling plan""" + plan_id: str + turbine_id: str + maintenance_type: str # preventive, predictive, corrective + scheduled_date: str + estimated_duration: int # hours + required_parts: List[str] + technician_team: str + production_loss: float # MW + + +# Tools for energy management +@tool +def read_scada_telemetry(turbine_id: str, time_range: str = "last_hour") -> Dict: + """Read SCADA system telemetry for wind turbines""" + # Simulate SCADA data reading + return { + "turbine_id": turbine_id, + "timestamp": datetime.now().isoformat(), + "wind_speed": 12.5, + "power_output": 2.3, + "rotor_speed": 16.8, + "blade_pitch": 8.5, + "nacelle_temperature": 45.2, + "vibration_level": 3.2, + "operational_status": "running" + } + + +@tool +def analyze_vibration_patterns(turbine_data: Dict) -> Dict: + """Analyze vibration patterns for early fault detection""" + # Simulate vibration analysis + vibration_level = turbine_data.get("vibration_level", 0) + anomaly = vibration_level > 5.0 # Threshold for anomaly + + return { + "turbine_id": turbine_data["turbine_id"], + "analysis_time": datetime.now().isoformat(), + "vibration_rms": vibration_level, + "frequency_peaks": [10.5, 25.3, 50.0], # Hz + "anomaly_detected": anomaly, + "failure_probability": min(vibration_level / 10.0, 1.0), + "recommended_action": "immediate_inspection" if anomaly else "continue_monitoring" + } + + +@tool +def forecast_power_generation(weather_data: Dict, historical_data: List) -> Dict: + """Forecast power generation based on weather and historical patterns""" + # Simulate power forecasting + base_forecast = [2.0 + (i * 0.1) for i in range(24)] # 24-hour forecast + + return { + "forecast_id": f"FCST-{datetime.now().strftime('%Y%m%d%H%M')}", + "start_time": datetime.now().isoformat(), + "end_time": (datetime.now() + timedelta(hours=24)).isoformat(), + "predicted_output": base_forecast, + "confidence_interval": 0.85, + "weather_factors": weather_data, + "grid_demand": sum(base_forecast) + } + + +@tool +def schedule_maintenance(turbine_id: str, maintenance_type: str, urgency: str = "normal") -> Dict: + """Schedule maintenance based on condition monitoring""" + # Calculate maintenance window + if urgency == "urgent": + scheduled_date = (datetime.now() + timedelta(days=1)).isoformat() + else: + scheduled_date = (datetime.now() + timedelta(days=7)).isoformat() + + return { + "plan_id": f"MNT-{datetime.now().strftime('%Y%m%d%H%M')}", + "turbine_id": turbine_id, + "maintenance_type": maintenance_type, + "scheduled_date": scheduled_date, + "estimated_duration": 4, + "required_parts": ["bearing_set", "lubricant", "filter"], + "technician_team": "Team-A", + "production_loss": 8.0 # MW lost during maintenance + } + + +# Agent Definitions with SLA requirements +scada_reader = Agent( + name="SCADAReader", + instructions="""You are a SCADA system specialist for wind farms. + Process real-time telemetry data from wind turbines. + Identify operational anomalies and performance deviations. + SLA: Process telemetry within 1 second for real-time monitoring.""", + tools=[read_scada_telemetry] +) + +vibration_analyzer = Agent( + name="VibrationAnalyzer", + instructions="""You are a vibration analysis expert for rotating machinery. + Analyze vibration patterns to detect early signs of mechanical failure. + Use FFT analysis and pattern recognition for fault diagnosis. + SLA: Complete analysis within 5 seconds per turbine.""", + tools=[analyze_vibration_patterns] +) + +power_forecaster = Agent( + name="PowerForecaster", + instructions="""You are a power generation forecasting specialist. + Predict power output based on weather forecasts and historical patterns. + Consider grid demand and optimize generation schedules. + SLA: Generate 24-hour forecast within 30 seconds.""", + tools=[forecast_power_generation] +) + +maintenance_scheduler = Agent( + name="MaintenanceScheduler", + instructions="""You are a predictive maintenance planning expert. + Schedule maintenance based on condition monitoring and failure predictions. + Minimize production loss while ensuring turbine reliability. + SLA: Create maintenance plan within 1 minute.""", + tools=[schedule_maintenance] +) + + +# Energy workflow with grid integration +def energy_monitoring_workflow(turbine_ids: List[str], weather_forecast: Dict): + """ + Complete energy monitoring and optimization workflow + Includes predictive maintenance and grid integration + """ + + results = { + "timestamp": datetime.now().isoformat(), + "turbines_monitored": len(turbine_ids), + "alerts": [], + "maintenance_required": [], + "power_forecast": None + } + + # Step 1: Monitor all turbines + for turbine_id in turbine_ids: + try: + # Read SCADA data + scada_data = scada_reader.start( + f"Read telemetry for turbine {turbine_id}" + ) + + # Analyze vibrations + vibration_result = vibration_analyzer.start( + f"Analyze vibrations for data: {scada_data}" + ) + + # Check for anomalies + if vibration_result.get("anomaly_detected"): + results["alerts"].append({ + "turbine_id": turbine_id, + "type": "vibration_anomaly", + "severity": "high" if vibration_result.get("failure_probability", 0) > 0.7 else "medium" + }) + + # Schedule maintenance if needed + maintenance_plan = maintenance_scheduler.start( + f"Schedule maintenance for turbine {turbine_id} with urgency based on {vibration_result}" + ) + results["maintenance_required"].append(maintenance_plan) + + except Exception as e: + # Fallback: Mark turbine for manual inspection + results["alerts"].append({ + "turbine_id": turbine_id, + "type": "monitoring_failure", + "error": str(e), + "action": "manual_inspection_required" + }) + + # Step 2: Power generation forecast + try: + historical_data = [] # Would be fetched from database + power_forecast = power_forecaster.start( + f"Forecast power with weather {weather_forecast} and history {historical_data}" + ) + results["power_forecast"] = power_forecast + + # Grid integration check + if power_forecast.get("predicted_output"): + total_predicted = sum(power_forecast["predicted_output"]) + grid_demand = power_forecast.get("grid_demand", 0) + + if total_predicted < grid_demand * 0.9: + results["alerts"].append({ + "type": "supply_shortage", + "predicted": total_predicted, + "demand": grid_demand, + "action": "consider_backup_sources" + }) + + except Exception as e: + # Fallback: Use conservative forecast + results["power_forecast"] = { + "status": "fallback_mode", + "conservative_estimate": len(turbine_ids) * 1.5, # MW + "error": str(e) + } + + return results + + +# Cross-industry pattern adaptation +class EnergyPatternAdapter: + """ + Adapts manufacturing patterns for energy sector + Demonstrates 70% code reuse across industries + """ + + @staticmethod + def adapt_for_solar_farm(): + """Adapt wind farm agents for solar farm monitoring""" + return Agent( + name="SolarMonitor", + instructions="""Monitor solar panel efficiency and temperature. + Track irradiance levels and predict power output. + Detect panel degradation and soiling issues. + SLA: Real-time monitoring with 2-second updates.""" + ) + + @staticmethod + def adapt_for_power_grid(): + """Adapt for power grid management""" + return Agent( + name="GridBalancer", + instructions="""Balance power supply and demand across the grid. + Manage load distribution and prevent blackouts. + Coordinate with renewable and traditional sources. + SLA: Sub-second response for grid stability.""" + ) + + @staticmethod + def adapt_for_battery_storage(): + """Adapt for battery energy storage systems""" + return Agent( + name="BatteryOptimizer", + instructions="""Optimize battery charge/discharge cycles. + Predict battery degradation and schedule replacements. + Maximize energy arbitrage opportunities. + SLA: Optimization decision within 10 seconds.""" + ) + + +# Fallback strategies for critical operations +class EnergyFallbackStrategies: + """ + Fallback strategies for maintaining grid stability + """ + + @staticmethod + def turbine_failure_fallback(failed_turbine_id: str, available_turbines: List[str]): + """Handle turbine failure by redistributing load""" + return { + "strategy": "load_redistribution", + "failed_turbine": failed_turbine_id, + "compensating_turbines": available_turbines[:3], # Use top 3 turbines + "increase_output_by": 0.3, # MW per turbine + "duration": "until_maintenance_complete" + } + + @staticmethod + def forecast_failure_fallback(last_known_forecast: Optional[Dict]): + """Handle forecasting system failure""" + if last_known_forecast: + return { + "strategy": "use_last_known", + "forecast": last_known_forecast, + "confidence": 0.6, + "validity": "4_hours" + } + else: + return { + "strategy": "seasonal_average", + "estimated_output": 50.0, # MW based on seasonal average + "confidence": 0.4, + "recommendation": "increase_reserves" + } + + @staticmethod + def communication_failure_fallback(): + """Handle SCADA communication failure""" + return { + "strategy": "local_control_mode", + "actions": [ + "Switch turbines to autonomous mode", + "Use local safety thresholds", + "Log data locally for later sync", + "Alert maintenance teams" + ], + "recovery_check_interval": 300 # seconds + } + + +# Example usage +if __name__ == "__main__": + # Monitor wind farm + turbine_list = [f"WT-{i:03d}" for i in range(1, 11)] # 10 turbines + weather = {"wind_speed": 14.0, "direction": "NW", "temperature": 15.0} + + result = energy_monitoring_workflow(turbine_list, weather) + print("Energy monitoring result:", result) + + # Demonstrate cross-industry adaptation + solar_monitor = EnergyPatternAdapter.adapt_for_solar_farm() + grid_balancer = EnergyPatternAdapter.adapt_for_power_grid() + battery_optimizer = EnergyPatternAdapter.adapt_for_battery_storage() + + # Fallback examples + turbine_fallback = EnergyFallbackStrategies.turbine_failure_fallback( + "WT-003", ["WT-001", "WT-002", "WT-004", "WT-005"] + ) + print("Turbine failure fallback:", turbine_fallback) \ No newline at end of file diff --git a/examples/cookbooks/Industry_Templates/healthcare_template.py b/examples/cookbooks/Industry_Templates/healthcare_template.py new file mode 100644 index 000000000..1f292f714 --- /dev/null +++ b/examples/cookbooks/Industry_Templates/healthcare_template.py @@ -0,0 +1,478 @@ +""" +Healthcare Industry Template +============================ +Emergency triage and patient care coordination workflow +Based on SRAO Framework with HIPAA-compliant patterns + +Key agents: +- VitalSignsCapture: Processes patient vital signs +- EMRRetrieval: Retrieves electronic medical records +- TriageRecommendation: Provides triage recommendations +- ResourceAllocator: Manages hospital resource allocation +""" + +from praisonaiagents import Agent, tool +from typing import Dict, List, Any, Optional +from pydantic import BaseModel +from datetime import datetime +from enum import Enum + + +# Triage levels following Emergency Severity Index (ESI) +class TriageLevel(str, Enum): + RESUSCITATION = "1_resuscitation" # Immediate life-saving intervention + EMERGENT = "2_emergent" # High risk, severe pain/distress + URGENT = "3_urgent" # Stable but needs multiple resources + LESS_URGENT = "4_less_urgent" # Stable, one resource needed + NON_URGENT = "5_non_urgent" # Stable, no resources needed + + +# I/O Schemas +class VitalSigns(BaseModel): + """Patient vital signs data""" + patient_id: str + timestamp: str + heart_rate: int # bpm + blood_pressure_systolic: int # mmHg + blood_pressure_diastolic: int # mmHg + respiratory_rate: int # breaths/min + temperature: float # Celsius + oxygen_saturation: int # percentage + pain_scale: int # 0-10 + consciousness_level: str # alert, verbal, pain, unresponsive (AVPU) + + +class MedicalHistory(BaseModel): + """Patient medical history from EMR""" + patient_id: str + allergies: List[str] + chronic_conditions: List[str] + current_medications: List[Dict[str, str]] + recent_visits: List[Dict[str, Any]] + emergency_contacts: List[Dict[str, str]] + insurance_status: str + + +class TriageAssessment(BaseModel): + """Triage assessment and recommendations""" + assessment_id: str + patient_id: str + triage_level: TriageLevel + chief_complaint: str + recommended_department: str + estimated_wait_time: int # minutes + required_resources: List[str] + clinical_notes: str + red_flags: List[str] + + +class ResourceAllocation(BaseModel): + """Hospital resource allocation""" + allocation_id: str + patient_id: str + assigned_bed: Optional[str] + assigned_staff: List[str] + equipment_needed: List[str] + department: str + priority_score: float + estimated_treatment_time: int # minutes + + +# Healthcare-specific tools +@tool +def capture_vital_signs(patient_id: str) -> Dict: + """Capture patient vital signs from monitoring devices""" + # Simulate vital signs capture + return { + "patient_id": patient_id, + "timestamp": datetime.now().isoformat(), + "heart_rate": 78, + "blood_pressure_systolic": 130, + "blood_pressure_diastolic": 85, + "respiratory_rate": 16, + "temperature": 37.2, + "oxygen_saturation": 98, + "pain_scale": 6, + "consciousness_level": "alert" + } + + +@tool +def retrieve_medical_history(patient_id: str, consent_verified: bool = True) -> Dict: + """Retrieve patient medical history from EMR system""" + if not consent_verified: + return {"error": "consent_required", "message": "Patient consent required for EMR access"} + + # Simulate EMR retrieval + return { + "patient_id": patient_id, + "allergies": ["penicillin", "latex"], + "chronic_conditions": ["hypertension", "diabetes_type2"], + "current_medications": [ + {"name": "metformin", "dose": "500mg", "frequency": "twice daily"}, + {"name": "lisinopril", "dose": "10mg", "frequency": "once daily"} + ], + "recent_visits": [ + {"date": "2024-02-15", "reason": "routine_checkup", "department": "primary_care"} + ], + "emergency_contacts": [ + {"name": "Jane Doe", "relationship": "spouse", "phone": "555-0123"} + ], + "insurance_status": "active" + } + + +@tool +def calculate_triage_priority(vital_signs: Dict, medical_history: Dict, chief_complaint: str) -> Dict: + """Calculate triage priority based on clinical indicators""" + # Simulate triage algorithm + priority_score = 0 + red_flags = [] + + # Check vital signs + if vital_signs["heart_rate"] > 120 or vital_signs["heart_rate"] < 50: + priority_score += 3 + red_flags.append("abnormal_heart_rate") + + if vital_signs["oxygen_saturation"] < 92: + priority_score += 4 + red_flags.append("low_oxygen") + + if vital_signs["consciousness_level"] != "alert": + priority_score += 5 + red_flags.append("altered_consciousness") + + # Determine triage level + if priority_score >= 8: + triage_level = TriageLevel.RESUSCITATION + department = "emergency" + wait_time = 0 + elif priority_score >= 5: + triage_level = TriageLevel.EMERGENT + department = "emergency" + wait_time = 10 + elif priority_score >= 3: + triage_level = TriageLevel.URGENT + department = "urgent_care" + wait_time = 30 + else: + triage_level = TriageLevel.LESS_URGENT + department = "primary_care" + wait_time = 60 + + return { + "assessment_id": f"TRG-{datetime.now().strftime('%Y%m%d%H%M%S')}", + "patient_id": vital_signs["patient_id"], + "triage_level": triage_level.value, + "chief_complaint": chief_complaint, + "recommended_department": department, + "estimated_wait_time": wait_time, + "required_resources": ["physician", "nurse", "monitoring"], + "clinical_notes": f"Priority score: {priority_score}", + "red_flags": red_flags + } + + +@tool +def allocate_resources(patient_id: str, triage_level: str, department: str) -> Dict: + """Allocate hospital resources based on triage assessment""" + # Simulate resource allocation + if "resuscitation" in triage_level: + bed = "ER-01" + staff = ["Dr. Smith", "Nurse Johnson", "Nurse Williams"] + equipment = ["cardiac_monitor", "defibrillator", "crash_cart"] + priority = 10.0 + elif "emergent" in triage_level: + bed = "ER-05" + staff = ["Dr. Brown", "Nurse Davis"] + equipment = ["vital_signs_monitor", "IV_pump"] + priority = 8.0 + else: + bed = "UC-12" + staff = ["PA Miller", "Nurse Garcia"] + equipment = ["basic_monitor"] + priority = 5.0 + + return { + "allocation_id": f"RES-{datetime.now().strftime('%Y%m%d%H%M%S')}", + "patient_id": patient_id, + "assigned_bed": bed, + "assigned_staff": staff, + "equipment_needed": equipment, + "department": department, + "priority_score": priority, + "estimated_treatment_time": 45 + } + + +# Healthcare agent definitions with SLA requirements +vital_signs_agent = Agent( + name="VitalSignsCapture", + instructions="""You are a clinical vital signs specialist. + Capture and interpret patient vital signs from various monitoring devices. + Identify abnormal values and trends that require immediate attention. + Follow HIPAA guidelines for patient data handling. + SLA: Capture and process within 30 seconds.""", + tools=[capture_vital_signs] +) + +emr_agent = Agent( + name="EMRRetrieval", + instructions="""You are an EMR system specialist with HIPAA compliance expertise. + Retrieve relevant patient medical history while ensuring data privacy. + Verify patient consent and maintain audit trails. + Highlight relevant allergies and contraindications. + SLA: Retrieve records within 5 seconds.""", + tools=[retrieve_medical_history] +) + +triage_agent = Agent( + name="TriageRecommendation", + instructions="""You are an emergency triage specialist following ESI guidelines. + Assess patient severity and provide triage recommendations. + Consider vital signs, medical history, and presenting symptoms. + Identify red flags that require immediate intervention. + SLA: Complete assessment within 1 minute.""", + tools=[calculate_triage_priority] +) + +resource_agent = Agent( + name="ResourceAllocator", + instructions="""You are a hospital resource management specialist. + Allocate beds, staff, and equipment based on patient needs and priority. + Optimize resource utilization while maintaining quality of care. + Coordinate between departments for efficient patient flow. + SLA: Allocate resources within 30 seconds.""", + tools=[allocate_resources] +) + + +# Healthcare workflow with safety protocols +def emergency_triage_workflow(patient_id: str, chief_complaint: str): + """ + Complete emergency triage workflow from arrival to resource allocation + Includes safety checks and fallback protocols + """ + + workflow_result = { + "patient_id": patient_id, + "timestamp": datetime.now().isoformat(), + "status": "in_progress", + "safety_checks": [], + "clinical_pathway": [] + } + + # Step 1: Vital signs capture with validation + try: + vital_signs = vital_signs_agent.start( + f"Capture vital signs for patient {patient_id}" + ) + workflow_result["clinical_pathway"].append({ + "step": "vital_signs", + "status": "completed", + "data": vital_signs + }) + + # Safety check: Validate vital signs are within possible ranges + if vital_signs.get("heart_rate", 0) < 20 or vital_signs.get("heart_rate", 0) > 300: + workflow_result["safety_checks"].append({ + "type": "vital_signs_validation", + "issue": "impossible_heart_rate", + "action": "manual_verification_required" + }) + except Exception as e: + # Fallback: Manual vital signs entry + workflow_result["clinical_pathway"].append({ + "step": "vital_signs", + "status": "manual_entry_required", + "error": str(e) + }) + vital_signs = {"manual_entry": True} + + # Step 2: EMR retrieval with consent check + try: + medical_history = emr_agent.start( + f"Retrieve medical history for patient {patient_id} with verified consent" + ) + workflow_result["clinical_pathway"].append({ + "step": "emr_retrieval", + "status": "completed", + "records_retrieved": True + }) + + # Safety check: Allergy alerts + if medical_history.get("allergies"): + workflow_result["safety_checks"].append({ + "type": "allergy_alert", + "allergies": medical_history["allergies"], + "action": "notify_all_providers" + }) + except Exception as e: + # Fallback: Proceed with limited information + medical_history = { + "patient_id": patient_id, + "allergies": [], + "note": "EMR_unavailable_proceed_with_caution" + } + workflow_result["safety_checks"].append({ + "type": "emr_unavailable", + "action": "collect_history_manually" + }) + + # Step 3: Triage assessment + try: + triage_assessment = triage_agent.start( + f"Assess triage for patient with vitals {vital_signs}, " + f"history {medical_history}, complaint: {chief_complaint}" + ) + workflow_result["clinical_pathway"].append({ + "step": "triage_assessment", + "status": "completed", + "triage_level": triage_assessment.get("triage_level") + }) + + # Safety check: Red flags requiring immediate action + if triage_assessment.get("red_flags"): + workflow_result["safety_checks"].append({ + "type": "clinical_red_flags", + "flags": triage_assessment["red_flags"], + "action": "immediate_physician_notification" + }) + except Exception as e: + # Fallback: Conservative triage (assume urgent) + triage_assessment = { + "triage_level": TriageLevel.URGENT.value, + "recommended_department": "emergency", + "fallback": True + } + + # Step 4: Resource allocation + try: + resource_allocation = resource_agent.start( + f"Allocate resources for patient {patient_id} " + f"with triage {triage_assessment.get('triage_level')} " + f"to {triage_assessment.get('recommended_department')}" + ) + workflow_result["clinical_pathway"].append({ + "step": "resource_allocation", + "status": "completed", + "bed_assigned": resource_allocation.get("assigned_bed"), + "staff_assigned": resource_allocation.get("assigned_staff") + }) + except Exception as e: + # Fallback: Queue for next available + workflow_result["clinical_pathway"].append({ + "step": "resource_allocation", + "status": "queued", + "queue_position": "next_available", + "error": str(e) + }) + + workflow_result["status"] = "completed" + return workflow_result + + +# HIPAA-compliant data handling patterns +class HIPAACompliancePatterns: + """ + Reusable patterns for HIPAA-compliant healthcare workflows + """ + + @staticmethod + def anonymize_patient_data(data: Dict) -> Dict: + """Remove PHI for analytics and reporting""" + safe_fields = ["triage_level", "department", "wait_time", "resource_type"] + return {k: v for k, v in data.items() if k in safe_fields} + + @staticmethod + def audit_log_access(user_id: str, patient_id: str, action: str, reason: str): + """Create audit trail for EMR access""" + return { + "timestamp": datetime.now().isoformat(), + "user_id": user_id, + "patient_id": patient_id, + "action": action, + "reason": reason, + "ip_address": "10.0.0.1", # Would be actual IP + "session_id": "SEC-" + datetime.now().strftime('%Y%m%d%H%M%S') + } + + @staticmethod + def encrypt_sensitive_data(data: Dict) -> Dict: + """Encrypt sensitive fields (placeholder for real encryption)""" + sensitive_fields = ["ssn", "dob", "address", "phone"] + encrypted = data.copy() + for field in sensitive_fields: + if field in encrypted: + encrypted[field] = f"ENCRYPTED_{field.upper()}" + return encrypted + + +# Cross-department coordination patterns +class HealthcareCoordinationPatterns: + """ + Patterns for coordinating across healthcare departments + """ + + @staticmethod + def coordinate_lab_orders(patient_id: str, triage_level: str) -> Agent: + """Create agent for lab order coordination""" + return Agent( + name="LabCoordinator", + instructions=f"""Coordinate laboratory orders for patient {patient_id}. + Priority level: {triage_level}. + Ensure STAT orders for critical patients. + Track specimen collection and result delivery. + SLA: Order placement within 2 minutes.""" + ) + + @staticmethod + def coordinate_radiology(patient_id: str, imaging_type: str) -> Agent: + """Create agent for radiology coordination""" + return Agent( + name="RadiologyCoordinator", + instructions=f"""Schedule {imaging_type} imaging for patient {patient_id}. + Check for contrast allergies and pregnancy status. + Coordinate with transport for patient movement. + SLA: Schedule within 5 minutes for urgent cases.""" + ) + + @staticmethod + def coordinate_pharmacy(patient_id: str, medications: List[str]) -> Agent: + """Create agent for pharmacy coordination""" + return Agent( + name="PharmacyCoordinator", + instructions=f"""Verify and dispense medications for patient {patient_id}. + Check for drug interactions and allergies. + Ensure proper dosing based on patient parameters. + SLA: Medication verification within 3 minutes.""" + ) + + +# Example usage +if __name__ == "__main__": + # Process emergency patient + patient_id = "PT-2024-00123" + chief_complaint = "severe chest pain radiating to left arm" + + result = emergency_triage_workflow(patient_id, chief_complaint) + print("Triage workflow result:", result) + + # Create specialized coordinators + lab_coordinator = HealthcareCoordinationPatterns.coordinate_lab_orders( + patient_id, TriageLevel.EMERGENT.value + ) + + radiology_coordinator = HealthcareCoordinationPatterns.coordinate_radiology( + patient_id, "chest_xray" + ) + + pharmacy_coordinator = HealthcareCoordinationPatterns.coordinate_pharmacy( + patient_id, ["aspirin", "nitroglycerin"] + ) + + # HIPAA compliance example + audit_log = HIPAACompliancePatterns.audit_log_access( + "DR-001", patient_id, "EMR_ACCESS", "emergency_triage" + ) + print("Audit log created:", audit_log) \ No newline at end of file diff --git a/examples/cookbooks/Industry_Templates/manufacturing_template.py b/examples/cookbooks/Industry_Templates/manufacturing_template.py new file mode 100644 index 000000000..65a938bad --- /dev/null +++ b/examples/cookbooks/Industry_Templates/manufacturing_template.py @@ -0,0 +1,298 @@ +""" +Manufacturing Industry Template +================================ +Order scheduling, inventory management, and quality inspection workflow +Based on SRAO Framework patterns with 70% cross-industry reuse + +Key agents: +- ParseOrder: Extracts order details from various formats +- CheckInventory: Validates material availability +- OptimizeSchedule: Optimizes production scheduling +- DefectDetect: Quality control and defect detection +""" + +from praisonaiagents import Agent, tool +from typing import Dict, List, Any +from pydantic import BaseModel + + +# I/O Schemas +class OrderDetails(BaseModel): + """Structured order information""" + order_id: str + customer_id: str + product_id: str + quantity: int + priority: str = "normal" # normal, high, urgent + due_date: str + requirements: Dict[str, Any] = {} + + +class InventoryStatus(BaseModel): + """Material availability status""" + material_id: str + available_quantity: int + reorder_level: int + supplier_lead_time: int + location: str + + +class ProductionSchedule(BaseModel): + """Optimized production schedule""" + schedule_id: str + production_line: str + start_time: str + end_time: str + assigned_workers: List[str] + estimated_completion: str + bottleneck_analysis: Dict[str, Any] = {} + + +class QualityReport(BaseModel): + """Quality inspection results""" + batch_id: str + inspection_time: str + defect_rate: float + defect_types: List[str] + passed: bool + recommendations: List[str] + + +# Tools for agents +@tool +def extract_order_details(text: str) -> Dict: + """Extract structured order information from unstructured text""" + # Simulate order parsing logic + return { + "order_id": "ORD-2024-001", + "customer_id": "CUST-123", + "product_id": "PROD-456", + "quantity": 100, + "priority": "high", + "due_date": "2024-03-15", + "requirements": {"material": "steel", "finish": "polished"} + } + + +@tool +def check_material_availability(product_id: str, quantity: int) -> Dict: + """Check if materials are available for production""" + # Simulate inventory check + return { + "material_id": "MAT-789", + "available_quantity": 500, + "reorder_level": 100, + "supplier_lead_time": 7, + "location": "Warehouse-A" + } + + +@tool +def calculate_optimal_schedule(order: Dict, inventory: Dict) -> Dict: + """Calculate optimal production schedule based on constraints""" + # Simulate scheduling optimization + return { + "schedule_id": "SCH-2024-001", + "production_line": "Line-3", + "start_time": "2024-03-10T08:00:00", + "end_time": "2024-03-14T17:00:00", + "assigned_workers": ["Worker-001", "Worker-002"], + "estimated_completion": "2024-03-14T16:30:00", + "bottleneck_analysis": { + "critical_path": "Assembly", + "buffer_time": 2.5 + } + } + + +@tool +def perform_quality_inspection(batch_id: str) -> Dict: + """Perform automated quality inspection on production batch""" + # Simulate quality control + return { + "batch_id": batch_id, + "inspection_time": "2024-03-14T17:00:00", + "defect_rate": 0.02, + "defect_types": ["surface_scratch", "dimension_variance"], + "passed": True, + "recommendations": [ + "Adjust machine calibration on Line-3", + "Increase inspection frequency for next batch" + ] + } + + +# Agent Definitions with SLA requirements +order_parser = Agent( + name="ParseOrder", + instructions="""You are an order parsing specialist. Extract and structure order information + from various formats (email, PDF, API). Ensure all required fields are captured and validated. + SLA: Process within 30 seconds per order.""", + tools=[extract_order_details] +) + +inventory_checker = Agent( + name="CheckInventory", + instructions="""You are an inventory management specialist. Verify material availability, + check reorder points, and identify potential supply chain issues. + SLA: Real-time response within 5 seconds.""", + tools=[check_material_availability] +) + +schedule_optimizer = Agent( + name="OptimizeSchedule", + instructions="""You are a production scheduling expert. Optimize production schedules + based on order priority, resource availability, and constraints. Minimize idle time + and maximize throughput. + SLA: Generate schedule within 2 minutes for complex orders.""", + tools=[calculate_optimal_schedule] +) + +quality_inspector = Agent( + name="DefectDetect", + instructions="""You are a quality control specialist. Perform automated inspections, + detect defects using vision analysis, and provide improvement recommendations. + SLA: Complete inspection within 1 minute per batch.""", + tools=[perform_quality_inspection] +) + + +# Workflow with fallback strategies +def manufacturing_workflow(order_text: str): + """ + Complete manufacturing workflow from order to quality control + Includes fallback strategies for each critical step + """ + + # Step 1: Parse order with fallback + try: + order_details = order_parser.start(f"Parse this order: {order_text}") + if not order_details: + # Fallback: Manual review queue + return {"status": "manual_review_required", "reason": "order_parsing_failed"} + except Exception as e: + return {"status": "error", "stage": "order_parsing", "error": str(e)} + + # Step 2: Check inventory with fallback + try: + inventory_status = inventory_checker.start( + f"Check materials for order: {order_details}" + ) + if not inventory_status: + # Fallback: Alternative supplier check + return {"status": "alternative_supplier_needed", "order": order_details} + except Exception as e: + # Fallback: Conservative estimate with safety margin + inventory_status = {"estimated": True, "safety_margin": 1.5} + + # Step 3: Optimize schedule with fallback + try: + schedule = schedule_optimizer.start( + f"Create schedule for order {order_details} with inventory {inventory_status}" + ) + if not schedule: + # Fallback: Standard scheduling template + schedule = {"template": "standard", "priority": "normal"} + except Exception as e: + # Fallback: Queue for next available slot + schedule = {"status": "queued", "estimated_start": "next_available"} + + # Step 4: Quality inspection with fallback + try: + quality_report = quality_inspector.start( + f"Inspect batch for schedule {schedule}" + ) + if not quality_report: + # Fallback: Manual inspection required + quality_report = {"requires_manual": True, "inspector_assigned": "QC-Team-1"} + except Exception as e: + # Fallback: Quarantine for review + quality_report = {"status": "quarantined", "review_required": True} + + return { + "status": "completed", + "order": order_details, + "inventory": inventory_status, + "schedule": schedule, + "quality": quality_report + } + + +# Cross-industry reusable patterns (70% reuse) +class IndustryAgentPattern: + """ + Base pattern reusable across industries + Provides common agent capabilities that can be specialized + """ + + @staticmethod + def create_data_parser(name: str, domain: str, sla_seconds: int = 30): + """Create a data parsing agent for any domain""" + return Agent( + name=name, + instructions=f"""You are a {domain} data parsing specialist. + Extract and structure information from various formats. + Validate all required fields according to {domain} standards. + SLA: Process within {sla_seconds} seconds.""" + ) + + @staticmethod + def create_resource_checker(name: str, resource_type: str, sla_seconds: int = 5): + """Create a resource availability checker for any resource type""" + return Agent( + name=name, + instructions=f"""You are a {resource_type} availability specialist. + Verify {resource_type} availability and identify constraints. + Provide real-time status and alternative options if needed. + SLA: Response within {sla_seconds} seconds.""" + ) + + @staticmethod + def create_optimizer(name: str, optimization_target: str, sla_minutes: int = 2): + """Create an optimization agent for any target metric""" + return Agent( + name=name, + instructions=f"""You are an optimization expert for {optimization_target}. + Find optimal solutions considering all constraints and priorities. + Minimize waste and maximize efficiency. + SLA: Generate solution within {sla_minutes} minutes.""" + ) + + @staticmethod + def create_inspector(name: str, inspection_type: str, sla_minutes: int = 1): + """Create an inspection agent for any quality metric""" + return Agent( + name=name, + instructions=f"""You are a {inspection_type} inspection specialist. + Perform automated analysis and detect anomalies. + Provide actionable recommendations for improvement. + SLA: Complete inspection within {sla_minutes} minute(s).""" + ) + + +# Example usage +if __name__ == "__main__": + # Create a manufacturing workflow instance + order = "Customer ABC needs 100 units of Product-XYZ by March 15th, high priority" + + # Option 1: Use specialized agents + result = manufacturing_workflow(order) + print("Manufacturing workflow result:", result) + + # Option 2: Use reusable patterns for other industries + # These patterns can be adapted for Energy, Healthcare, Agriculture, Transportation + energy_parser = IndustryAgentPattern.create_data_parser( + "EnergyDataParser", "wind farm telemetry", sla_seconds=10 + ) + + healthcare_checker = IndustryAgentPattern.create_resource_checker( + "BedAvailability", "hospital bed", sla_seconds=3 + ) + + agriculture_optimizer = IndustryAgentPattern.create_optimizer( + "IrrigationOptimizer", "water usage", sla_minutes=5 + ) + + transport_inspector = IndustryAgentPattern.create_inspector( + "TunnelSafety", "structural integrity", sla_minutes=2 + ) \ No newline at end of file diff --git a/examples/cookbooks/Industry_Templates/transportation_template.py b/examples/cookbooks/Industry_Templates/transportation_template.py new file mode 100644 index 000000000..d106dc808 --- /dev/null +++ b/examples/cookbooks/Industry_Templates/transportation_template.py @@ -0,0 +1,599 @@ +""" +Transportation Industry Template +================================ +Tunnel safety monitoring and infrastructure management workflow +Based on SRAO Framework with LiDAR and sensor fusion + +Key agents: +- LiDARFusion: Processes LiDAR point cloud data +- DisplacementCalculator: Calculates structural displacement +- HeatmapGenerator: Generates safety heatmaps +- MaintenancePlanner: Plans infrastructure maintenance +""" + +from praisonaiagents import Agent, tool +from typing import Dict, List, Any, Tuple, Optional +from pydantic import BaseModel +from datetime import datetime, timedelta +from enum import Enum +import math + + +# Safety levels for infrastructure +class SafetyLevel(str, Enum): + SAFE = "safe" + CAUTION = "caution" + WARNING = "warning" + DANGER = "danger" + CRITICAL = "critical" + + +# Infrastructure types +class InfrastructureType(str, Enum): + TUNNEL = "tunnel" + BRIDGE = "bridge" + HIGHWAY = "highway" + RAILWAY = "railway" + AIRPORT = "airport" + + +# I/O Schemas +class LiDARScan(BaseModel): + """LiDAR scan data for infrastructure""" + scan_id: str + infrastructure_id: str + scan_time: str + point_cloud_size: int # Number of points + scan_resolution: float # Points per square meter + coverage_area: float # Square meters + anomaly_points: List[Tuple[float, float, float]] # XYZ coordinates + reference_baseline: Optional[Dict[str, Any]] + + +class DisplacementAnalysis(BaseModel): + """Structural displacement analysis""" + analysis_id: str + infrastructure_id: str + analysis_time: str + max_displacement: float # millimeters + displacement_vector: Tuple[float, float, float] # XYZ displacement + critical_zones: List[Dict[str, Any]] + settlement_rate: float # mm/month + tilt_angle: float # degrees + safety_factor: float + + +class SafetyHeatmap(BaseModel): + """Infrastructure safety heatmap""" + heatmap_id: str + infrastructure_id: str + generation_time: str + grid_resolution: float # meters + safety_scores: List[List[float]] # 2D grid of safety scores + critical_areas: List[Dict[str, Any]] + overall_safety: SafetyLevel + risk_trends: Dict[str, Any] + + +class MaintenanceSchedule(BaseModel): + """Maintenance planning schedule""" + schedule_id: str + infrastructure_id: str + maintenance_type: str # preventive, corrective, emergency + priority_level: str # low, medium, high, critical + scheduled_date: str + estimated_duration: int # hours + required_resources: List[str] + traffic_impact: str + cost_estimate: float + + +# Transportation-specific tools +@tool +def process_lidar_scan(infrastructure_id: str, scan_type: str = "full") -> Dict: + """Process LiDAR point cloud data for structural analysis""" + # Simulate LiDAR processing + return { + "scan_id": f"LIDAR-{datetime.now().strftime('%Y%m%d%H%M')}", + "infrastructure_id": infrastructure_id, + "scan_time": datetime.now().isoformat(), + "point_cloud_size": 5000000, # 5 million points + "scan_resolution": 100.0, # points/m² + "coverage_area": 50000.0, # m² + "anomaly_points": [ + (10.5, 20.3, 5.1), + (15.2, 25.8, 5.3), + (20.1, 30.5, 5.5) + ], + "reference_baseline": { + "date": "2024-01-01", + "baseline_id": "BSL-001" + } + } + + +@tool +def calculate_displacement(current_scan: Dict, baseline_scan: Dict) -> Dict: + """Calculate structural displacement from LiDAR scans""" + # Simulate displacement calculation + anomalies = current_scan.get("anomaly_points", []) + + if len(anomalies) > 5: + max_displacement = 15.0 + safety_factor = 0.6 + safety_level = SafetyLevel.WARNING + elif len(anomalies) > 2: + max_displacement = 8.0 + safety_factor = 0.8 + safety_level = SafetyLevel.CAUTION + else: + max_displacement = 3.0 + safety_factor = 0.95 + safety_level = SafetyLevel.SAFE + + return { + "analysis_id": f"DISP-{datetime.now().strftime('%Y%m%d%H%M')}", + "infrastructure_id": current_scan["infrastructure_id"], + "analysis_time": datetime.now().isoformat(), + "max_displacement": max_displacement, + "displacement_vector": (2.5, 1.8, max_displacement), + "critical_zones": [ + { + "zone_id": "CZ-001", + "location": "tunnel_crown", + "displacement": max_displacement * 0.8, + "risk": "high" if max_displacement > 10 else "medium" + } + ], + "settlement_rate": max_displacement / 30, # mm/month + "tilt_angle": math.degrees(math.atan(max_displacement / 1000)), + "safety_factor": safety_factor + } + + +@tool +def generate_safety_heatmap(displacement_data: Dict, sensor_data: List[Dict]) -> Dict: + """Generate safety heatmap visualization for infrastructure""" + # Simulate heatmap generation + safety_factor = displacement_data.get("safety_factor", 1.0) + + # Create grid (simplified 10x10) + grid_size = 10 + safety_grid = [] + critical_areas = [] + + for i in range(grid_size): + row = [] + for j in range(grid_size): + # Generate safety score based on position and displacement + base_score = safety_factor + distance_factor = math.sqrt((i-5)**2 + (j-5)**2) / 10 + safety_score = min(1.0, base_score + (0.2 * distance_factor)) + row.append(safety_score) + + if safety_score < 0.7: + critical_areas.append({ + "grid_position": (i, j), + "safety_score": safety_score, + "recommended_action": "immediate_inspection" + }) + safety_grid.append(row) + + # Determine overall safety + avg_safety = sum(sum(row) for row in safety_grid) / (grid_size * grid_size) + if avg_safety > 0.9: + overall = SafetyLevel.SAFE + elif avg_safety > 0.8: + overall = SafetyLevel.CAUTION + elif avg_safety > 0.7: + overall = SafetyLevel.WARNING + else: + overall = SafetyLevel.DANGER + + return { + "heatmap_id": f"HEAT-{datetime.now().strftime('%Y%m%d%H%M')}", + "infrastructure_id": displacement_data["infrastructure_id"], + "generation_time": datetime.now().isoformat(), + "grid_resolution": 5.0, # meters + "safety_scores": safety_grid, + "critical_areas": critical_areas, + "overall_safety": overall.value, + "risk_trends": { + "trend": "deteriorating" if avg_safety < 0.8 else "stable", + "rate": -0.02 if avg_safety < 0.8 else 0.0 + } + } + + +@tool +def plan_maintenance(infrastructure_id: str, safety_analysis: Dict, traffic_data: Dict) -> Dict: + """Plan maintenance based on safety analysis and traffic patterns""" + safety_level = safety_analysis.get("overall_safety", "safe") + + # Determine maintenance urgency + if safety_level in ["danger", "critical"]: + maintenance_type = "emergency" + priority = "critical" + days_until = 1 + duration = 48 + elif safety_level == "warning": + maintenance_type = "corrective" + priority = "high" + days_until = 7 + duration = 24 + elif safety_level == "caution": + maintenance_type = "preventive" + priority = "medium" + days_until = 30 + duration = 12 + else: + maintenance_type = "preventive" + priority = "low" + days_until = 90 + duration = 8 + + # Traffic impact assessment + peak_traffic = traffic_data.get("peak_volume", 1000) + if peak_traffic > 5000: + traffic_impact = "severe" + cost_multiplier = 2.0 + elif peak_traffic > 2000: + traffic_impact = "moderate" + cost_multiplier = 1.5 + else: + traffic_impact = "minimal" + cost_multiplier = 1.0 + + return { + "schedule_id": f"MAINT-{datetime.now().strftime('%Y%m%d%H%M')}", + "infrastructure_id": infrastructure_id, + "maintenance_type": maintenance_type, + "priority_level": priority, + "scheduled_date": (datetime.now() + timedelta(days=days_until)).isoformat(), + "estimated_duration": duration, + "required_resources": [ + "inspection_team", + "repair_crew", + "safety_equipment", + "traffic_control" + ], + "traffic_impact": traffic_impact, + "cost_estimate": 50000 * cost_multiplier * (duration / 24) + } + + +# Transportation agent definitions +lidar_agent = Agent( + name="LiDARFusion", + instructions="""You are a LiDAR data processing specialist for infrastructure. + Process point cloud data from mobile and terrestrial LiDAR scanners. + Perform data fusion from multiple sensor sources. + Identify structural anomalies and deformations. + SLA: Process scan data within 5 minutes.""", + tools=[process_lidar_scan] +) + +displacement_agent = Agent( + name="DisplacementCalculator", + instructions="""You are a structural displacement analysis expert. + Calculate precise displacement measurements from temporal scans. + Analyze settlement patterns and structural movements. + Predict future displacement trends using ML models. + SLA: Complete analysis within 2 minutes.""", + tools=[calculate_displacement] +) + +heatmap_agent = Agent( + name="HeatmapGenerator", + instructions="""You are a safety visualization specialist. + Generate intuitive safety heatmaps for infrastructure monitoring. + Integrate multiple data sources for comprehensive risk assessment. + Highlight critical areas requiring immediate attention. + SLA: Generate heatmap within 1 minute.""", + tools=[generate_safety_heatmap] +) + +maintenance_agent = Agent( + name="MaintenancePlanner", + instructions="""You are an infrastructure maintenance planning expert. + Schedule maintenance based on structural health and traffic patterns. + Optimize maintenance windows to minimize disruption. + Coordinate resources and estimate costs. + SLA: Create maintenance plan within 3 minutes.""", + tools=[plan_maintenance] +) + + +# Transportation infrastructure monitoring workflow +def infrastructure_monitoring_workflow( + infrastructure_ids: List[str], + infrastructure_type: InfrastructureType, + traffic_data: Dict +): + """ + Complete infrastructure monitoring workflow from scanning to maintenance + Includes safety assessment and predictive maintenance + """ + + monitoring_results = { + "timestamp": datetime.now().isoformat(), + "infrastructure_type": infrastructure_type.value, + "structures_monitored": len(infrastructure_ids), + "safety_assessments": [], + "maintenance_required": [], + "emergency_alerts": [], + "network_health_score": 0.0 + } + + total_safety_score = 0.0 + + for infra_id in infrastructure_ids: + try: + # Step 1: LiDAR scanning + lidar_scan = lidar_agent.start( + f"Process LiDAR scan for {infrastructure_type.value} {infra_id}" + ) + + # Step 2: Displacement analysis + # Get baseline (would be from database in real scenario) + baseline = {"scan_id": "BSL-001", "point_cloud": []} + + displacement_analysis = displacement_agent.start( + f"Calculate displacement for scan: {lidar_scan} against baseline: {baseline}" + ) + + # Step 3: Generate safety heatmap + sensor_data = [] # Would include IoT sensor readings + safety_heatmap = heatmap_agent.start( + f"Generate heatmap for displacement: {displacement_analysis} " + f"with sensors: {sensor_data}" + ) + + monitoring_results["safety_assessments"].append({ + "infrastructure_id": infra_id, + "safety_level": safety_heatmap.get("overall_safety"), + "max_displacement": displacement_analysis.get("max_displacement"), + "critical_zones": len(safety_heatmap.get("critical_areas", [])) + }) + + # Check for emergency conditions + if safety_heatmap.get("overall_safety") in ["danger", "critical"]: + monitoring_results["emergency_alerts"].append({ + "infrastructure_id": infra_id, + "alert_level": "emergency", + "safety_status": safety_heatmap["overall_safety"], + "immediate_action": "close_to_traffic", + "notification_sent": True + }) + + # Step 4: Emergency maintenance planning + maintenance_plan = maintenance_agent.start( + f"Plan emergency maintenance for {infra_id} " + f"with safety: {safety_heatmap} and traffic: {traffic_data}" + ) + monitoring_results["maintenance_required"].append(maintenance_plan) + + elif safety_heatmap.get("overall_safety") in ["warning", "caution"]: + # Regular maintenance planning + maintenance_plan = maintenance_agent.start( + f"Plan maintenance for {infra_id} " + f"with safety: {safety_heatmap} and traffic: {traffic_data}" + ) + monitoring_results["maintenance_required"].append(maintenance_plan) + + # Calculate safety score contribution + safety_mapping = { + "safe": 1.0, "caution": 0.8, "warning": 0.6, + "danger": 0.3, "critical": 0.1 + } + total_safety_score += safety_mapping.get( + safety_heatmap.get("overall_safety", "safe"), 0.5 + ) + + except Exception as e: + # Fallback: Manual inspection + monitoring_results["emergency_alerts"].append({ + "infrastructure_id": infra_id, + "alert_level": "system_failure", + "error": str(e), + "action": "dispatch_manual_inspection_team" + }) + + # Calculate network health score + if infrastructure_ids: + monitoring_results["network_health_score"] = ( + total_safety_score / len(infrastructure_ids) * 100 + ) + + return monitoring_results + + +# Multi-modal transportation patterns +class MultiModalTransportPatterns: + """ + Patterns for different transportation infrastructure types + Demonstrates cross-industry reuse for transportation sector + """ + + @staticmethod + def adapt_for_bridge_monitoring() -> Agent: + """Adapt tunnel monitoring for bridge structures""" + return Agent( + name="BridgeMonitor", + instructions="""Monitor bridge structural health using sensors. + Track deck deflection, cable tension, and vibration patterns. + Detect fatigue cracks and corrosion. + Consider wind and seismic loads. + SLA: Real-time monitoring with 10-second updates.""" + ) + + @staticmethod + def adapt_for_railway_tracks() -> Agent: + """Adapt for railway track monitoring""" + return Agent( + name="RailwayMonitor", + instructions="""Monitor railway track geometry and condition. + Detect rail defects, gauge variations, and ballast degradation. + Analyze wheel-rail interaction forces. + Predict track maintenance needs. + SLA: Process track inspection data within 3 minutes per km.""" + ) + + @staticmethod + def adapt_for_airport_runways() -> Agent: + """Adapt for airport runway monitoring""" + return Agent( + name="RunwayMonitor", + instructions="""Monitor airport runway surface conditions. + Detect FOD (Foreign Object Debris) and surface irregularities. + Assess pavement condition and friction levels. + Monitor drainage and lighting systems. + SLA: Complete runway scan within 10 minutes.""" + ) + + +# Intelligent traffic management integration +class TrafficManagementPatterns: + """ + Patterns for integrating with intelligent traffic systems + """ + + @staticmethod + def optimize_traffic_flow(maintenance_window: Dict, traffic_patterns: Dict) -> Dict: + """Optimize traffic flow during maintenance""" + peak_hours = traffic_patterns.get("peak_hours", [7, 9, 17, 19]) + maintenance_start = maintenance_window.get("start_time", "02:00") + + # Calculate optimal diversion routes + return { + "strategy": "dynamic_routing", + "diversion_routes": [ + {"route_id": "ALT-1", "capacity": 0.6, "added_time": 15}, + {"route_id": "ALT-2", "capacity": 0.3, "added_time": 20} + ], + "lane_management": { + "reversible_lanes": True, + "peak_direction": "inbound" if int(maintenance_start[:2]) < 12 else "outbound" + }, + "public_transport": { + "increase_frequency": True, + "additional_services": 4 + }, + "estimated_delay": 10 # minutes + } + + @staticmethod + def incident_response_plan(incident_type: str, location: Dict) -> Dict: + """Generate incident response plan""" + response_times = { + "accident": 5, + "breakdown": 10, + "infrastructure_failure": 2, + "weather_hazard": 15 + } + + return { + "incident_id": f"INC-{datetime.now().strftime('%Y%m%d%H%M')}", + "response_time": response_times.get(incident_type, 10), + "resources_dispatched": [ + r for r in [ + "emergency_response_team", + "traffic_control_unit", + "maintenance_crew" if "infrastructure" in incident_type else None + ] if r is not None + ], + "traffic_management": { + "lane_closures": 2, + "speed_reduction": 30, # km/h + "message_signs": ["Incident ahead", "Reduce speed", "Merge left"] + }, + "estimated_clearance": 45 # minutes + } + + +# Predictive maintenance patterns +class PredictiveMaintenancePatterns: + """ + ML-based predictive maintenance for transportation infrastructure + """ + + @staticmethod + def predict_failure_probability( + displacement_trend: List[float], + environmental_factors: Dict + ) -> float: + """Predict probability of structural failure""" + # Simplified prediction model + trend_slope = (displacement_trend[-1] - displacement_trend[0]) / len(displacement_trend) + temperature_factor = abs(environmental_factors.get("temp_variation", 0)) / 50 + moisture_factor = environmental_factors.get("humidity", 50) / 100 + + base_probability = 0.1 + trend_contribution = max(0, trend_slope * 10) + env_contribution = (temperature_factor + moisture_factor) * 0.2 + + return min(1.0, base_probability + trend_contribution + env_contribution) + + @staticmethod + def optimize_maintenance_schedule( + infrastructure_conditions: List[Dict], + budget_constraint: float + ) -> List[Dict]: + """Optimize maintenance schedule across infrastructure network""" + # Sort by urgency (safety_factor * cost_efficiency) + prioritized = sorted( + infrastructure_conditions, + key=lambda x: (1 - x.get("safety_factor", 1)) * (1 / x.get("cost", 1)), + reverse=True + ) + + scheduled = [] + total_cost = 0 + + for infra in prioritized: + if total_cost + infra.get("cost", 0) <= budget_constraint: + scheduled.append({ + "infrastructure_id": infra["id"], + "scheduled_date": infra["recommended_date"], + "priority": "high" if infra["safety_factor"] < 0.7 else "medium" + }) + total_cost += infra.get("cost", 0) + + return scheduled + + +# Example usage +if __name__ == "__main__": + # Monitor tunnel network + tunnel_ids = [f"TUN-{i:03d}" for i in range(1, 6)] + traffic = { + "peak_volume": 3000, + "off_peak_volume": 800, + "peak_hours": [7, 9, 17, 19] + } + + result = infrastructure_monitoring_workflow( + tunnel_ids, + InfrastructureType.TUNNEL, + traffic + ) + print("Infrastructure monitoring result:", result) + + # Multi-modal adaptation examples + bridge_monitor = MultiModalTransportPatterns.adapt_for_bridge_monitoring() + railway_monitor = MultiModalTransportPatterns.adapt_for_railway_tracks() + runway_monitor = MultiModalTransportPatterns.adapt_for_airport_runways() + + # Traffic management example + maintenance = {"start_time": "22:00", "duration": 8} + traffic_plan = TrafficManagementPatterns.optimize_traffic_flow(maintenance, traffic) + print("Traffic optimization plan:", traffic_plan) + + # Predictive maintenance example + displacement_history = [2.0, 2.3, 2.8, 3.5, 4.3] # mm over months + environment = {"temp_variation": 30, "humidity": 75} + + failure_prob = PredictiveMaintenancePatterns.predict_failure_probability( + displacement_history, environment + ) + print(f"Failure probability: {failure_prob:.2%}") \ No newline at end of file