# Source code for clustrix.cost_providers.gcp

"""
Google Cloud Platform cost monitoring implementation.
"""

import logging
from datetime import datetime
from typing import Dict, List, Any

from ..cost_monitoring import BaseCostMonitor, ResourceUsage, CostEstimate
from ..pricing_clients.gcp_pricing import GCPPricingClient

# Module-level logger named after this module; used by GCPCostMonitor to
# report pricing-API failures (debug) and outdated fallback data (warning).
logger = logging.getLogger(__name__)


class GCPCostMonitor(BaseCostMonitor):
    """Cost monitoring for Google Cloud Platform Compute Engine instances.

    Hourly rates are requested from ``GCPPricingClient`` when the pricing API
    is enabled; otherwise (or when the API yields nothing) a built-in table of
    approximate us-central1 rates is used. On top of the base rate the monitor
    can apply an estimated preemptible multiplier and a sustained-use discount.
    """

    def __init__(self, region: str = "us-central1", use_pricing_api: bool = True):
        """Set up the pricing client and the fallback pricing tables.

        Args:
            region: GCP region passed to the pricing client on every lookup.
            use_pricing_api: When true a ``GCPPricingClient`` is created;
                when false only the hardcoded tables below are used.
        """
        super().__init__("Google Cloud Platform")
        self.region = region
        self.use_pricing_api = use_pricing_api

        # Initialize pricing client
        self.pricing_client = GCPPricingClient() if use_pricing_api else None

        # GCP Compute Engine pricing (us-central1, as of 2025, approximate
        # rates in USD/hour). These serve as fallback when API is unavailable.
        self.compute_pricing = {
            # General Purpose (N2)
            "n2-standard-2": 0.097,
            "n2-standard-4": 0.194,
            "n2-standard-8": 0.389,
            "n2-standard-16": 0.778,
            "n2-standard-32": 1.555,
            "n2-standard-64": 3.110,
            # High-CPU (N2)
            "n2-highcpu-16": 0.588,
            "n2-highcpu-32": 1.177,
            "n2-highcpu-64": 2.353,
            # High-Memory (N2)
            "n2-highmem-2": 0.130,
            "n2-highmem-4": 0.261,
            "n2-highmem-8": 0.521,
            "n2-highmem-16": 1.042,
            # Compute Optimized (C2)
            "c2-standard-4": 0.134,
            "c2-standard-8": 0.268,
            "c2-standard-16": 0.537,
            "c2-standard-30": 1.006,
            "c2-standard-60": 2.013,
            # Memory Optimized (M2)
            "m2-ultramem-208": 32.775,
            "m2-ultramem-416": 65.550,
            # GPU-attached instances (base compute + GPU cost)
            "n1-standard-4-k80": 0.294,  # + K80 GPU
            "n1-standard-8-v100": 1.46,  # + V100 GPU
            "n1-standard-16-t4": 0.80,  # + T4 GPU
            "a2-highgpu-1g": 3.673,  # 1 A100 GPU
            "a2-highgpu-2g": 7.347,  # 2 A100 GPU
            "a2-highgpu-4g": 14.694,  # 4 A100 GPU
            "a2-highgpu-8g": 29.387,  # 8 A100 GPU
            # Default fallback
            "default": 0.10,
        }

        # Preemptible instance discount (approximately 80% discount).
        # NOTE: this is a price *multiplier* -- rates are multiplied by 0.2.
        self.preemptible_discount = 0.2

        # Instance metadata
        self.instance_metadata = {
            "a2-highgpu-1g": {
                "gpus": 1,
                "gpu_type": "A100",
                "gpu_memory": "40GB",
                "cpu_cores": 12,
                "ram": "85GB",
            },
            "a2-highgpu-2g": {
                "gpus": 2,
                "gpu_type": "A100",
                "gpu_memory": "80GB",
                "cpu_cores": 24,
                "ram": "170GB",
            },
            "a2-highgpu-4g": {
                "gpus": 4,
                "gpu_type": "A100",
                "gpu_memory": "160GB",
                "cpu_cores": 48,
                "ram": "340GB",
            },
            "a2-highgpu-8g": {
                "gpus": 8,
                "gpu_type": "A100",
                "gpu_memory": "320GB",
                "cpu_cores": 96,
                "ram": "680GB",
            },
            "n1-standard-8-v100": {
                "gpus": 1,
                "gpu_type": "V100",
                "gpu_memory": "16GB",
                "cpu_cores": 8,
                "ram": "30GB",
            },
            "n1-standard-16-t4": {
                "gpus": 1,
                "gpu_type": "T4",
                "gpu_memory": "16GB",
                "cpu_cores": 16,
                "ram": "60GB",
            },
        }

        # Sustained Use Discounts (automatic discounts for sustained usage).
        # Keys are the upper bound (percent of month) of each usage tier.
        self.sustained_use_discounts = {
            25: 0.0,  # 0-25% of month: no discount
            50: 0.10,  # 25-50% of month: 10% discount
            75: 0.20,  # 50-75% of month: 20% discount
            100: 0.30,  # 75-100% of month: 30% discount
        }

    def get_resource_usage(self) -> ResourceUsage:
        """Get current resource utilization for GCP instance.

        Returns:
            A ``ResourceUsage`` built from ``self.get_cpu_memory_usage()`` and
            ``self.get_gpu_utilization()`` (neither is defined in this class;
            presumably both are inherited from ``BaseCostMonitor``).
        """
        # Get CPU and memory usage
        cpu_percent, mem_used_mb, mem_total_mb, mem_percent = (
            self.get_cpu_memory_usage()
        )

        # Get GPU utilization (if available)
        gpu_stats = self.get_gpu_utilization()

        return ResourceUsage(
            cpu_percent=cpu_percent,
            memory_used_mb=mem_used_mb,
            memory_total_mb=mem_total_mb,
            memory_percent=mem_percent,
            gpu_stats=gpu_stats,
        )

    def estimate_cost(
        self,
        instance_type: str,
        hours_used: float,
        use_preemptible: bool = False,
        sustained_use_percent: float = 0,
    ) -> CostEstimate:
        """Estimate cost for GCP instance usage.

        Args:
            instance_type: Machine type name, e.g. ``"n2-standard-4"``.
            hours_used: Number of hours to bill.
            use_preemptible: Price the instance as preemptible.
            sustained_use_percent: Share of the month the instance runs, in
                percent. Values above 25 select a sustained-use tier; each
                tier includes its upper bound (exactly 50 falls in the 25-50%
                tier). Values above the highest tier use the highest tier.

        Returns:
            A ``CostEstimate`` whose ``pricing_source`` is ``"api"`` or
            ``"hardcoded"`` and whose ``pricing_warning`` is set when the
            fallback table is outdated and/or the preemptible rate is an
            estimate.
        """
        hourly_rate = None
        pricing_warning = None

        # Try to get pricing from the API first. Failures are deliberately
        # swallowed (best effort): the hardcoded table below is the fallback.
        if self.pricing_client:
            try:
                if use_preemptible:
                    hourly_rate = self.pricing_client.get_preemptible_pricing(
                        instance_type, self.region
                    )
                else:
                    hourly_rate = self.pricing_client.get_instance_pricing(
                        instance_type=instance_type, region=self.region
                    )
            except Exception as e:
                if use_preemptible:
                    logger.debug(
                        "Failed to get preemptible pricing from GCP API: %s", e
                    )
                else:
                    logger.debug("Failed to get pricing from GCP API: %s", e)

        if hourly_rate:
            pricing_source = "api"
            if not use_preemptible:
                logger.debug(
                    "Got pricing for %s from GCP API: $%s/hr",
                    instance_type,
                    hourly_rate,
                )
        else:
            # Fall back to hardcoded pricing. Any falsy API answer (None, 0)
            # counts as "no price" so a zero rate can never slip through as a
            # free instance.
            hourly_rate = self.compute_pricing.get(
                instance_type, self.compute_pricing["default"]
            )
            pricing_source = "hardcoded"

            if self.pricing_client and self.pricing_client.is_pricing_data_outdated():
                pricing_warning = (
                    f"Using potentially outdated pricing data (last updated: "
                    f"{self.pricing_client._hardcoded_pricing_date}). "
                    f"Current prices may differ. Consider checking GCP pricing page."
                )
                logger.warning(pricing_warning)

            # Apply the preemptible multiplier only to table rates: a rate
            # returned by get_preemptible_pricing() is already a preemptible
            # price and must not be discounted a second time.
            if use_preemptible:
                hourly_rate *= self.preemptible_discount
                pricing_warning = (
                    (pricing_warning or "")
                    + " Preemptible pricing is estimated and may vary significantly."
                )

        # Apply sustained use discount (GCP feature)
        discount: float = 0.0
        if sustained_use_percent > 25:
            # Clamp to the highest tier so inputs above 100% do not leave the
            # candidate set empty (min() of nothing raises ValueError).
            capped_percent = min(
                sustained_use_percent, max(self.sustained_use_discounts)
            )
            discount_tier = min(
                k for k in self.sustained_use_discounts if k >= capped_percent
            )
            discount = self.sustained_use_discounts[discount_tier]
            hourly_rate *= 1 - discount

        # Calculate estimated cost
        estimated_cost = hourly_rate * hours_used

        pricing_type = "Preemptible" if use_preemptible else "On-Demand"
        if sustained_use_percent > 25:
            pricing_type += f" (SUD: {discount * 100:.0f}%)"

        return CostEstimate(
            instance_type=f"{instance_type} ({pricing_type})",
            hourly_rate=hourly_rate,
            hours_used=hours_used,
            estimated_cost=estimated_cost,
            currency="USD",
            last_updated=datetime.now(),
            pricing_source=pricing_source,
            pricing_warning=pricing_warning,
        )

    def get_pricing_info(self) -> Dict[str, float]:
        """Get GCP Compute Engine pricing information.

        Returns:
            A shallow copy of the hardcoded on-demand table (USD/hour),
            including the ``"default"`` entry.
        """
        return self.compute_pricing.copy()

    def get_preemptible_pricing_info(self) -> Dict[str, float]:
        """Get GCP preemptible pricing.

        Returns:
            Every hardcoded on-demand rate except ``"default"``, multiplied by
            ``self.preemptible_discount``.
        """
        return {
            instance_type: on_demand_price * self.preemptible_discount
            for instance_type, on_demand_price in self.compute_pricing.items()
            if instance_type != "default"
        }

    def get_cost_optimization_recommendations(
        self, resource_usage: ResourceUsage, cost_estimate: CostEstimate
    ) -> List[str]:
        """Get GCP-specific cost optimization recommendations.

        Args:
            resource_usage: Current utilization; only ``gpu_stats`` is read
                here (items are accessed with ``.get("utilization_percent")``).
            cost_estimate: Estimate whose ``instance_type`` starts with the
                machine type followed by a space and the pricing label.

        Returns:
            The base-class recommendations, followed by a fixed list of GCP
            tips, plus an A100-specific tip when average GPU utilization on an
            ``a2-highgpu`` instance is below 50%.
        """
        recommendations = super().get_cost_optimization_recommendations(
            resource_usage, cost_estimate
        )

        # GCP-specific recommendations
        recommendations.extend(
            [
                "Consider using Preemptible VMs for fault-tolerant workloads (up to 80% savings)",
                "Take advantage of automatic Sustained Use Discounts for long-running instances",
                "Use Committed Use Discounts for predictable workloads (up to 57% savings)",
                "Consider using custom machine types to optimize CPU/memory ratios",
                "Use Google Kubernetes Engine for containerized workloads",
                "Implement instance scheduling to automatically start/stop VMs",
                "Use Cloud Storage instead of persistent disks for cold data",
                "Enable detailed monitoring with Cloud Monitoring for cost tracking",
                "Consider using Sole-tenant nodes for licensing requirements",
            ]
        )

        # Instance-specific recommendations. The estimate's instance_type has
        # the form "<machine-type> (<pricing type>)"; keep the machine type.
        current_instance = cost_estimate.instance_type.split(" ")[0]

        if current_instance.startswith("a2-highgpu") and resource_usage.gpu_stats:
            gpu_stats = resource_usage.gpu_stats
            avg_gpu_util = sum(
                gpu.get("utilization_percent", 0) for gpu in gpu_stats
            ) / len(gpu_stats)
            if avg_gpu_util < 50:
                recommendations.append(
                    "Low GPU utilization on expensive A100 instance. Consider n1-standard instances with T4 GPUs."
                )

        return recommendations

    def estimate_sustained_use_discount(self, hours_per_month: float) -> Dict[str, Any]:
        """Calculate sustained use discount based on monthly usage.

        A month is taken as 30 * 24 = 720 hours. Tier boundaries here include
        the lower bound (exactly 50% usage gets the 50-75% tier), which differs
        from ``estimate_cost`` at exact boundaries.

        Args:
            hours_per_month: Hours the instance runs in one month.

        Returns:
            Dict with ``hours_per_month``, ``usage_percentage``,
            ``discount_percentage``, ``discount_tier`` and
            ``effective_hourly_rate_multiplier`` (``1 - discount``).
        """
        hours_in_month = 30 * 24  # 720 hours
        usage_percentage = (hours_per_month / hours_in_month) * 100

        # (lower bound in percent, discount fraction, label), highest first.
        tiers = (
            (75, 0.30, "75-100%"),
            (50, 0.20, "50-75%"),
            (25, 0.10, "25-50%"),
        )
        discount = 0.0
        discount_tier = "None"
        for lower_bound, tier_discount, tier_label in tiers:
            if usage_percentage >= lower_bound:
                discount = tier_discount
                discount_tier = tier_label
                break

        return {
            "hours_per_month": hours_per_month,
            "usage_percentage": usage_percentage,
            "discount_percentage": discount * 100,
            "discount_tier": discount_tier,
            "effective_hourly_rate_multiplier": 1 - discount,
        }

    def get_region_pricing_comparison(
        self, instance_type: str
    ) -> Dict[str, Dict[str, Any]]:
        """Compare pricing across GCP regions (simplified).

        Args:
            instance_type: Machine type looked up in the hardcoded table; the
                ``"default"`` rate is used for unknown types.

        Returns:
            Mapping of region name to ``on_demand_hourly``,
            ``preemptible_hourly`` and ``region_name``, derived from the
            hardcoded base rate and the approximate multipliers below.
        """
        # Regional pricing multipliers (approximate)
        region_multipliers = {
            "us-central1": 1.0,  # Iowa (baseline)
            "us-east1": 1.0,  # South Carolina
            "us-west1": 1.05,  # Oregon
            "europe-west1": 1.1,  # Belgium
            "asia-southeast1": 1.15,  # Singapore
            "asia-northeast1": 1.2,  # Tokyo
        }

        base_price = self.compute_pricing.get(
            instance_type, self.compute_pricing["default"]
        )

        return {
            region: {
                "on_demand_hourly": base_price * multiplier,
                "preemptible_hourly": base_price
                * multiplier
                * self.preemptible_discount,
                "region_name": region,
            }
            for region, multiplier in region_multipliers.items()
        }

    def estimate_batch_cost(
        self,
        job_name: str,
        machine_type: str,
        instance_count: int,
        estimated_duration_hours: float,
    ) -> Dict[str, Any]:
        """Estimate costs for Google Cloud Batch workloads.

        Uses the hardcoded on-demand table only (no API lookup, no discounts).

        Args:
            job_name: Echoed back in the result.
            machine_type: Machine type; unknown types use the ``"default"`` rate.
            instance_count: Number of VMs running the job.
            estimated_duration_hours: Expected runtime of each VM in hours.

        Returns:
            Dict echoing the inputs plus ``total_compute_hours``,
            ``vm_hourly_cost``, ``estimated_cost``, ``cost_per_instance_hour``
            and a fixed list of ``recommendations``.
        """
        # Get VM hourly cost
        vm_hourly_cost = self.compute_pricing.get(
            machine_type, self.compute_pricing["default"]
        )

        # Calculate total cost
        total_compute_hours = instance_count * estimated_duration_hours
        estimated_cost = total_compute_hours * vm_hourly_cost

        return {
            "job_name": job_name,
            "machine_type": machine_type,
            "instance_count": instance_count,
            "estimated_duration_hours": estimated_duration_hours,
            "total_compute_hours": total_compute_hours,
            "vm_hourly_cost": vm_hourly_cost,
            "estimated_cost": estimated_cost,
            "cost_per_instance_hour": vm_hourly_cost,
            "recommendations": [
                "Use preemptible instances in batch jobs for significant savings",
                "Optimize job parallelization to reduce total runtime",
                "Use appropriate machine types for different job characteristics",
                "Implement checkpointing for fault tolerance with preemptible instances",
                "Consider using Google Kubernetes Engine for batch workloads",
            ],
        }

    def get_gcp_specific_metrics(self) -> Dict[str, Any]:
        """Get GCP-specific cost and performance metrics.

        Apart from ``region`` and the (placeholder) optimization score, every
        value is a static placeholder; nothing is detected from the instance.
        """
        return {
            "region": self.region,
            "zone": "auto-detect",  # Would detect from instance metadata
            "vm_lifecycle": "on-demand",  # or 'preemptible'
            "custom_machine_type": False,
            "sole_tenancy": False,
            "committed_use_discount": False,
            "cost_optimization_score": self._calculate_cost_optimization_score(),
        }

    def get_billing_api_integration(self) -> Dict[str, Any]:
        """Framework for GCP Billing API integration.

        No billing API is called: all amounts are zero placeholders and
        ``api_available`` is always False. Only ``billing_period`` and
        ``last_updated`` are computed, both from the current local time.
        """
        # This would integrate with GCP Billing APIs for real billing data
        # For now, provide a framework structure
        return {
            "billing_account": "auto-detect",
            "project_id": "auto-detect",
            "billing_period": datetime.now().strftime("%Y-%m"),
            "cost_to_date": 0.0,  # Would fetch from API
            "forecasted_cost": 0.0,  # Would calculate based on usage
            "budget_alerts": [],  # Would fetch configured alerts
            "cost_breakdown": {
                "compute_engine": 0.0,
                "storage": 0.0,
                "networking": 0.0,
                "other_services": 0.0,
            },
            "api_available": False,  # Would check API connectivity
            "last_updated": datetime.now().isoformat(),
        }

    def _calculate_cost_optimization_score(self) -> float:
        """Calculate a cost optimization score (0-100).

        Currently a placeholder that always returns 78.0.
        """
        # This would analyze various factors:
        # - Resource utilization
        # - Machine type appropriateness
        # - Preemptible vs on-demand usage
        # - Sustained use discount eligibility
        # - Committed use discount opportunities

        # For now, return a placeholder
        return 78.0