Source code for clustrix.cost_providers.azure

"""
Azure cost monitoring implementation.
"""

import logging
from datetime import datetime
from typing import Dict, List, Any

from ..cost_monitoring import BaseCostMonitor, ResourceUsage, CostEstimate
from ..pricing_clients.azure_pricing import AzurePricingClient

logger = logging.getLogger(__name__)


[docs] class AzureCostMonitor(BaseCostMonitor): """Cost monitoring for Azure Virtual Machines and Batch."""
[docs] def __init__(self, region: str = "eastus", use_pricing_api: bool = True): super().__init__("Azure") self.region = region self.use_pricing_api = use_pricing_api # Initialize pricing client self.pricing_client = AzurePricingClient() if use_pricing_api else None # Azure VM pricing (East US, as of 2025, approximate rates in USD/hour) # These serve as fallback when API is unavailable self.vm_pricing = { # B-series (Burstable) "Standard_B1s": 0.0104, "Standard_B1ms": 0.0208, "Standard_B2s": 0.0416, "Standard_B2ms": 0.0832, "Standard_B4ms": 0.1664, "Standard_B8ms": 0.3328, # D-series (General Purpose) "Standard_D2s_v3": 0.096, "Standard_D4s_v3": 0.192, "Standard_D8s_v3": 0.384, "Standard_D16s_v3": 0.768, "Standard_D32s_v3": 1.536, "Standard_D64s_v3": 3.072, # F-series (Compute Optimized) "Standard_F2s_v2": 0.085, "Standard_F4s_v2": 0.169, "Standard_F8s_v2": 0.338, "Standard_F16s_v2": 0.676, "Standard_F32s_v2": 1.352, "Standard_F64s_v2": 2.704, # E-series (Memory Optimized) "Standard_E2s_v3": 0.126, "Standard_E4s_v3": 0.252, "Standard_E8s_v3": 0.504, "Standard_E16s_v3": 1.008, "Standard_E32s_v3": 2.016, "Standard_E64s_v3": 4.032, # GPU VMs "Standard_NC6s_v3": 3.06, # 1 V100 "Standard_NC12s_v3": 6.12, # 2 V100 "Standard_NC24s_v3": 12.24, # 4 V100 "Standard_ND40rs_v2": 22.0, # 8 V100 "Standard_NC6s_v2": 0.90, # 1 P100 "Standard_NC12s_v2": 1.80, # 2 P100 "Standard_NC24s_v2": 3.60, # 4 P100 # Default fallback "default": 0.10, } # Spot VM discount factors (approximate) self.spot_discounts = { "Standard_B": 0.8, # ~20% discount "Standard_D": 0.7, # ~30% discount "Standard_F": 0.65, # ~35% discount "Standard_E": 0.6, # ~40% discount "Standard_NC": 0.3, # ~70% discount "Standard_ND": 0.35, # ~65% discount "default": 0.7, # ~30% discount } # Instance metadata self.instance_metadata = { "Standard_NC6s_v3": { "gpus": 1, "gpu_type": "V100", "gpu_memory": "16GB", "cpu_cores": 6, "ram": "112GB", }, "Standard_NC12s_v3": { "gpus": 2, "gpu_type": "V100", "gpu_memory": "32GB", "cpu_cores": 12, "ram": "224GB", }, "Standard_NC24s_v3": { "gpus": 4, "gpu_type": "V100", "gpu_memory": "64GB", "cpu_cores": 24, "ram": "448GB", }, "Standard_ND40rs_v2": { "gpus": 8, "gpu_type": "V100", "gpu_memory": "128GB", "cpu_cores": 40, "ram": "672GB", }, "Standard_NC6s_v2": { "gpus": 1, "gpu_type": "P100", "gpu_memory": "16GB", "cpu_cores": 6, "ram": "112GB", }, }
[docs] def get_resource_usage(self) -> ResourceUsage: """Get current resource utilization for Azure VM.""" # Get CPU and memory usage cpu_percent, mem_used_mb, mem_total_mb, mem_percent = ( self.get_cpu_memory_usage() ) # Get GPU utilization (if available) gpu_stats = self.get_gpu_utilization() return ResourceUsage( cpu_percent=cpu_percent, memory_used_mb=mem_used_mb, memory_total_mb=mem_total_mb, memory_percent=mem_percent, gpu_stats=gpu_stats, )
[docs] def estimate_cost( self, instance_type: str, hours_used: float, use_spot: bool = False ) -> CostEstimate: """Estimate cost for Azure VM usage.""" hourly_rate = None pricing_source = "hardcoded" pricing_warning = None # Try to get pricing from API first if self.pricing_client and not use_spot: try: hourly_rate = self.pricing_client.get_instance_pricing( instance_type=instance_type, region=self.region ) if hourly_rate: logger.debug( f"Got pricing for {instance_type} from Azure API: ${hourly_rate}/hr" ) pricing_source = "api" except Exception as e: logger.debug(f"Failed to get pricing from Azure API: {e}") # Handle spot pricing if use_spot and self.pricing_client: try: hourly_rate = self.pricing_client.get_spot_pricing( instance_type, self.region ) if hourly_rate: pricing_source = "api" except Exception as e: logger.debug(f"Failed to get spot pricing from Azure API: {e}") # Fall back to hardcoded pricing if API failed if hourly_rate is None: hourly_rate = self.vm_pricing.get(instance_type, self.vm_pricing["default"]) pricing_source = "hardcoded" if self.pricing_client and self.pricing_client.is_pricing_data_outdated(): pricing_warning = ( f"Using potentially outdated pricing data (last updated: " f"{self.pricing_client._hardcoded_pricing_date}). " f"Current prices may differ. Consider checking Azure pricing page." ) logger.warning(pricing_warning) # Apply spot discount if requested and using hardcoded pricing if use_spot: instance_family = ( instance_type.split("_")[1] if "_" in instance_type else instance_type ) # Match instance family prefix discount_key = next( ( k for k in self.spot_discounts.keys() if instance_family.startswith(k.replace("Standard_", "")) ), "default", ) discount_factor = self.spot_discounts[discount_key] hourly_rate *= discount_factor if pricing_source == "hardcoded": pricing_warning = ( pricing_warning or "" ) + " Spot pricing is estimated and may vary significantly." # Calculate estimated cost estimated_cost = hourly_rate * hours_used pricing_type = "Spot" if use_spot else "Pay-as-you-go" return CostEstimate( instance_type=f"{instance_type} ({pricing_type})", hourly_rate=hourly_rate, hours_used=hours_used, estimated_cost=estimated_cost, currency="USD", last_updated=datetime.now(), pricing_source=pricing_source, pricing_warning=pricing_warning, )
[docs] def get_pricing_info(self) -> Dict[str, float]: """Get Azure VM pricing information.""" return self.vm_pricing.copy()
[docs] def get_spot_pricing_info(self) -> Dict[str, float]: """Get estimated Azure Spot VM pricing.""" spot_pricing = {} for instance_type, on_demand_price in self.vm_pricing.items(): if instance_type != "default": instance_family = ( instance_type.split("_")[1] if "_" in instance_type else instance_type ) discount_key = next( ( k for k in self.spot_discounts.keys() if instance_family.startswith(k.replace("Standard_", "")) ), "default", ) discount_factor = self.spot_discounts[discount_key] spot_pricing[instance_type] = on_demand_price * discount_factor return spot_pricing
[docs] def get_cost_optimization_recommendations( self, resource_usage: ResourceUsage, cost_estimate: CostEstimate ) -> List[str]: """Get Azure-specific cost optimization recommendations.""" recommendations = super().get_cost_optimization_recommendations( resource_usage, cost_estimate ) # Azure-specific recommendations recommendations.extend( [ "Consider using Azure Spot VMs for fault-tolerant workloads (up to 90% savings)", "Use Reserved VM Instances for predictable workloads (up to 72% savings)", "Consider Azure Batch for large-scale parallel processing", "Use Azure CycleCloud for HPC workloads and cluster management", "Enable auto-shutdown for development VMs to avoid unnecessary costs", "Use Azure Monitor for detailed cost tracking and alerts", "Consider B-series burstable VMs for variable workloads", "Use managed disks with appropriate performance tiers", "Implement lifecycle policies for blob storage to reduce storage costs", ] ) # Instance-specific recommendations current_instance = cost_estimate.instance_type.split(" ")[ 0 ] # Remove pricing type if current_instance.startswith("Standard_NC") or current_instance.startswith( "Standard_ND" ): if resource_usage.gpu_stats: avg_gpu_util = sum( gpu.get("utilization_percent", 0) for gpu in resource_usage.gpu_stats ) / len(resource_usage.gpu_stats) if avg_gpu_util < 50: recommendations.append( "Low GPU utilization on expensive GPU VM. Consider Standard_D or Standard_F series VMs." ) return recommendations
[docs] def estimate_batch_cost( self, pool_name: str, vm_size: str, target_nodes: int, estimated_duration_hours: float, ) -> Dict[str, Any]: """Estimate costs for Azure Batch workloads.""" # Get VM hourly cost vm_hourly_cost = self.vm_pricing.get(vm_size, self.vm_pricing["default"]) # Calculate total cost total_compute_hours = target_nodes * estimated_duration_hours estimated_cost = total_compute_hours * vm_hourly_cost return { "pool_name": pool_name, "vm_size": vm_size, "target_nodes": target_nodes, "estimated_duration_hours": estimated_duration_hours, "total_compute_hours": total_compute_hours, "vm_hourly_cost": vm_hourly_cost, "estimated_cost": estimated_cost, "cost_per_node_hour": vm_hourly_cost, "recommendations": [ "Use Low Priority VMs in Batch pools for additional savings", "Implement auto-scaling to optimize node utilization", "Use appropriate VM sizes for different task characteristics", "Monitor task efficiency and optimize task packaging", "Consider using VMSS (Virtual Machine Scale Sets) for flexibility", ], }
[docs] def get_region_pricing_comparison( self, instance_type: str ) -> Dict[str, Dict[str, Any]]: """Compare pricing across Azure regions (simplified).""" # Regional pricing multipliers (approximate) region_multipliers = { "East US": 1.0, # Baseline "West US 2": 1.0, # Same as East US "Central US": 0.95, # Slightly cheaper "West Europe": 1.1, # ~10% more expensive "Southeast Asia": 1.15, # ~15% more expensive "Japan East": 1.2, # ~20% more expensive } base_price = self.vm_pricing.get(instance_type, self.vm_pricing["default"]) regional_pricing = {} for region, multiplier in region_multipliers.items(): regional_pricing[region] = { "pay_as_you_go_hourly": base_price * multiplier, "estimated_spot_hourly": base_price * multiplier * 0.7, # Rough spot estimate "region_name": region, } return regional_pricing
[docs] def get_azure_specific_metrics(self) -> Dict[str, Any]: """Get Azure-specific cost and performance metrics.""" return { "region": self.region, "availability_zone": "auto-detect", # Would detect from instance metadata "vm_lifecycle": "pay-as-you-go", # or 'spot' or 'reserved' "managed_disks": True, "accelerated_networking": True, "proximity_placement_group": None, "cost_optimization_score": self._calculate_cost_optimization_score(), }
[docs] def get_azure_consumption_api_integration(self) -> Dict[str, Any]: """Framework for Azure Consumption API integration.""" # This would integrate with Azure Consumption APIs for real billing data # For now, provide a framework structure return { "billing_period": datetime.now().strftime("%Y-%m"), "subscription_id": "auto-detect", "resource_group": "auto-detect", "cost_to_date": 0.0, # Would fetch from API "forecasted_cost": 0.0, # Would calculate based on usage "budget_alerts": [], # Would fetch configured alerts "cost_breakdown": { "compute": 0.0, "storage": 0.0, "networking": 0.0, "other": 0.0, }, "api_available": False, # Would check API connectivity "last_updated": datetime.now().isoformat(), }
def _calculate_cost_optimization_score(self) -> float: """Calculate a cost optimization score (0-100).""" # This would analyze various factors: # - Resource utilization # - VM size appropriateness # - Spot vs pay-as-you-go usage # - Reserved instance coverage # - Auto-shutdown configurations # For now, return a placeholder return 72.0