# Source code for clustrix.cost_monitoring

"""
Cost monitoring functionality for Clustrix across different cloud providers.

This module provides unified cost tracking, resource utilization monitoring,
and cost optimization recommendations for various cloud platforms.
"""

import time
import subprocess
import logging
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, Callable
from functools import wraps
from dataclasses import dataclass, asdict
from datetime import datetime

# Configure logging
logger = logging.getLogger(__name__)


@dataclass
class ResourceUsage:
    """Resource utilization metrics captured by a cost monitor.

    The first four fields are required; the optional ones default to None
    when a monitor cannot (or does not) collect them.
    """

    # Overall CPU utilization, in percent.
    cpu_percent: float
    # Memory in use and total memory, in MB (per the field names).
    memory_used_mb: int
    memory_total_mb: int
    # Memory utilization, in percent.
    memory_percent: float
    # Optional per-GPU metric dictionaries, one entry per GPU.
    gpu_stats: Optional[List[Dict[str, Any]]] = None
    # Optional network / disk I/O volumes, in MB (per the field names).
    network_io_mb: Optional[float] = None
    disk_io_mb: Optional[float] = None
@dataclass
class CostEstimate:
    """Estimated cost of running a given instance type for some duration."""

    # Instance type the estimate was computed for.
    instance_type: str
    # Price per hour used for the estimate.
    hourly_rate: float
    # Usage duration the estimate covers, in hours.
    hours_used: float
    # Resulting cost figure.
    estimated_cost: float
    # Currency of hourly_rate / estimated_cost.
    currency: str = "USD"
    # When the pricing data was last refreshed, if known.
    last_updated: Optional[datetime] = None
    # Where the hourly rate came from: "api" or "hardcoded".
    pricing_source: str = "api"
    # Optional caveat about the pricing data (e.g. a fallback was used).
    pricing_warning: Optional[str] = None
@dataclass
class CostReport:
    """Combined cost and resource-usage report for one monitoring session."""

    # When the report was generated.
    timestamp: datetime
    # Length of the monitored session, in seconds.
    duration_seconds: float
    # Resource utilization snapshot taken when the report was built.
    resource_usage: ResourceUsage
    # Cost estimate for the session.
    cost_estimate: CostEstimate
    # Name of the cloud provider the report is for.
    provider: str
    # Optional provider region.
    region: Optional[str] = None
    # Optional human-readable optimization suggestions.
    recommendations: Optional[List[str]] = None
    # Optional free-form extra data.
    metadata: Optional[Dict[str, Any]] = None
