Cost Monitoring

Clustrix provides comprehensive cost monitoring and optimization features for major cloud providers. This module enables automatic cost tracking, resource utilization monitoring, and cost optimization recommendations.

Overview

The cost monitoring system supports:

  • AWS: EC2 instances, Batch, spot instances

  • Google Cloud: Compute Engine, preemptible VMs, sustained use discounts

  • Azure: Virtual Machines, Batch, spot VMs

  • Lambda Cloud: GPU instances with utilization tracking

Core Classes

ResourceUsage

class clustrix.cost_monitoring.ResourceUsage(cpu_percent, memory_used_mb, memory_total_mb, memory_percent, gpu_stats=None, network_io_mb=None, disk_io_mb=None)

Bases: object

Data class containing resource utilization metrics.

Attributes:

  • cpu_percent: CPU utilization percentage

  • memory_used_mb: Memory usage in MB

  • memory_total_mb: Total memory in MB

  • memory_percent: Memory utilization percentage

  • gpu_stats: Optional GPU utilization data

  • network_io_mb: Optional network I/O in MB

  • disk_io_mb: Optional disk I/O in MB

cpu_percent: float
memory_used_mb: int
memory_total_mb: int
memory_percent: float
gpu_stats: Optional[List[Dict[str, Any]]] = None
network_io_mb: Optional[float] = None
disk_io_mb: Optional[float] = None
__init__(cpu_percent, memory_used_mb, memory_total_mb, memory_percent, gpu_stats=None, network_io_mb=None, disk_io_mb=None)
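
The relationship between the memory fields can be illustrated with a small sketch. The dataclass below is a stand-in that mirrors the documented field names and defaults, not the Clustrix class itself, and make_usage is a hypothetical helper that assumes memory_percent is derived from memory_used_mb and memory_total_mb.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class ResourceUsage:
    """Stand-in mirroring the documented fields; not the Clustrix class."""

    cpu_percent: float
    memory_used_mb: int
    memory_total_mb: int
    memory_percent: float
    gpu_stats: Optional[List[Dict[str, Any]]] = None
    network_io_mb: Optional[float] = None
    disk_io_mb: Optional[float] = None


def make_usage(cpu_percent: float, used_mb: int, total_mb: int) -> ResourceUsage:
    """Hypothetical helper: derive memory_percent from the two MB figures."""
    percent = 100.0 * used_mb / total_mb if total_mb else 0.0
    return ResourceUsage(cpu_percent, used_mb, total_mb, round(percent, 1))


usage = make_usage(cpu_percent=42.5, used_mb=8192, total_mb=32768)
print(usage.memory_percent)  # 25.0
```

The optional fields stay None unless a monitor fills them in, so callers should check them before use.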

CostEstimate

class clustrix.cost_monitoring.CostEstimate(instance_type, hourly_rate, hours_used, estimated_cost, currency='USD', last_updated=None, pricing_source='api', pricing_warning=None)

Bases: object

Data class containing cost estimation information.

Attributes:

  • instance_type: Cloud instance type

  • hourly_rate: Cost per hour in USD

  • hours_used: Number of hours used

  • estimated_cost: Total estimated cost

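  • currency: Currency (default: "USD")

  • last_updated: Last update timestamp

  • pricing_source: Source of the pricing data (default: 'api')

  • pricing_warning: Optional warning about the pricing data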

instance_type: str
hourly_rate: float
hours_used: float
estimated_cost: float
currency: str = 'USD'
last_updated: Optional[datetime] = None
pricing_source: str = 'api'
pricing_warning: Optional[str] = None
__init__(instance_type, hourly_rate, hours_used, estimated_cost, currency='USD', last_updated=None, pricing_source='api', pricing_warning=None)
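
As a worked illustration of how these fields plausibly relate, the sketch below assumes estimated_cost is simply hourly_rate multiplied by hours_used. That relationship, the helper name estimate_total, and the example rate are assumptions for illustration; provider monitors may apply discounts or other adjustments.

```python
def estimate_total(hourly_rate: float, hours_used: float) -> float:
    """Assumed relationship: total = hourly rate x hours, rounded to cents."""
    if hourly_rate < 0 or hours_used < 0:
        raise ValueError("hourly_rate and hours_used must be non-negative")
    return round(hourly_rate * hours_used, 2)


# Hypothetical rate of 3.06 per hour for 2.5 hours of use
total = estimate_total(hourly_rate=3.06, hours_used=2.5)
print(f"{total:.2f}")  # 7.65
```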

CostReport

class clustrix.cost_monitoring.CostReport(timestamp, duration_seconds, resource_usage, cost_estimate, provider, region=None, recommendations=None, metadata=None)

Bases: object

Comprehensive cost and usage report.

Attributes:

  • timestamp: Report generation time

  • duration_seconds: Monitoring duration

  • resource_usage: Resource utilization data

  • cost_estimate: Cost estimation data

  • provider: Cloud provider name

  • region: Optional region information

  • recommendations: Cost optimization suggestions

  • metadata: Additional metadata

timestamp: datetime
duration_seconds: float
resource_usage: ResourceUsage
cost_estimate: CostEstimate
provider: str
region: Optional[str] = None
recommendations: Optional[List[str]] = None
metadata: Optional[Dict[str, Any]] = None
__init__(timestamp, duration_seconds, resource_usage, cost_estimate, provider, region=None, recommendations=None, metadata=None)

Base Monitor Class

BaseCostMonitor

class clustrix.cost_monitoring.BaseCostMonitor(provider_name)

Bases: ABC

Abstract base class for cloud provider cost monitors.

Key Methods:

  • get_resource_usage(): Get current resource utilization

  • estimate_cost(): Estimate costs for given usage

  • get_pricing_info(): Get current pricing information

  • start_monitoring(): Begin cost monitoring session

  • stop_monitoring(): End monitoring and generate report

__init__(provider_name)
abstractmethod get_resource_usage()

Get current resource utilization metrics.

Return type:

ResourceUsage

abstractmethod estimate_cost(instance_type, hours_used)

Estimate cost for given instance type and usage duration.

Return type:

CostEstimate

abstractmethod get_pricing_info()

Get current pricing information for different instance types.

Return type:

Dict[str, float]

start_monitoring()

Start cost monitoring session.

stop_monitoring()

Stop monitoring and generate cost report.

Return type:

Optional[CostReport]

get_cost_optimization_recommendations(resource_usage, cost_estimate)

Generate cost optimization recommendations based on usage patterns.

Return type:

List[str]
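
One way such recommendations could be derived is with simple utilization thresholds. The sketch below is illustrative only: the function name recommend, the thresholds, and the message texts are assumptions, not the rules Clustrix applies.

```python
from typing import List


def recommend(cpu_percent: float, memory_percent: float, hourly_rate: float) -> List[str]:
    """Return human-readable tips based on hypothetical thresholds."""
    tips: List[str] = []
    if cpu_percent < 20 and memory_percent < 20:
        tips.append("Low CPU and memory utilization: consider a smaller instance type.")
    if cpu_percent > 90:
        tips.append("CPU is saturated: a larger instance may finish sooner.")
    if hourly_rate > 5.0:
        tips.append("High hourly rate: check spot or preemptible pricing.")
    return tips


for tip in recommend(cpu_percent=10.0, memory_percent=15.0, hourly_rate=6.0):
    print(tip)
```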

get_gpu_utilization()

Get GPU utilization metrics using nvidia-smi.

Return type:

List[Dict[str, Any]]
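
A minimal sketch of how GPU metrics can be read from nvidia-smi, assuming its CSV query mode (--query-gpu with --format=csv,noheader,nounits). The function names and dictionary keys are hypothetical and need not match what get_gpu_utilization() returns.

```python
import subprocess
from typing import Any, Dict, List

QUERY = "index,utilization.gpu,memory.used,memory.total"


def parse_gpu_csv(text: str) -> List[Dict[str, Any]]:
    """Parse 'index, util, used, total' rows; skip rows that do not parse."""
    stats: List[Dict[str, Any]] = []
    for line in text.strip().splitlines():
        parts = [part.strip() for part in line.split(",")]
        if len(parts) != 4:
            continue
        try:
            stats.append({
                "index": int(parts[0]),
                "utilization_percent": float(parts[1]),
                "memory_used_mb": int(parts[2]),
                "memory_total_mb": int(parts[3]),
            })
        except ValueError:
            # nvidia-smi can report non-numeric values such as "[N/A]"
            continue
    return stats


def query_gpus() -> List[Dict[str, Any]]:
    """Run nvidia-smi; return an empty list if it is missing or fails."""
    command = ["nvidia-smi", f"--query-gpu={QUERY}", "--format=csv,noheader,nounits"]
    try:
        completed = subprocess.run(
            command, capture_output=True, text=True, timeout=10, check=True
        )
    except (FileNotFoundError, subprocess.SubprocessError):
        return []
    return parse_gpu_csv(completed.stdout)


# Parsing a canned sample keeps the example runnable on machines without a GPU
sample = "0, 87, 30512, 40960\n1, 3, 512, 40960\n"
gpus = parse_gpu_csv(sample)
print(len(gpus), gpus[0]["utilization_percent"])
```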

get_cpu_memory_usage()

Get CPU and memory usage using system tools.

Return type:

tuple
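
The abstract-method contract described above can be sketched with a self-contained stand-in. MonitorBase and FlatRateMonitor are hypothetical names, the price table is invented, and estimate_cost returns a plain float here rather than a CostEstimate to keep the example short.

```python
from abc import ABC, abstractmethod
from typing import Dict


class MonitorBase(ABC):
    """Stand-in for BaseCostMonitor with only the pricing-related methods."""

    def __init__(self, provider_name: str) -> None:
        self.provider_name = provider_name

    @abstractmethod
    def get_pricing_info(self) -> Dict[str, float]:
        """Return hourly prices keyed by instance type."""

    @abstractmethod
    def estimate_cost(self, instance_type: str, hours_used: float) -> float:
        """Return the estimated cost for the given usage."""


class FlatRateMonitor(MonitorBase):
    """Hypothetical provider with a fixed, hard-coded price table."""

    PRICES = {"small": 0.10, "large": 0.80}

    def __init__(self) -> None:
        super().__init__("flat-rate")

    def get_pricing_info(self) -> Dict[str, float]:
        return dict(self.PRICES)

    def estimate_cost(self, instance_type: str, hours_used: float) -> float:
        return self.get_pricing_info()[instance_type] * hours_used


monitor = FlatRateMonitor()
cost = monitor.estimate_cost("large", 3)
print(f"{monitor.provider_name}: {cost:.2f}")
```

Because the base class declares abstract methods, instantiating it directly raises TypeError; only subclasses that implement every abstract method can be created.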

Decorators and Utilities

cost_tracking_decorator

clustrix.cost_monitoring.cost_tracking_decorator(provider, instance_type='default')

Decorator to automatically track costs for Clustrix functions.

Parameters:

  • provider (str) – Cloud provider name (e.g., 'lambda', 'aws', 'azure', 'gcp')

  • instance_type (str) – Instance type for cost estimation

Example:

from clustrix import cluster, cost_tracking_decorator

@cost_tracking_decorator('lambda', 'a100_40gb')
@cluster(cores=8, memory="32GB")
def my_training_function():
    # Your code here
    pass

Example:

@cost_tracking_decorator('aws', 'p3.2xlarge')
@cluster(cores=8, memory='60GB')
def train_model():
    # Your training code here
    pass

# Automatic cost tracking with detailed report
result = train_model()
print(f"Cost: ${result['cost_report']['cost_estimate']['estimated_cost']:.2f}")
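
To make the wrapping behavior concrete, here is a self-contained sketch of how a cost-tracking decorator might work. It is not the Clustrix implementation: track_cost and HOURLY_RATES are hypothetical, and the returned dictionary only imitates the 'success' and 'cost_report' keys used in the examples on this page (the 'result' and 'error' keys are assumptions).

```python
import functools
import time
from typing import Any, Callable, Dict

HOURLY_RATES = {"demo-gpu": 1.20}  # hypothetical price table, USD per hour


def track_cost(instance_type: str) -> Callable:
    """Wrap a function so that it returns its result plus a cost report."""

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
            start = time.perf_counter()
            success, result, error = True, None, None
            try:
                result = func(*args, **kwargs)
            except Exception as exc:  # record the failure instead of raising
                success, error = False, str(exc)
            duration = time.perf_counter() - start
            rate = HOURLY_RATES.get(instance_type, 0.0)
            return {
                "success": success,
                "result": result,
                "error": error,
                "cost_report": {
                    "duration_seconds": duration,
                    "cost_estimate": {
                        "instance_type": instance_type,
                        "hourly_rate": rate,
                        "estimated_cost": rate * duration / 3600.0,
                    },
                },
            }

        return wrapper

    return decorator


@track_cost("demo-gpu")
def add(a: int, b: int) -> int:
    return a + b


outcome = add(2, 3)
print(outcome["success"], outcome["result"])
```

Note that a decorator of this shape changes the function's return value to a dictionary, so callers read the original value from a key rather than from the call itself.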

Utility Functions

get_cost_monitor

clustrix.cost_monitoring.get_cost_monitor(provider)

Get the appropriate cost monitor for a cloud provider.

Parameters:

  • provider (str) – Cloud provider name

Return type:

Optional[BaseCostMonitor]

Returns:

Provider-specific cost monitor instance, or None if not available

Example:

monitor = get_cost_monitor('gcp')
cost_estimate = monitor.estimate_cost('n2-standard-4', 2.0)
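
Since the documented return type is Optional, a lookup of this kind can be pictured as a registry that yields None for unknown names. The sketch below is an assumption about the general pattern, with hypothetical names (DemoMonitor, lookup_monitor); it does not reproduce the Clustrix internals.

```python
from typing import Callable, Dict, Optional


class DemoMonitor:
    """Hypothetical placeholder for a provider-specific monitor."""

    def __init__(self, provider_name: str) -> None:
        self.provider_name = provider_name


# Map provider names to factories so monitors are only built on request
_REGISTRY: Dict[str, Callable[[], DemoMonitor]] = {
    "aws": lambda: DemoMonitor("aws"),
    "gcp": lambda: DemoMonitor("gcp"),
}


def lookup_monitor(provider: str) -> Optional[DemoMonitor]:
    """Return a monitor for the provider, or None if it is not registered."""
    factory = _REGISTRY.get(provider.lower())
    return factory() if factory else None


found = lookup_monitor("AWS")
missing = lookup_monitor("unknown")
print(found.provider_name if found else None, missing)
```

Callers of an Optional-returning lookup should check for None before calling methods on the result.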

start_cost_monitoring

clustrix.cost_monitoring.start_cost_monitoring(provider)

Start cost monitoring for a specific provider.

Parameters:

  • provider: Cloud provider name

Return type:

Optional[BaseCostMonitor]

Returns:

  • BaseCostMonitor: Active cost monitor instance

generate_cost_report

clustrix.cost_monitoring.generate_cost_report(provider, instance_type='default')

Generate a cost report for the current session.

Parameters:

  • provider: Cloud provider name

  • instance_type: Instance type for cost estimation

  • duration_seconds: Optional duration override

Return type:

Optional[Dict[str, Any]]

Returns:

  • dict: Cost report with usage and estimates

get_pricing_info

clustrix.cost_monitoring.get_pricing_info(provider)

Get pricing information for a cloud provider.

Parameters:

  • provider: Cloud provider name

Return type:

Optional[Dict[str, float]]

Returns:

  • dict: Pricing information by instance type

Cloud Provider Monitors

Lambda Cloud Monitor

class clustrix.cost_providers.lambda_cloud.LambdaCostMonitor(use_pricing_api=True, api_key=None)

Bases: BaseCostMonitor

Cost monitoring for Lambda Cloud GPU instances.

Features:

  • Real-time GPU utilization monitoring

  • Accurate pricing for all Lambda instance types

  • Instance recommendations based on usage patterns

  • Monthly cost estimation tools

__init__(use_pricing_api=True, api_key=None)
get_resource_usage()

Get current resource utilization for Lambda Cloud instance.

Return type:

ResourceUsage

estimate_cost(instance_type, hours_used)

Estimate cost for Lambda Cloud instance usage.

Return type:

CostEstimate

get_pricing_info()

Get Lambda Cloud pricing information.

Return type:

Dict[str, float]

get_instance_recommendations(resource_usage, current_instance=None)

Get instance type recommendations based on current usage.

Return type:

List[str]

get_cost_optimization_tips()

Get general cost optimization tips for Lambda Cloud.

Return type:

List[str]

get_performance_metrics()

Get comprehensive performance metrics for Lambda Cloud instances.

Return type:

Dict[str, Any]

estimate_monthly_cost(instance_type, hours_per_day=8)

Estimate monthly costs for different usage patterns.

Return type:

Dict[str, Any]
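
The arithmetic behind a monthly projection can be sketched as follows. The 30-day month, the example hourly rate, the function name monthly_cost, and the returned keys are assumptions for illustration; the actual method may report more usage patterns.

```python
from typing import Any, Dict


def monthly_cost(hourly_rate: float, hours_per_day: float = 8,
                 days_per_month: int = 30) -> Dict[str, Any]:
    """Project a monthly bill from an hourly rate and a daily usage pattern."""
    if not 0 <= hours_per_day <= 24:
        raise ValueError("hours_per_day must be between 0 and 24")
    hours = hours_per_day * days_per_month
    return {
        "hours_per_month": hours,
        "monthly_cost": round(hourly_rate * hours, 2),
    }


# Hypothetical rate of 1.10 per hour at the default 8 hours per day
projection = monthly_cost(1.10)
print(projection)  # {'hours_per_month': 240, 'monthly_cost': 264.0}
```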

AWS Cost Monitor

class clustrix.cost_providers.aws.AWSCostMonitor(region='us-east-1', use_pricing_api=True)

Bases: BaseCostMonitor

Cost monitoring for AWS EC2 and Batch services.

Features:

  • On-demand and spot instance pricing

  • AWS Batch cost estimation

  • Regional pricing comparisons

  • Reserved instance recommendations

__init__(region='us-east-1', use_pricing_api=True)
get_resource_usage()

Get current resource utilization for AWS instance.

Return type:

ResourceUsage

estimate_cost(instance_type, hours_used, use_spot=False)

Estimate cost for AWS instance usage.

Return type:

CostEstimate

get_pricing_info()

Get AWS EC2 pricing information.

Return type:

Dict[str, float]

get_spot_pricing_info()

Get estimated AWS spot pricing.

Return type:

Dict[str, float]

get_cost_optimization_recommendations(resource_usage, cost_estimate)

Get AWS-specific cost optimization recommendations.

Return type:

List[str]

estimate_batch_cost(job_queue, compute_environment, estimated_jobs, avg_job_duration_hours)

Estimate costs for AWS Batch workloads.

Return type:

Dict[str, Any]

get_region_pricing_comparison(instance_type)

Compare pricing across AWS regions (simplified).

Return type:

Dict[str, Dict[str, Any]]

get_aws_specific_metrics()

Get AWS-specific cost and performance metrics.

Return type:

Dict[str, Any]

Azure Cost Monitor

class clustrix.cost_providers.azure.AzureCostMonitor(region='eastus', use_pricing_api=True)

Bases: BaseCostMonitor

Cost monitoring for Azure Virtual Machines and Batch.

Features:

  • Pay-as-you-go and spot VM pricing

  • Azure Batch cost estimation

  • Regional pricing analysis

  • Cost optimization recommendations

__init__(region='eastus', use_pricing_api=True)
get_resource_usage()

Get current resource utilization for Azure VM.

Return type:

ResourceUsage

estimate_cost(instance_type, hours_used, use_spot=False)

Estimate cost for Azure VM usage.

Return type:

CostEstimate

get_pricing_info()

Get Azure VM pricing information.

Return type:

Dict[str, float]

get_spot_pricing_info()

Get estimated Azure Spot VM pricing.

Return type:

Dict[str, float]

get_cost_optimization_recommendations(resource_usage, cost_estimate)

Get Azure-specific cost optimization recommendations.

Return type:

List[str]

estimate_batch_cost(pool_name, vm_size, target_nodes, estimated_duration_hours)

Estimate costs for Azure Batch workloads.

Return type:

Dict[str, Any]

get_region_pricing_comparison(instance_type)

Compare pricing across Azure regions (simplified).

Return type:

Dict[str, Dict[str, Any]]

get_azure_specific_metrics()

Get Azure-specific cost and performance metrics.

Return type:

Dict[str, Any]

get_azure_consumption_api_integration()

Framework for Azure Consumption API integration.

Return type:

Dict[str, Any]

GCP Cost Monitor

class clustrix.cost_providers.gcp.GCPCostMonitor(region='us-central1', use_pricing_api=True)

Bases: BaseCostMonitor

Cost monitoring for Google Cloud Platform Compute Engine instances.

Features:

  • On-demand and preemptible instance pricing

  • Sustained use discount calculations

  • Regional pricing comparisons

  • Google Cloud Batch cost estimation

__init__(region='us-central1', use_pricing_api=True)
get_resource_usage()

Get current resource utilization for GCP instance.

Return type:

ResourceUsage

estimate_cost(instance_type, hours_used, use_preemptible=False, sustained_use_percent=0)

Estimate cost for GCP instance usage.

Return type:

CostEstimate

get_pricing_info()

Get GCP Compute Engine pricing information.

Return type:

Dict[str, float]

get_preemptible_pricing_info()

Get GCP preemptible pricing.

Return type:

Dict[str, float]

get_cost_optimization_recommendations(resource_usage, cost_estimate)

Get GCP-specific cost optimization recommendations.

Return type:

List[str]

estimate_sustained_use_discount(hours_per_month)

Calculate sustained use discount based on monthly usage.

Return type:

Dict[str, Any]
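
A tiered sustained use calculation can be sketched as below. The tier table is an assumption modeled on the incremental schedule Google has published for N1 machine types (each successive quarter of the month billed at 100%, 80%, 60%, and 40% of the base rate); other machine families use different tiers, so treat the numbers, the 730-hour month, and the function name as illustrative rather than as what estimate_sustained_use_discount computes.

```python
from typing import Any, Dict

# (upper bound of usage fraction, billing multiplier); assumed N1-style tiers
TIERS = [(0.25, 1.0), (0.50, 0.8), (0.75, 0.6), (1.00, 0.4)]
HOURS_IN_MONTH = 730  # assumed average month length in hours


def sustained_use_discount(hours_per_month: float) -> Dict[str, Any]:
    """Compute the effective discount for a given number of hours per month."""
    usage = min(max(hours_per_month / HOURS_IN_MONTH, 0.0), 1.0)
    billed, lower = 0.0, 0.0
    for upper, multiplier in TIERS:
        if usage <= lower:
            break
        # Bill only the slice of usage that falls inside this tier
        billed += (min(usage, upper) - lower) * multiplier
        lower = upper
    discount = 0.0 if usage == 0 else 1.0 - billed / usage
    return {"usage_fraction": usage, "discount_percent": round(discount * 100, 1)}


full_month = sustained_use_discount(730)
half_month = sustained_use_discount(365)
print(full_month["discount_percent"], half_month["discount_percent"])  # 30.0 10.0
```

Under these assumed tiers, usage below a quarter of the month earns no discount, and the discount grows toward 30% as usage approaches the full month.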

get_region_pricing_comparison(instance_type)

Compare pricing across GCP regions (simplified).

Return type:

Dict[str, Dict[str, Any]]

estimate_batch_cost(job_name, machine_type, instance_count, estimated_duration_hours)

Estimate costs for Google Cloud Batch workloads.

Return type:

Dict[str, Any]

get_gcp_specific_metrics()

Get GCP-specific cost and performance metrics.

Return type:

Dict[str, Any]

get_billing_api_integration()

Framework for GCP Billing API integration.

Return type:

Dict[str, Any]

Usage Examples

Basic Cost Monitoring

from clustrix import get_cost_monitor

# Get AWS cost monitor
monitor = get_cost_monitor('aws')

# Estimate costs
cost_estimate = monitor.estimate_cost('p3.2xlarge', hours_used=2.0)
print(f"Cost: ${cost_estimate.estimated_cost:.2f}")

# Get current resource usage
usage = monitor.get_resource_usage()
print(f"CPU: {usage.cpu_percent}%, Memory: {usage.memory_percent}%")

Automatic Cost Tracking

from clustrix import cost_tracking_decorator, cluster

@cost_tracking_decorator('gcp', 'n2-standard-8')
@cluster(cores=8, memory='32GB')
def data_processing():
    # Your data processing code
    import pandas as pd
    df = pd.read_csv('large_dataset.csv')
    return df.groupby('category').sum()

# Execute with automatic cost tracking
result = data_processing()
if result['success']:
    print("Processing completed successfully")
    print(f"Estimated cost: ${result['cost_report']['cost_estimate']['estimated_cost']:.2f}")
    print(f"Duration: {result['cost_report']['duration_seconds']:.1f} seconds")

Manual Session Monitoring

from clustrix import start_cost_monitoring, generate_cost_report

# Start monitoring
monitor = start_cost_monitoring('azure')

# Run your workload
# ... your code here ...

# Generate report (generate_cost_report may return None)
report = generate_cost_report('azure', 'Standard_NC6s_v3')
if report is not None:
    print(f"Session cost: ${report['cost_estimate']['estimated_cost']:.2f}")

Cost Optimization

from clustrix import get_cost_monitor

monitor = get_cost_monitor('aws')

# Get pricing information
pricing = monitor.get_pricing_info()

# Compare spot vs on-demand pricing
on_demand = monitor.estimate_cost('p3.2xlarge', 1.0, use_spot=False)
spot = monitor.estimate_cost('p3.2xlarge', 1.0, use_spot=True)

savings = ((on_demand.hourly_rate - spot.hourly_rate) / on_demand.hourly_rate) * 100
print(f"Spot instance savings: {savings:.1f}%")

Regional Pricing Comparison

from clustrix import get_cost_monitor

monitor = get_cost_monitor('gcp')

# Compare pricing across regions
regional_pricing = monitor.get_region_pricing_comparison('n2-standard-4')

for region, pricing in regional_pricing.items():
    print(f"{region}: ${pricing['on_demand_hourly']:.3f}/hour")

Error Handling

Unsupported providers and unknown instance types can be handled as follows:

from clustrix import get_cost_monitor

try:
    monitor = get_cost_monitor('unsupported_provider')
except ValueError as e:
    print(f"Provider not supported: {e}")
    monitor = None

# get_cost_monitor may also return None when no monitor is available
if monitor is None:
    print("No cost monitor available")
else:
    try:
        cost_estimate = monitor.estimate_cost('invalid_instance', 1.0)
    except KeyError as e:
        print(f"Instance type not found: {e}")

Best Practices

  1. Use Decorators: For automatic tracking of cluster functions

  2. Monitor Long Jobs: Use manual monitoring for jobs over 1 hour

  3. Check Recommendations: Review cost optimization suggestions regularly

  4. Compare Pricing: Use regional and instance type comparisons

  5. Track Trends: Save reports to analyze cost trends over time
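
For the trend-tracking practice above, one simple approach is to append each report to a JSON Lines file and aggregate it later. This is a sketch under assumptions: the helper names and file name are hypothetical, and the report dictionaries only imitate the 'cost_estimate' / 'estimated_cost' keys used in the examples on this page.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict


def append_report(path: Path, report: Dict[str, Any]) -> None:
    """Append one report as a single JSON line."""
    with path.open("a", encoding="utf-8") as handle:
        # default=str lets datetime values serialize as text
        handle.write(json.dumps(report, default=str) + "\n")


def total_estimated_cost(path: Path) -> float:
    """Sum estimated_cost over every saved report."""
    total = 0.0
    with path.open(encoding="utf-8") as handle:
        for line in handle:
            if line.strip():
                total += json.loads(line)["cost_estimate"]["estimated_cost"]
    return total


with tempfile.TemporaryDirectory() as tmp:
    log = Path(tmp) / "cost_reports.jsonl"
    append_report(log, {"provider": "aws", "cost_estimate": {"estimated_cost": 1.5}})
    append_report(log, {"provider": "gcp", "cost_estimate": {"estimated_cost": 2.25}})
    running_total = total_estimated_cost(log)

print(running_total)  # 3.75
```

An append-only file keeps each run independent, so a failed job cannot corrupt earlier records.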

Notes

  • Cost estimates are based on current public pricing and may vary

  • Resource utilization requires appropriate permissions on the target system

  • GPU monitoring requires nvidia-smi on the target system

  • Some cloud providers may have rate limits on pricing API calls

  • Spot/preemptible instance availability and pricing can change frequently