Source code for clustrix.cost_providers.aws

"""
AWS cost monitoring implementation.
"""

import logging
from datetime import datetime
from typing import Dict, List, Any

from ..cost_monitoring import BaseCostMonitor, ResourceUsage, CostEstimate
from ..pricing_clients.aws_pricing import AWSPricingClient

logger = logging.getLogger(__name__)


[docs] class AWSCostMonitor(BaseCostMonitor): """Cost monitoring for AWS instances (EC2, Batch, etc.)."""
[docs] def __init__(self, region: str = "us-east-1", use_pricing_api: bool = True): super().__init__("AWS") self.region = region self.use_pricing_api = use_pricing_api # Initialize pricing client self.pricing_client = AWSPricingClient() if use_pricing_api else None # AWS EC2 On-Demand pricing (us-east-1, as of 2025, approximate rates in USD/hour) # These serve as fallback when API is unavailable self.ec2_pricing = { # General Purpose "t3.micro": 0.0104, "t3.small": 0.0208, "t3.medium": 0.0416, "t3.large": 0.0832, "t3.xlarge": 0.1664, "t3.2xlarge": 0.3328, # Compute Optimized "c5.large": 0.085, "c5.xlarge": 0.17, "c5.2xlarge": 0.34, "c5.4xlarge": 0.68, "c5.9xlarge": 1.53, "c5.18xlarge": 3.06, # Memory Optimized "r5.large": 0.126, "r5.xlarge": 0.252, "r5.2xlarge": 0.504, "r5.4xlarge": 1.008, "r5.8xlarge": 2.016, "r5.16xlarge": 4.032, # GPU Instances "p3.2xlarge": 3.06, # 1 V100 "p3.8xlarge": 12.24, # 4 V100 "p3.16xlarge": 24.48, # 8 V100 "p4d.24xlarge": 32.77, # 8 A100 "g4dn.xlarge": 0.526, # 1 T4 "g4dn.2xlarge": 0.752, # 1 T4 "g4dn.4xlarge": 1.204, # 1 T4 "g4dn.8xlarge": 2.176, # 1 T4 "g4dn.12xlarge": 3.912, # 4 T4 "g4dn.16xlarge": 4.352, # 1 T4 # Default fallback "default": 0.10, } # Spot instance discount factors (approximate) self.spot_discounts = { "t3": 0.7, # ~30% discount "c5": 0.65, # ~35% discount "r5": 0.6, # ~40% discount "p3": 0.3, # ~70% discount "p4d": 0.35, # ~65% discount "g4dn": 0.4, # ~60% discount "default": 0.7, # ~30% discount } # Instance metadata self.instance_metadata = { "p3.2xlarge": { "gpus": 1, "gpu_type": "V100", "gpu_memory": "16GB", "cpu_cores": 8, "ram": "61GB", }, "p3.8xlarge": { "gpus": 4, "gpu_type": "V100", "gpu_memory": "64GB", "cpu_cores": 32, "ram": "244GB", }, "p3.16xlarge": { "gpus": 8, "gpu_type": "V100", "gpu_memory": "128GB", "cpu_cores": 64, "ram": "488GB", }, "p4d.24xlarge": { "gpus": 8, "gpu_type": "A100", "gpu_memory": "320GB", "cpu_cores": 96, "ram": "1152GB", }, "g4dn.xlarge": { "gpus": 1, "gpu_type": "T4", "gpu_memory": "16GB", "cpu_cores": 4, "ram": "16GB", }, "g4dn.2xlarge": { "gpus": 1, "gpu_type": "T4", "gpu_memory": "16GB", "cpu_cores": 8, "ram": "32GB", }, "g4dn.12xlarge": { "gpus": 4, "gpu_type": "T4", "gpu_memory": "64GB", "cpu_cores": 48, "ram": "192GB", }, }
[docs] def get_resource_usage(self) -> ResourceUsage: """Get current resource utilization for AWS instance.""" # Get CPU and memory usage cpu_percent, mem_used_mb, mem_total_mb, mem_percent = ( self.get_cpu_memory_usage() ) # Get GPU utilization (if available) gpu_stats = self.get_gpu_utilization() return ResourceUsage( cpu_percent=cpu_percent, memory_used_mb=mem_used_mb, memory_total_mb=mem_total_mb, memory_percent=mem_percent, gpu_stats=gpu_stats, )
[docs] def estimate_cost( self, instance_type: str, hours_used: float, use_spot: bool = False ) -> CostEstimate: """Estimate cost for AWS instance usage.""" hourly_rate = None pricing_source = "hardcoded" pricing_warning = None # Try to get pricing from API first if self.pricing_client and not use_spot: try: hourly_rate = self.pricing_client.get_instance_pricing( instance_type=instance_type, region=self.region ) if hourly_rate: logger.debug( f"Got pricing for {instance_type} from API: ${hourly_rate}/hr" ) pricing_source = "api" except Exception as e: logger.debug(f"Failed to get pricing from API: {e}") # Fall back to hardcoded pricing if API failed if hourly_rate is None: hourly_rate = self.ec2_pricing.get( instance_type, self.ec2_pricing["default"] ) pricing_source = "hardcoded" if self.pricing_client and self.pricing_client.is_pricing_data_outdated(): pricing_warning = ( f"Using potentially outdated pricing data (last updated: " f"{self.pricing_client._hardcoded_pricing_date}). " f"Current prices may differ. Consider checking AWS pricing page." ) logger.warning(pricing_warning) # Apply spot discount if requested if use_spot: instance_family = instance_type.split(".")[0] discount_factor = self.spot_discounts.get( instance_family, self.spot_discounts["default"] ) hourly_rate *= discount_factor if pricing_source == "hardcoded": pricing_warning = ( pricing_warning or "" ) + " Spot pricing is estimated and may vary significantly." # Calculate estimated cost estimated_cost = hourly_rate * hours_used pricing_type = "Spot" if use_spot else "On-Demand" return CostEstimate( instance_type=f"{instance_type} ({pricing_type})", hourly_rate=hourly_rate, hours_used=hours_used, estimated_cost=estimated_cost, currency="USD", last_updated=datetime.now(), pricing_source=pricing_source, pricing_warning=pricing_warning, )
[docs] def get_pricing_info(self) -> Dict[str, float]: """Get AWS EC2 pricing information.""" # For now, return hardcoded pricing with a warning if outdated # A full implementation would query the API for all instance types if self.pricing_client and self.pricing_client.is_pricing_data_outdated(): logger.warning( "Pricing data may be outdated. Consider refreshing from AWS Pricing API." ) return self.ec2_pricing.copy()
[docs] def get_spot_pricing_info(self) -> Dict[str, float]: """Get estimated AWS spot pricing.""" spot_pricing = {} for instance_type, on_demand_price in self.ec2_pricing.items(): if instance_type != "default": instance_family = instance_type.split(".")[0] discount_factor = self.spot_discounts.get( instance_family, self.spot_discounts["default"] ) spot_pricing[instance_type] = on_demand_price * discount_factor return spot_pricing
[docs] def get_cost_optimization_recommendations( self, resource_usage: ResourceUsage, cost_estimate: CostEstimate ) -> List[str]: """Get AWS-specific cost optimization recommendations.""" recommendations = super().get_cost_optimization_recommendations( resource_usage, cost_estimate ) # AWS-specific recommendations recommendations.extend( [ "Consider using Spot Instances for fault-tolerant workloads (up to 90% savings)", "Use Reserved Instances for predictable workloads (up to 75% savings)", "Consider AWS Batch for large-scale batch processing", "Use EBS-optimized instances for I/O intensive workloads", "Enable detailed monitoring to track resource utilization", "Consider using AWS ParallelCluster for HPC workloads", "Use S3 for data storage instead of expensive EBS volumes when possible", "Set up CloudWatch alarms for cost monitoring and auto-termination", ] ) # Instance-specific recommendations current_instance = cost_estimate.instance_type.split(" ")[ 0 ] # Remove pricing type if current_instance.startswith("p3") or current_instance.startswith("p4d"): if resource_usage.gpu_stats: avg_gpu_util = sum( gpu.get("utilization_percent", 0) for gpu in resource_usage.gpu_stats ) / len(resource_usage.gpu_stats) if avg_gpu_util < 50: recommendations.append( "Low GPU utilization on expensive GPU instance. Consider g4dn instances or CPU instances." ) return recommendations
[docs] def estimate_batch_cost( self, job_queue: str, compute_environment: str, estimated_jobs: int, avg_job_duration_hours: float, ) -> Dict[str, Any]: """Estimate costs for AWS Batch workloads.""" # This would typically integrate with AWS Batch APIs # For now, provide a basic estimation framework base_instance_cost = self.ec2_pricing.get( "c5.large", 0.085 ) # Default compute instance total_compute_hours = estimated_jobs * avg_job_duration_hours estimated_cost = total_compute_hours * base_instance_cost return { "job_queue": job_queue, "compute_environment": compute_environment, "estimated_jobs": estimated_jobs, "avg_job_duration_hours": avg_job_duration_hours, "total_compute_hours": total_compute_hours, "estimated_cost": estimated_cost, "cost_per_job": ( estimated_cost / estimated_jobs if estimated_jobs > 0 else 0 ), "recommendations": [ "Use Spot instances in Batch compute environments for additional savings", "Optimize job packaging to reduce overhead", "Monitor job efficiency and resource utilization", "Use appropriate instance types for different job characteristics", ], }
[docs] def get_region_pricing_comparison( self, instance_type: str ) -> Dict[str, Dict[str, Any]]: """Compare pricing across AWS regions (simplified).""" # Regional pricing multipliers (approximate) region_multipliers = { "us-east-1": 1.0, # N. Virginia (baseline) "us-west-2": 1.05, # Oregon "eu-west-1": 1.1, # Ireland "ap-southeast-1": 1.15, # Singapore "ap-northeast-1": 1.2, # Tokyo } base_price = self.ec2_pricing.get(instance_type, self.ec2_pricing["default"]) regional_pricing = {} for region, multiplier in region_multipliers.items(): regional_pricing[region] = { "on_demand_hourly": base_price * multiplier, "estimated_spot_hourly": base_price * multiplier * 0.7, # Rough spot estimate "region_name": region, } return regional_pricing
[docs] def get_aws_specific_metrics(self) -> Dict[str, Any]: """Get AWS-specific cost and performance metrics.""" return { "region": self.region, "availability_zone": "auto-detect", # Would detect from instance metadata "instance_lifecycle": "on-demand", # or 'spot' "ebs_optimized": True, "enhanced_networking": True, "placement_group": None, "cost_optimization_score": self._calculate_cost_optimization_score(), }
def _calculate_cost_optimization_score(self) -> float: """Calculate a cost optimization score (0-100).""" # This would analyze various factors: # - Resource utilization # - Instance type appropriateness # - Spot vs on-demand usage # - Reserved instance coverage # For now, return a placeholder return 75.0