class BaseCostMonitor(ABC):
    """Base class for cloud provider cost monitoring.

    Subclasses implement ``get_resource_usage``, ``estimate_cost`` and
    ``get_pricing_info``. This base class supplies the monitoring session
    lifecycle (``start_monitoring`` / ``stop_monitoring``), heuristic
    optimization recommendations, and best-effort helpers for collecting
    GPU, CPU and memory metrics.
    """

    def __init__(self, provider_name: str):
        self.provider_name = provider_name
        # time.time() timestamp set by start_monitoring(); None means no
        # session has been started.
        self.start_time: Optional[float] = None
        self.monitoring_enabled = True

    @abstractmethod
    def get_resource_usage(self) -> ResourceUsage:
        """Get current resource utilization metrics."""

    @abstractmethod
    def estimate_cost(self, instance_type: str, hours_used: float) -> CostEstimate:
        """Estimate cost for given instance type and usage duration."""

    @abstractmethod
    def get_pricing_info(self) -> Dict[str, float]:
        """Get current pricing information for different instance types."""

    def start_monitoring(self):
        """Start a cost monitoring session by recording the start time."""
        self.start_time = time.time()
        logger.info(f"Started cost monitoring for {self.provider_name}")

    def stop_monitoring(self, instance_type: str = "default") -> Optional[CostReport]:
        """Stop monitoring and generate a cost report.

        Args:
            instance_type: Instance type passed to ``estimate_cost`` when
                pricing the session. Defaults to ``"default"``, which is
                what earlier versions always used.

        Returns:
            A CostReport covering the time since ``start_monitoring``, or
            None (with a warning logged) if monitoring was never started.

        Note:
            ``start_time`` is left set, so calling this again reports the
            duration measured from the same original start.
        """
        if self.start_time is None:
            logger.warning("Monitoring was not started")
            return None

        duration = time.time() - self.start_time

        resource_usage = self.get_resource_usage()
        cost_estimate = self.estimate_cost(instance_type, duration / 3600)
        recommendations = self.get_cost_optimization_recommendations(
            resource_usage, cost_estimate
        )

        report = CostReport(
            timestamp=datetime.now(),
            duration_seconds=duration,
            resource_usage=resource_usage,
            cost_estimate=cost_estimate,
            provider=self.provider_name,
            recommendations=recommendations,
        )

        logger.info(
            f"Completed cost monitoring for {self.provider_name}. "
            f"Duration: {duration:.2f}s, Estimated cost: ${cost_estimate.estimated_cost:.4f}"
        )
        return report

    def get_cost_optimization_recommendations(
        self, resource_usage: ResourceUsage, cost_estimate: CostEstimate
    ) -> List[str]:
        """Generate cost optimization recommendations based on usage patterns.

        Heuristic thresholds:
        - CPU below 20% or above 90%.
        - Memory below 30% or above 85%.
        - Mean GPU utilization below 50% or above 95%.
        - Estimated cost above 10 (in ``cost_estimate.currency`` units).

        Memory advice is skipped when ``memory_total_mb`` is 0. That is what
        ``get_cpu_memory_usage`` reports when memory metrics could not be
        collected, so a 0% reading there does not mean low usage.
        """
        recommendations = []

        # CPU utilization recommendations
        if resource_usage.cpu_percent < 20:
            recommendations.append(
                "Low CPU usage detected. Consider using a smaller instance type."
            )
        elif resource_usage.cpu_percent > 90:
            recommendations.append(
                "High CPU usage detected. Consider using a larger instance type or optimizing workload."
            )

        # Memory utilization recommendations (only when memory was measured)
        if resource_usage.memory_total_mb > 0:
            if resource_usage.memory_percent < 30:
                recommendations.append(
                    "Low memory usage detected. Consider using an instance with less memory."
                )
            elif resource_usage.memory_percent > 85:
                recommendations.append(
                    "High memory usage detected. Consider using an instance with more memory."
                )

        # GPU utilization recommendations (if available)
        if resource_usage.gpu_stats:
            # `or 0` also covers entries whose utilization is present but None.
            avg_gpu_util = sum(
                gpu.get("utilization_percent") or 0 for gpu in resource_usage.gpu_stats
            ) / len(resource_usage.gpu_stats)
            if avg_gpu_util < 50:
                recommendations.append(
                    "Low GPU utilization detected. Consider using CPU instances or optimizing GPU workload."
                )
            elif avg_gpu_util > 95:
                recommendations.append(
                    "High GPU utilization detected. Consider multi-GPU instances for better performance."
                )

        # Cost-based recommendations
        if cost_estimate.estimated_cost > 10:  # $10 threshold
            recommendations.append(
                "High estimated cost detected. Consider using spot instances or reserved capacity."
            )

        return recommendations

    @staticmethod
    def _parse_nvidia_smi_csv(output: str) -> List[Dict[str, Any]]:
        """Parse ``nvidia-smi --format=csv,noheader,nounits`` rows into dicts.

        Each row is expected to hold utilization.gpu, memory.used,
        memory.total and (optionally) temperature.gpu. Rows whose first
        three fields are not integers are skipped. A non-numeric
        temperature (e.g. "[N/A]") becomes None instead of dropping the GPU.
        """
        gpu_stats = []
        for gpu_id, line in enumerate(output.strip().split("\n")):
            if not line.strip():
                continue
            # Strip each field so we do not depend on the exact ", " spacing.
            fields = [part.strip() for part in line.split(",")]
            if len(fields) < 3:
                continue
            try:
                utilization = int(fields[0])
                mem_used = int(fields[1])
                mem_total = int(fields[2])
            except ValueError:
                continue

            temperature = None
            if len(fields) > 3:
                try:
                    temperature = int(fields[3])
                except ValueError:
                    temperature = None

            gpu_stats.append(
                {
                    "gpu_id": gpu_id,
                    "utilization_percent": utilization,
                    "memory_used_mb": mem_used,
                    "memory_total_mb": mem_total,
                    # Guard against a reported total of 0 rather than
                    # discarding the GPU's other metrics.
                    "memory_utilization_percent": (
                        round(mem_used / mem_total * 100, 1) if mem_total else 0.0
                    ),
                    "temperature_c": temperature,
                }
            )
        return gpu_stats

    def get_gpu_utilization(self) -> List[Dict[str, Any]]:
        """Get GPU utilization metrics using nvidia-smi.

        Best effort: returns an empty list if nvidia-smi is missing, times
        out (10 s), exits non-zero, or fails in any other way. Failures are
        logged at debug level.
        """
        try:
            result = subprocess.run(
                [
                    "nvidia-smi",
                    "--query-gpu=utilization.gpu,memory.used,memory.total,temperature.gpu",
                    "--format=csv,noheader,nounits",
                ],
                capture_output=True,
                text=True,
                timeout=10,
            )
            if result.returncode == 0:
                return self._parse_nvidia_smi_csv(result.stdout)
        except Exception as e:  # deliberate best effort; metrics are optional
            logger.debug(f"Could not get GPU utilization: {e}")
        return []

    def get_cpu_memory_usage(self) -> tuple:
        """Get CPU and memory usage by running psutil in a child interpreter.

        Returns:
            ``(cpu_percent, mem_used_mb, mem_total_mb, mem_percent)``, or
            ``(0.0, 0, 0, 0.0)`` if psutil is unavailable or the child
            process fails or times out (5 s).
        """
        import sys

        # psutil.cpu_percent() with no interval returns a meaningless 0.0 on
        # its first call in a process, and this child process makes exactly
        # one call, so sample over a short blocking interval instead.
        script = (
            "import psutil; m = psutil.virtual_memory(); "
            "print(f'{psutil.cpu_percent(interval=0.1):.1f},"
            "{m.used//1024//1024},{m.total//1024//1024},{m.percent:.1f}')"
        )
        try:
            # Use the running interpreter rather than whatever "python" is on
            # PATH, so psutil is looked up in the same environment.
            result = subprocess.run(
                [sys.executable or "python", "-c", script],
                capture_output=True,
                text=True,
                timeout=5,
            )
            if result.returncode == 0:
                cpu, mem_used, mem_total, mem_percent = result.stdout.strip().split(",")
                return float(cpu), int(mem_used), int(mem_total), float(mem_percent)
        except Exception as e:  # deliberate best effort; metrics are optional
            logger.debug(f"Could not get CPU/memory usage: {e}")
        return 0.0, 0, 0, 0.0
def cost_tracking_decorator(provider: str, instance_type: str = "default"):
    """
    Decorator to automatically track costs for Clustrix functions.

    Args:
        provider: Cloud provider name (e.g., 'lambda', 'aws', 'azure', 'gcp')
        instance_type: Instance type for cost estimation

    Returns:
        A decorator. If no cost monitor is available for ``provider``, the
        wrapped function's own return value is passed through unchanged.
        Otherwise the wrapper returns a dict with the keys ``result``,
        ``success``, ``error``, ``cost_report`` (a dict or None),
        ``provider`` and ``instance_type``. Exceptions raised by the
        wrapped function are caught and reported through ``success`` and
        ``error`` rather than re-raised.

    Example::

        @cost_tracking_decorator('lambda', 'a100_40gb')
        @cluster(cores=8, memory="32GB")
        def my_training_function():
            # Your code here
            pass
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Get the appropriate cost monitor
            monitor = get_cost_monitor(provider)
            if monitor is None:
                logger.warning(
                    f"Cost monitoring not available for provider: {provider}"
                )
                return func(*args, **kwargs)

            monitor.start_monitoring()

            try:
                result = func(*args, **kwargs)
                success = True
                error = None
            except Exception as e:
                result = None
                success = False
                error = str(e)
                # logger.exception logs at ERROR level and keeps the traceback.
                logger.exception(f"Function execution failed: {e}")

            # A failure inside cost monitoring must not discard the wrapped
            # function's result, so contain it and report no cost data.
            try:
                report = monitor.stop_monitoring()
                if report is not None and instance_type != "default":
                    # stop_monitoring() prices the session as the "default"
                    # instance type; re-price it for the configured type.
                    report.cost_estimate = monitor.estimate_cost(
                        instance_type, report.duration_seconds / 3600
                    )
                    report.recommendations = (
                        monitor.get_cost_optimization_recommendations(
                            report.resource_usage, report.cost_estimate
                        )
                    )
                cost_report = asdict(report) if report else None
            except Exception as e:
                logger.warning(f"Cost monitoring failed for provider {provider}: {e}")
                cost_report = None

            # Return enhanced result with cost information
            return {
                "result": result,
                "success": success,
                "error": error,
                "cost_report": cost_report,
                "provider": provider,
                "instance_type": instance_type,
            }

        return wrapper

    return decorator
def get_cost_monitor(provider: str) -> Optional[BaseCostMonitor]:
    """
    Get the appropriate cost monitor for a cloud provider.

    A new monitor instance is created on every call; provider modules are
    imported lazily.

    Args:
        provider: Cloud provider name ('lambda', 'aws', 'azure' or 'gcp').
            Matched case-insensitively; surrounding whitespace is ignored.

    Returns:
        Cost monitor instance, or None if the provider is unsupported or its
        monitor cannot be imported (both cases are logged as warnings).
    """
    provider = provider.strip().lower()

    try:
        if provider == "lambda":
            from .cost_providers.lambda_cloud import LambdaCostMonitor

            return LambdaCostMonitor()
        if provider == "aws":
            from .cost_providers.aws import AWSCostMonitor

            return AWSCostMonitor()
        if provider == "azure":
            from .cost_providers.azure import AzureCostMonitor

            return AzureCostMonitor()
        if provider == "gcp":
            from .cost_providers.gcp import GCPCostMonitor

            return GCPCostMonitor()
    except ImportError as e:
        # A missing provider module or dependency means "not available",
        # which the contract says is reported as None rather than raised.
        logger.warning(f"Cost monitor for provider {provider} could not be loaded: {e}")
        return None

    logger.warning(f"Unsupported cloud provider: {provider}")
    return None
# Convenience functions for direct use
def start_cost_monitoring(provider: str) -> Optional[BaseCostMonitor]:
    """Create a monitor for ``provider`` and begin a monitoring session.

    Returns the started monitor, or whatever ``get_cost_monitor`` returned
    (None) when no monitor is available for the provider.
    """
    cost_monitor = get_cost_monitor(provider)
    if not cost_monitor:
        return cost_monitor
    cost_monitor.start_monitoring()
    return cost_monitor
def generate_cost_report(
    provider: str, instance_type: str = "default", hours_used: float = 1.0
) -> Optional[Dict[str, Any]]:
    """Generate a point-in-time cost snapshot for a provider.

    This builds a fresh monitor via ``get_cost_monitor``, so it does not
    read or stop any session started with ``start_cost_monitoring``. Resource
    usage is sampled now, and the cost is a projection for ``hours_used``
    hours of ``instance_type``.

    Args:
        provider: Cloud provider name.
        instance_type: Instance type to price.
        hours_used: Projection horizon in hours (default 1 hour, the value
            previously hard-coded).

    Returns:
        A dict with ``timestamp`` (ISO string), ``provider``,
        ``resource_usage`` and ``cost_estimate`` (both as dicts) and
        ``recommendations``, or None if no monitor is available.
    """
    monitor = get_cost_monitor(provider)
    if not monitor:
        return None

    resource_usage = monitor.get_resource_usage()
    cost_estimate = monitor.estimate_cost(instance_type, hours_used)
    recommendations = monitor.get_cost_optimization_recommendations(
        resource_usage, cost_estimate
    )

    return {
        "timestamp": datetime.now().isoformat(),
        "provider": provider,
        "resource_usage": asdict(resource_usage),
        "cost_estimate": asdict(cost_estimate),
        "recommendations": recommendations,
    }
def get_pricing_info(provider: str) -> Optional[Dict[str, float]]:
    """Return the provider monitor's pricing table, or None if unsupported."""
    cost_monitor = get_cost_monitor(provider)
    return cost_monitor.get_pricing_info() if cost_monitor else None