Cloud Cost Monitoring and Optimization

This tutorial demonstrates Clustrix’s comprehensive cost monitoring features for cloud platforms. Learn how to track expenses, optimize resource usage, and make informed decisions about cloud infrastructure.


Overview

Clustrix provides built-in cost monitoring for multiple cloud platforms:

  • Amazon Web Services (AWS): EC2, ECS, Batch, Lambda, SageMaker

  • Google Cloud Platform (GCP): Compute Engine, GKE, Cloud Batch, Vertex AI

  • Microsoft Azure: Virtual Machines, AKS, Batch, ML Compute

  • Lambda Cloud: GPU instances for ML workloads

  • Hugging Face Spaces: Inference endpoints and Spaces hardware

Key Features

  • Automatic Cost Tracking: Decorator-based cost monitoring

  • Real-time Pricing: Up-to-date pricing information

  • Regional Comparisons: Find the most cost-effective regions

  • Optimization Recommendations: Automatic suggestions for cost savings

  • Multi-cloud Support: Compare costs across different providers

Installation

Install Clustrix with cost monitoring support:

[ ]:
# Install Clustrix
!pip install clustrix

# Import cost monitoring functions
from clustrix import (
    cost_tracking_decorator,
    get_cost_monitor,
    start_cost_monitoring,
    generate_cost_report,
    get_pricing_info
)

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time

Basic Cost Monitoring

Getting Pricing Information

[ ]:
# Get pricing information for different cloud providers
print("=== AWS EC2 Pricing (First 10 Instance Types) ===")
aws_pricing = get_pricing_info('aws')
for instance_type, price in list(aws_pricing.items())[:10]:
    print(f"{instance_type:20} ${price:.4f}/hour")

print("\n=== GCP Compute Engine Pricing (First 10 Instance Types) ===")
gcp_pricing = get_pricing_info('gcp')
for instance_type, price in list(gcp_pricing.items())[:10]:
    print(f"{instance_type:20} ${price:.4f}/hour")

print("\n=== Azure VM Pricing (First 10 Instance Types) ===")
azure_pricing = get_pricing_info('azure')
for instance_type, price in list(azure_pricing.items())[:10]:
    print(f"{instance_type:20} ${price:.4f}/hour")

print("\nTotal instance types available:")
print(f"  AWS: {len(aws_pricing)}")
print(f"  GCP: {len(gcp_pricing)}")
print(f"  Azure: {len(azure_pricing)}")
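
The [:10] slice in the cell above takes the first ten entries in whatever order the dictionary holds them, so sorting by price can help when the goal is to find the cheapest options. The sketch below assumes, as the loops above do, that the pricing data maps instance type to hourly price. The sample_pricing values are made-up placeholders rather than real rates, and cheapest_instances is a hypothetical helper, not part of Clustrix.

```python
def cheapest_instances(pricing, n=10):
    """Return the n lowest-priced (instance_type, hourly_price) pairs."""
    return sorted(pricing.items(), key=lambda item: item[1])[:n]

# Placeholder prices for illustration only (not real cloud rates)
sample_pricing = {
    'example.large': 0.20,
    'example.small': 0.05,
    'example.xlarge': 0.40,
    'example.medium': 0.10,
}

for instance_type, price in cheapest_instances(sample_pricing, n=3):
    print(f"{instance_type:20} ${price:.4f}/hour")
```

The same helper could be applied to any of the pricing dictionaries retrieved above, provided they have the shape assumed here.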

Manual Cost Monitoring

[ ]:
# Example: Manual cost monitoring for a computation
def simulate_computation(duration_seconds=5):
    """Simulate a computation that takes some time."""
    start_time = time.time()

    # Simulate CPU-intensive work
    result = 0
    while time.time() - start_time < duration_seconds:
        result += np.random.random((1000, 1000)).sum()

    return result

# Monitor cost for AWS
print("=== AWS Cost Monitoring Example ===")
monitor = start_cost_monitoring('aws')

# Run computation
result = simulate_computation(3)

# Generate cost report
cost_report = generate_cost_report('aws', 't3.medium', duration_seconds=3)
print(f"Instance Type: {cost_report['instance_type']}")
print(f"Duration: {cost_report['duration_seconds']} seconds")
print(f"Hourly Rate: ${cost_report['cost_estimate']['hourly_rate']:.4f}")
print(f"Estimated Cost: ${cost_report['cost_estimate']['estimated_cost']:.6f}")

# Compare costs across providers for same duration
print("\n=== Cost Comparison Across Providers (3 seconds) ===")
providers_and_instances = [
    ('aws', 't3.medium'),
    ('gcp', 'n2-standard-2'),
    ('azure', 'Standard_D2s_v3')
]

for provider, instance in providers_and_instances:
    report = generate_cost_report(provider, instance, duration_seconds=3)
    print(f"{provider.upper():5} {instance:20} ${report['cost_estimate']['estimated_cost']:.6f}")
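
For short runs like these, the estimated cost is presumably the hourly rate prorated over the run time. A minimal sketch of that arithmetic follows, assuming simple per-second proration with no minimum billing increment (real providers may round up or apply minimum durations). The prorated_cost helper and the rate used are illustrative and do not describe Clustrix internals.

```python
def prorated_cost(hourly_rate, duration_seconds):
    """Prorate an hourly rate over a duration given in seconds."""
    if hourly_rate < 0 or duration_seconds < 0:
        raise ValueError("rate and duration must be non-negative")
    return hourly_rate * duration_seconds / 3600.0

# Hypothetical rate of $0.05/hour applied to a 3-second run
example_cost = prorated_cost(0.05, 3)
print(f"Estimated cost: ${example_cost:.6f}")
```

This makes it clear why a few seconds of compute shows up as a tiny fraction of a cent, and why the six-decimal formatting is used in the reports above.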

Automatic Cost Tracking with Decorators

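The easiest way to track costs is to apply @cost_tracking_decorator to a function. In the examples below, the decorated function returns a dictionary with success, result, and cost_report keys, and the function's own return value is stored under result: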

[ ]:
# Example 1: AWS Cost Tracking
@cost_tracking_decorator('aws', 't3.xlarge')
def aws_ml_training():
    """Example ML training with automatic AWS cost tracking."""
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.datasets import make_classification
    from sklearn.model_selection import train_test_split
    import time

    # Generate dataset
    X, y = make_classification(n_samples=10000, n_features=20, n_classes=3, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Train model
    start_time = time.time()
    model = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
    model.fit(X_train, y_train)
    training_time = time.time() - start_time

    # Evaluate
    accuracy = model.score(X_test, y_test)

    return {
        'accuracy': accuracy,
        'training_time': training_time,
        'samples_trained': len(X_train)
    }

# Example 2: GCP Cost Tracking
@cost_tracking_decorator('gcp', 'a2-highgpu-1g')
def gcp_gpu_computation():
    """Example GPU computation with automatic GCP cost tracking."""
    import numpy as np
    import time

    start_time = time.time()

    # Simulate GPU-style work with matrix multiplications (NumPy runs these on the CPU)
    matrices = []
    for i in range(10):
        A = np.random.rand(1000, 1000)
        B = np.random.rand(1000, 1000)
        C = np.dot(A, B)
        matrices.append(C)

    result = np.mean([m.sum() for m in matrices])
    computation_time = time.time() - start_time

    return {
        'result': result,
        'computation_time': computation_time,
        'matrices_processed': len(matrices)
    }

# Example 3: Azure Cost Tracking
@cost_tracking_decorator('azure', 'Standard_NC6')
def azure_deep_learning():
    """Example deep learning with automatic Azure cost tracking."""
    import numpy as np
    import time

    # Simulate neural network training
    start_time = time.time()

    # Simulate epochs
    losses = []
    for epoch in range(5):
        # Simulate batch processing
        batch_losses = []
        for batch in range(100):
            # Simulate forward and backward pass
            loss = np.random.exponential(1.0) * np.exp(-epoch * 0.1)
            batch_losses.append(loss)

        epoch_loss = np.mean(batch_losses)
        losses.append(epoch_loss)

    training_time = time.time() - start_time

    return {
        'final_loss': losses[-1],
        'all_losses': losses,
        'training_time': training_time,
        'epochs': len(losses)
    }

# Run examples and display costs
print("=== Running Cost-Tracked Functions ===")

# AWS Example
print("\n1. AWS ML Training:")
aws_result = aws_ml_training()
if aws_result['success']:
    print(f"   ✓ Accuracy: {aws_result['result']['accuracy']:.4f}")
    print(f"   ✓ Duration: {aws_result['cost_report']['duration_seconds']:.2f}s")
    print(f"   💰 Cost: ${aws_result['cost_report']['cost_estimate']['estimated_cost']:.6f}")

# GCP Example
print("\n2. GCP GPU Computation:")
gcp_result = gcp_gpu_computation()
if gcp_result['success']:
    print(f"   ✓ Matrices Processed: {gcp_result['result']['matrices_processed']}")
    print(f"   ✓ Duration: {gcp_result['cost_report']['duration_seconds']:.2f}s")
    print(f"   💰 Cost: ${gcp_result['cost_report']['cost_estimate']['estimated_cost']:.6f}")

# Azure Example
print("\n3. Azure Deep Learning:")
azure_result = azure_deep_learning()
if azure_result['success']:
    print(f"   ✓ Final Loss: {azure_result['result']['final_loss']:.4f}")
    print(f"   ✓ Duration: {azure_result['cost_report']['duration_seconds']:.2f}s")
    print(f"   💰 Cost: ${azure_result['cost_report']['cost_estimate']['estimated_cost']:.6f}")
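
To make the decorator pattern less opaque, here is a minimal sketch of how a timing-based cost decorator could be written in plain Python. It is not Clustrix's implementation: simple_cost_tracker, the error key, and the placeholder rate are assumptions made for illustration, and only the success, result, and cost_report keys mirror what the cell above reads.

```python
import functools
import time

def simple_cost_tracker(hourly_rate):
    """Hypothetical decorator: time a call and attach a prorated cost estimate."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
                success, error = True, None
            except Exception as exc:  # report the failure instead of raising
                result, success, error = None, False, str(exc)
            duration = time.perf_counter() - start
            return {
                'success': success,
                'result': result,
                'error': error,  # extra key added for this sketch
                'cost_report': {
                    'duration_seconds': duration,
                    'cost_estimate': {
                        'hourly_rate': hourly_rate,
                        'estimated_cost': hourly_rate * duration / 3600.0,
                    },
                },
            }
        return wrapper
    return decorator

@simple_cost_tracker(hourly_rate=0.20)  # placeholder rate, not a real price
def add(a, b):
    return a + b

outcome = add(2, 3)
print(outcome['success'], outcome['result'])
print(f"Cost: ${outcome['cost_report']['cost_estimate']['estimated_cost']:.8f}")
```

Wrapping the return value this way is a design trade-off: callers always get a cost report, but they must unwrap result before using the function's output.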

Advanced Cost Analysis

Regional Pricing Comparison

[ ]:
# AWS Regional Pricing Comparison
aws_monitor = get_cost_monitor('aws')

print("=== AWS Regional Pricing Comparison (t3.large) ===")
instance_type = 't3.large'
regions = ['us-east-1', 'us-west-2', 'eu-west-1', 'ap-southeast-1', 'sa-east-1']

regional_prices = []
for region in regions:
    pricing = aws_monitor.get_region_pricing(region)
    if instance_type in pricing:
        price = pricing[instance_type]
        regional_prices.append((region, price))
        print(f"{region:15} ${price:.4f}/hour")

# Find cheapest and most expensive regions
regional_prices.sort(key=lambda x: x[1])
if regional_prices:
    print(f"\nCheapest: {regional_prices[0][0]} (${regional_prices[0][1]:.4f}/hour)")
    print(f"Most Expensive: {regional_prices[-1][0]} (${regional_prices[-1][1]:.4f}/hour)")
    savings = (1 - regional_prices[0][1] / regional_prices[-1][1]) * 100
    print(f"Potential Savings: {savings:.1f}%")
else:
    # None of the regions listed this instance type
    print(f"\nNo regional pricing found for {instance_type}")

# GCP Regional Pricing Comparison
gcp_monitor = get_cost_monitor('gcp')

print("\n=== GCP Regional Pricing Comparison (n2-standard-4) ===")
gcp_regional_pricing = gcp_monitor.get_region_pricing_comparison('n2-standard-4')
for region, pricing in list(gcp_regional_pricing.items())[:5]:
    print(f"{region:20} On-Demand: ${pricing['on_demand_hourly']:.4f}/hr, "
          f"Preemptible: ${pricing['preemptible_hourly']:.4f}/hr")
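
The cheapest-versus-most-expensive comparison can also be packaged as a small reusable helper that handles the case where no region lists the instance type. This is a sketch under the assumption that regional prices are available as a dictionary of region name to hourly price. regional_spread and the sample values are hypothetical, not Clustrix API or real prices.

```python
def regional_spread(prices_by_region):
    """Return (cheapest, most_expensive, savings_pct), or None if there are no prices.

    cheapest and most_expensive are (region, hourly_price) tuples.
    """
    if not prices_by_region:
        return None
    ranked = sorted(prices_by_region.items(), key=lambda item: item[1])
    cheapest, priciest = ranked[0], ranked[-1]
    # Savings from moving out of the most expensive region into the cheapest one
    savings_pct = (1 - cheapest[1] / priciest[1]) * 100 if priciest[1] > 0 else 0.0
    return cheapest, priciest, savings_pct

# Placeholder prices for illustration only
sample_regions = {'region-a': 0.10, 'region-b': 0.08, 'region-c': 0.125}
spread = regional_spread(sample_regions)
if spread is not None:
    cheapest, priciest, savings_pct = spread
    print(f"Cheapest: {cheapest[0]}, most expensive: {priciest[0]}, "
          f"potential savings: {savings_pct:.1f}%")
```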

Spot/Preemptible Instance Savings

[ ]:
# Compare on-demand vs spot/preemptible pricing
print("=== On-Demand vs Spot/Preemptible Pricing Comparison ===")

# AWS Spot Instances
print("\nAWS Spot Instances:")
aws_instances = ['t3.large', 'm5.xlarge', 'c5.2xlarge', 'r5.large']
for instance in aws_instances:
    on_demand = aws_monitor.estimate_cost(instance, 1.0)
    spot = aws_monitor.estimate_cost(instance, 1.0, use_spot=True)
    savings = (1 - spot.hourly_rate / on_demand.hourly_rate) * 100
    print(f"{instance:15} On-Demand: ${on_demand.hourly_rate:.4f}/hr, "
          f"Spot: ${spot.hourly_rate:.4f}/hr ({savings:.0f}% savings)")

# GCP Preemptible VMs
print("\nGCP Preemptible VMs:")
gcp_instances = ['n2-standard-4', 'c2-standard-4', 'n2-highmem-4', 'a2-highgpu-1g']
for instance in gcp_instances:
    on_demand = gcp_monitor.estimate_cost(instance, 1.0)
    preemptible = gcp_monitor.estimate_cost(instance, 1.0, use_preemptible=True)
    savings = (1 - preemptible.hourly_rate / on_demand.hourly_rate) * 100
    print(f"{instance:20} On-Demand: ${on_demand.hourly_rate:.4f}/hr, "
          f"Preemptible: ${preemptible.hourly_rate:.4f}/hr ({savings:.0f}% savings)")

# Azure Spot VMs
azure_monitor = get_cost_monitor('azure')
print("\nAzure Spot VMs:")
azure_instances = ['Standard_D4s_v3', 'Standard_E4s_v3', 'Standard_F4s_v2']
for instance in azure_instances:
    on_demand = azure_monitor.estimate_cost(instance, 1.0)
    spot = azure_monitor.estimate_cost(instance, 1.0, use_spot=True)
    savings = (1 - spot.hourly_rate / on_demand.hourly_rate) * 100
    print(f"{instance:20} On-Demand: ${on_demand.hourly_rate:.4f}/hr, "
          f"Spot: ${spot.hourly_rate:.4f}/hr ({savings:.0f}% savings)")

Batch Job Cost Estimation

[ ]:
# Estimate costs for batch processing jobs
def estimate_batch_job_costs(job_config):
    """Estimate costs for a batch processing job across multiple providers."""
    results = {}

    # AWS Batch
    aws_batch_cost = aws_monitor.estimate_batch_cost(
        job_name=job_config['name'],
        machine_type=job_config['aws_instance'],
        instance_count=job_config['instance_count'],
        estimated_duration_hours=job_config['duration_hours']
    )
    results['aws'] = aws_batch_cost

    # GCP Batch
    gcp_batch_cost = gcp_monitor.estimate_batch_cost(
        job_name=job_config['name'],
        machine_type=job_config['gcp_instance'],
        instance_count=job_config['instance_count'],
        estimated_duration_hours=job_config['duration_hours']
    )
    results['gcp'] = gcp_batch_cost

    # Azure Batch
    azure_batch_cost = azure_monitor.estimate_batch_cost(
        job_name=job_config['name'],
        machine_type=job_config['azure_instance'],
        instance_count=job_config['instance_count'],
        estimated_duration_hours=job_config['duration_hours']
    )
    results['azure'] = azure_batch_cost

    return results

# Example batch job configuration
batch_job = {
    'name': 'large-scale-data-processing',
    'instance_count': 50,
    'duration_hours': 4.5,
    'aws_instance': 'c5.4xlarge',
    'gcp_instance': 'c2-standard-16',
    'azure_instance': 'Standard_F16s_v2'
}

print("=== Batch Job Cost Estimation ===")
print(f"Job: {batch_job['name']}")
print(f"Instances: {batch_job['instance_count']}")
print(f"Duration: {batch_job['duration_hours']} hours\n")

batch_costs = estimate_batch_job_costs(batch_job)

for provider, cost_info in batch_costs.items():
    print(f"{provider.upper()}:")
    print(f"  Instance Type: {cost_info['machine_type']}")
    print(f"  Total Compute Hours: {cost_info['total_compute_hours']}")
    print(f"  Estimated Cost: ${cost_info['estimated_cost']:.2f}")
    print(f"  Cost per Instance-Hour: ${cost_info['cost_per_instance_hour']:.4f}")
    print()

# Find most cost-effective provider
cheapest = min(batch_costs.items(), key=lambda x: x[1]['estimated_cost'])
print(f"Most cost-effective: {cheapest[0].upper()} (${cheapest[1]['estimated_cost']:.2f})")
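
The fields printed above suggest a straightforward calculation: total compute hours are the instance count times the duration, and the cost is that figure times the hourly rate. The sketch below spells this out under that assumption. batch_cost is a hypothetical helper and the $0.50 rate is a placeholder, so the numbers will not match what the monitors return.

```python
def batch_cost(hourly_rate, instance_count, duration_hours):
    """Estimate the cost of running identical instances for the same duration."""
    total_compute_hours = instance_count * duration_hours
    return {
        'total_compute_hours': total_compute_hours,
        'estimated_cost': total_compute_hours * hourly_rate,
        'cost_per_instance_hour': hourly_rate,
    }

# Same job shape as the example above, with a placeholder hourly rate
estimate = batch_cost(hourly_rate=0.50, instance_count=50, duration_hours=4.5)
print(f"Total Compute Hours: {estimate['total_compute_hours']}")
print(f"Estimated Cost: ${estimate['estimated_cost']:.2f}")
```

This ignores storage, data transfer, and any per-job service fees, which can matter for large batch workloads.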

Cost Optimization Strategies

Sustained Use and Reserved Instance Analysis

[ ]:
# AWS Reserved Instance Savings
print("=== AWS Reserved Instance Savings Analysis ===")
instance_type = 'm5.xlarge'
monthly_hours = 720  # Full month

# Calculate costs for different commitment levels
on_demand_monthly = aws_monitor.estimate_cost(instance_type, monthly_hours).total_cost
ri_1yr_no_upfront = on_demand_monthly * 0.62  # ~38% discount
ri_3yr_no_upfront = on_demand_monthly * 0.50  # ~50% discount
ri_3yr_all_upfront = on_demand_monthly * 0.38  # ~62% discount

print(f"Instance Type: {instance_type}")
print(f"Monthly Usage: {monthly_hours} hours\n")
print(f"On-Demand:           ${on_demand_monthly:.2f}/month")
print(f"1-Year RI (No Up):   ${ri_1yr_no_upfront:.2f}/month (38% savings)")
print(f"3-Year RI (No Up):   ${ri_3yr_no_upfront:.2f}/month (50% savings)")
print(f"3-Year RI (All Up):  ${ri_3yr_all_upfront:.2f}/month (62% savings)")

# GCP Sustained Use Discounts
print("\n=== GCP Sustained Use Discount Analysis ===")
usage_levels = [25, 50, 75, 100]  # Percentage of month

for usage_pct in usage_levels:
    hours = (usage_pct / 100) * monthly_hours
    discount_info = gcp_monitor.estimate_sustained_use_discount(hours)

    base_cost = gcp_monitor.estimate_cost('n2-standard-4', hours).total_cost
    discounted_cost = base_cost * (1 - discount_info['discount_percentage'] / 100)

    print(f"{usage_pct}% usage ({hours:.0f} hours): "
          f"{discount_info['discount_percentage']:.0f}% discount, "
          f"${base_cost:.2f} → ${discounted_cost:.2f}")

# Azure Reserved Instance Analysis
print("\n=== Azure Reserved Instance Savings ===")
azure_instance = 'Standard_D4s_v3'
azure_on_demand = azure_monitor.estimate_cost(azure_instance, monthly_hours).total_cost

print(f"Instance Type: {azure_instance}")
print(f"On-Demand:        ${azure_on_demand:.2f}/month")
print(f"1-Year Reserved:  ${azure_on_demand * 0.62:.2f}/month (38% savings)")
print(f"3-Year Reserved:  ${azure_on_demand * 0.42:.2f}/month (58% savings)")
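
The savings above assume the instance runs all 720 hours. Reserved capacity is billed whether or not it is used, so a reservation only pays off above a certain utilization: on-demand costs rate × hours used, while a reservation costs rate × (1 − discount) × hours in the month, which puts the break-even point at (1 − discount) × hours in the month. The sketch below applies this to the approximate discount levels from the cell above. It ignores upfront payment timing and is an illustration, not provider-specific guidance.

```python
def reserved_break_even_hours(discount_fraction, hours_in_month=720):
    """Monthly usage above which a reservation is cheaper than on-demand."""
    if not 0 <= discount_fraction < 1:
        raise ValueError("discount_fraction must be in [0, 1)")
    return (1 - discount_fraction) * hours_in_month

# Approximate discount levels taken from the analysis above
commitments = [
    ('1-year, no upfront', 0.38),
    ('3-year, no upfront', 0.50),
    ('3-year, all upfront', 0.62),
]
for label, discount in commitments:
    hours = reserved_break_even_hours(discount)
    print(f"{label:22} break-even at {hours:.0f} of 720 hours/month")
```

A workload that runs fewer hours than the break-even figure is likely better served by on-demand or spot pricing.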

Workload-Specific Recommendations

[ ]:
def get_cost_optimization_recommendations(workload_type, requirements):
    """Get cost optimization recommendations based on workload characteristics."""
    recommendations = []

    if workload_type == 'batch_processing':
        recommendations.extend([
            "Use spot/preemptible instances for up to 80% savings",
            "Implement checkpointing to handle instance termination",
            "Consider time-flexible scheduling for lowest spot prices",
            "Use auto-scaling to optimize resource utilization"
        ])

    elif workload_type == 'ml_training':
        recommendations.extend([
            "Use GPU instances only when necessary",
            "Consider using preemptible GPUs for experimentation",
            "Implement gradient checkpointing for long training runs",
            "Use mixed precision training to reduce memory usage"
        ])

    elif workload_type == 'web_service':
        recommendations.extend([
            "Use reserved instances for predictable base load",
            "Implement auto-scaling for variable traffic",
            "Consider serverless options for sporadic workloads",
            "Use CDN to reduce compute requirements"
        ])

    elif workload_type == 'data_processing':
        recommendations.extend([
            "Use memory-optimized instances for in-memory processing",
            "Consider data locality to reduce transfer costs",
            "Implement data compression to reduce storage costs",
            "Use lifecycle policies to archive old data"
        ])

    # Add requirement-specific recommendations
    if requirements.get('fault_tolerant', False):
        recommendations.append("Leverage spot/preemptible instances aggressively")

    if requirements.get('gpu_required', False):
        recommendations.append("Compare GPU instance prices across regions and providers")

    if requirements.get('long_running', False):
        recommendations.append("Use reserved instances or committed use discounts")

    return recommendations

# Example workload analysis
print("=== Workload-Specific Cost Optimization Recommendations ===")

workloads = [
    {
        'type': 'batch_processing',
        'name': 'Nightly Data Pipeline',
        'requirements': {'fault_tolerant': True, 'gpu_required': False}
    },
    {
        'type': 'ml_training',
        'name': 'Deep Learning Model Training',
        'requirements': {'gpu_required': True, 'long_running': True}
    },
    {
        'type': 'web_service',
        'name': 'API Backend Service',
        'requirements': {'fault_tolerant': False, 'long_running': True}
    }
]

for workload in workloads:
    print(f"\n{workload['name']} ({workload['type']}):")
    recommendations = get_cost_optimization_recommendations(
        workload['type'],
        workload['requirements']
    )
    for i, rec in enumerate(recommendations, 1):
        print(f"  {i}. {rec}")

Visualizing Cost Data

Cost Comparison Charts

[ ]:
# Create cost comparison visualizations
import matplotlib.pyplot as plt
import numpy as np

# Prepare data for visualization
providers = ['AWS', 'GCP', 'Azure']
instance_types = {
    'AWS': ['t3.medium', 't3.large', 't3.xlarge', 'm5.large', 'm5.xlarge'],
    'GCP': ['n2-standard-2', 'n2-standard-4', 'n2-standard-8', 'n2-standard-16', 'n2-standard-32'],
    'Azure': ['Standard_D2s_v3', 'Standard_D4s_v3', 'Standard_D8s_v3', 'Standard_D16s_v3', 'Standard_D32s_v3']
}

# Collect pricing data
pricing_data = {}
for provider in providers:
    monitor = get_cost_monitor(provider.lower())
    prices = []
    for instance in instance_types[provider]:
        cost_estimate = monitor.estimate_cost(instance, 1.0)
        prices.append(cost_estimate.hourly_rate)
    pricing_data[provider] = prices

# Create comparison chart
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

# Bar chart comparison
x = np.arange(len(instance_types['AWS']))
width = 0.25

for i, provider in enumerate(providers):
    ax1.bar(x + i*width, pricing_data[provider], width, label=provider)

ax1.set_xlabel('Instance Size')
ax1.set_ylabel('Cost per Hour ($)')
ax1.set_title('Cloud Provider Cost Comparison by Instance Size')
ax1.set_xticks(x + width)
ax1.set_xticklabels(['Small', 'Medium', 'Large', 'XLarge', '2XLarge'])
ax1.legend()
ax1.grid(True, alpha=0.3)

# Spot vs On-Demand savings visualization
# (hard-coded illustrative percentages, not values fetched from a pricing API)
spot_savings = {
    'AWS': [65, 70, 72, 68, 71],
    'GCP': [60, 65, 68, 70, 72],
    'Azure': [58, 62, 65, 67, 70]
}

# Plot against the shared numeric positions in x so all providers line up
# on the same five size categories instead of 15 separate instance names
for provider in providers:
    ax2.plot(x, spot_savings[provider],
             marker='o', linewidth=2, markersize=8, label=provider)

ax2.set_xlabel('Instance Size')
ax2.set_ylabel('Spot/Preemptible Savings (%)')
ax2.set_title('Spot Instance Savings by Provider')
ax2.set_xticks(x)  # fix tick positions before assigning labels
ax2.set_xticklabels(['Small', 'Medium', 'Large', 'XLarge', '2XLarge'])
ax2.legend()
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Monthly cost projection
fig, ax = plt.subplots(figsize=(10, 6))

hours_per_day = np.arange(1, 25)
days_per_month = 30

for provider in providers:
    monitor = get_cost_monitor(provider.lower())
    instance = instance_types[provider][2]  # Large instance

    monthly_costs = []
    for hours in hours_per_day:
        total_hours = hours * days_per_month
        cost = monitor.estimate_cost(instance, total_hours).total_cost
        monthly_costs.append(cost)

    ax.plot(hours_per_day, monthly_costs, marker='o', label=f'{provider} ({instance})')

ax.set_xlabel('Hours per Day')
ax.set_ylabel('Monthly Cost ($)')
ax.set_title('Monthly Cost Projection by Daily Usage')
ax.legend()
ax.grid(True, alpha=0.3)

# Add cost threshold lines
budget_levels = [100, 500, 1000, 2000]
for budget in budget_levels:
    ax.axhline(y=budget, color='red', linestyle='--', alpha=0.5)
    ax.text(24.5, budget, f'${budget}', va='center')

plt.tight_layout()
plt.show()
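
The budget lines on the projection chart can also be read off numerically: for a given monthly budget, the maximum daily usage is the budget divided by the hourly rate times the days in the month, capped at 24 hours. A minimal sketch, assuming a flat hourly rate and a 30-day month as in the chart. max_daily_hours and the $0.50 rate are hypothetical.

```python
def max_daily_hours(monthly_budget, hourly_rate, days_per_month=30):
    """Largest number of hours per day (capped at 24) that stays within budget."""
    if hourly_rate <= 0 or days_per_month <= 0:
        raise ValueError("hourly_rate and days_per_month must be positive")
    return min(24.0, monthly_budget / (hourly_rate * days_per_month))

# Same budget levels as the threshold lines above, with a placeholder rate
for budget in [100, 500, 1000, 2000]:
    hours = max_daily_hours(budget, hourly_rate=0.50)
    print(f"${budget:>5} budget: up to {hours:.1f} hours/day")
```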

Best Practices for Cost Optimization

1. Choose the Right Instance Type

  • Match instance specifications to workload requirements

  • Avoid over-provisioning resources

  • Use burstable instances for variable workloads

2. Leverage Spot/Preemptible Instances

  • Use for fault-tolerant batch processing

  • Implement checkpointing for long-running jobs

  • Mix on-demand and spot for reliability
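
To see what mixing on-demand and spot capacity does to the bill, the following sketch computes the hourly cost of a fleet in which a given fraction of instances run on spot pricing. blended_hourly_cost and all rates are hypothetical placeholders. The calculation assumes every instance of a kind costs the same and ignores interruptions.

```python
def blended_hourly_cost(instance_count, on_demand_rate, spot_rate, spot_fraction):
    """Hourly cost of a fleet where spot_fraction of the instances use spot pricing."""
    if not 0 <= spot_fraction <= 1:
        raise ValueError("spot_fraction must be between 0 and 1")
    spot_instances = round(instance_count * spot_fraction)
    on_demand_instances = instance_count - spot_instances
    return on_demand_instances * on_demand_rate + spot_instances * spot_rate

# Placeholder rates: $1.00/hour on-demand, $0.25/hour spot
for fraction in (0.0, 0.5, 1.0):
    cost = blended_hourly_cost(10, on_demand_rate=1.00, spot_rate=0.25,
                               spot_fraction=fraction)
    print(f"{fraction:.0%} spot: ${cost:.2f}/hour for 10 instances")
```

Keeping part of the fleet on-demand raises the cost but preserves a baseline of capacity that cannot be reclaimed by the provider.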

3. Optimize for Your Usage Pattern

  • Reserved instances for steady-state workloads

  • Auto-scaling for variable demand

  • Scheduled scaling for predictable patterns

4. Monitor and Alert

  • Set up budget alerts

  • Use Clustrix cost tracking decorators

  • Regular cost reviews and optimization
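
A budget alert can be as simple as comparing accumulated spend against a threshold. The sketch below is one possible shape for such a check. budget_status and the 80% warning level are assumptions chosen for illustration and are not a Clustrix feature.

```python
def budget_status(spent, budget, warn_at=0.8):
    """Classify spend as 'ok', 'warning', or 'exceeded' relative to a budget."""
    if budget <= 0:
        raise ValueError("budget must be positive")
    used = spent / budget
    if used >= 1.0:
        return 'exceeded'
    if used >= warn_at:
        return 'warning'
    return 'ok'

# Hypothetical spend levels against a $100 budget
for spent in (50, 85, 120):
    print(f"${spent} of $100: {budget_status(spent, 100)}")
```

In practice the spent value could be accumulated from the estimated_cost fields of the cost reports shown earlier in this tutorial.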

5. Multi-Cloud Strategy

  • Compare prices across providers

  • Use each cloud’s strengths

  • Avoid vendor lock-in

Real-World Example: Cost-Optimized ML Pipeline

[ ]:
# Complete cost-optimized ML pipeline example
class CostOptimizedMLPipeline:
    """Example of a cost-aware ML pipeline using Clustrix."""

    def __init__(self, budget_limit=100.0):
        self.budget_limit = budget_limit
        self.total_cost = 0.0
        self.cost_history = []

    @cost_tracking_decorator('aws', 't3.medium')
    def preprocess_data(self, data_size_gb):
        """Preprocess data on cost-effective instances."""
        import time
        processing_time = data_size_gb * 0.5  # Simulate processing
        time.sleep(min(processing_time, 2))  # Cap at 2 seconds for demo
        return {'processed_records': data_size_gb * 1000000}

    @cost_tracking_decorator('aws', 'p3.2xlarge')
    def train_model(self, model_type='small'):
        """Train model on GPU instances."""
        import time
        training_times = {'small': 1, 'medium': 2, 'large': 3}
        time.sleep(training_times.get(model_type, 1))
        return {'model_accuracy': 0.85 + np.random.random() * 0.1}

    @cost_tracking_decorator('aws', 't3.small')
    def evaluate_model(self, test_size):
        """Evaluate model on small instances."""
        import time
        time.sleep(0.5)
        return {'test_accuracy': 0.82 + np.random.random() * 0.1}

    def run_pipeline(self, data_size_gb=10, model_type='small'):
        """Run complete pipeline with cost tracking."""
        print(f"Starting ML Pipeline (Budget: ${self.budget_limit})")
        results = {}

        # Step 1: Preprocess data
        print("\n1. Preprocessing data...")
        preprocess_result = self.preprocess_data(data_size_gb)
        if preprocess_result['success']:
            cost = preprocess_result['cost_report']['cost_estimate']['estimated_cost']
            self.total_cost += cost
            self.cost_history.append(('preprocessing', cost))
            print(f"   ✓ Processed {preprocess_result['result']['processed_records']:,} records")
            print(f"   💰 Cost: ${cost:.4f} (Total: ${self.total_cost:.4f})")

        # Check budget
        if self.total_cost > self.budget_limit:
            print(f"\n❌ Budget exceeded! Stopping pipeline.")
            return results

        # Step 2: Train model
        print("\n2. Training model...")
        train_result = self.train_model(model_type)
        if train_result['success']:
            cost = train_result['cost_report']['cost_estimate']['estimated_cost']
            self.total_cost += cost
            self.cost_history.append(('training', cost))
            print(f"   ✓ Model accuracy: {train_result['result']['model_accuracy']:.4f}")
            print(f"   💰 Cost: ${cost:.4f} (Total: ${self.total_cost:.4f})")

        # Check budget
        if self.total_cost > self.budget_limit:
            print(f"\n❌ Budget exceeded! Stopping pipeline.")
            return results

        # Step 3: Evaluate model
        print("\n3. Evaluating model...")
        eval_result = self.evaluate_model(1000)
        if eval_result['success']:
            cost = eval_result['cost_report']['cost_estimate']['estimated_cost']
            self.total_cost += cost
            self.cost_history.append(('evaluation', cost))
            print(f"   ✓ Test accuracy: {eval_result['result']['test_accuracy']:.4f}")
            print(f"   💰 Cost: ${cost:.4f} (Total: ${self.total_cost:.4f})")

        # Summary
        print("\n=== Pipeline Summary ===")
        print(f"Total Cost: ${self.total_cost:.4f}")
        print(f"Budget Remaining: ${self.budget_limit - self.total_cost:.4f}")
        print("\nCost Breakdown:")
        for step, cost in self.cost_history:
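            # Guard against division by zero when every step cost rounds to nothing
            pct = (cost / self.total_cost) * 100 if self.total_cost else 0.0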
            print(f"  {step:15} ${cost:.4f} ({pct:.1f}%)")

        return {
            'total_cost': self.total_cost,
            'cost_history': self.cost_history,
            'under_budget': self.total_cost <= self.budget_limit
        }

# Run the cost-optimized pipeline
pipeline = CostOptimizedMLPipeline(budget_limit=0.10)  # $0.10 budget for demo
results = pipeline.run_pipeline(data_size_gb=5, model_type='small')

print("\n✅ Pipeline completed successfully!" if results.get('under_budget', False)
      else "\n⚠️ Pipeline stopped due to budget constraints.")

Summary

This tutorial covered comprehensive cost monitoring and optimization with Clustrix:

Key Features Demonstrated

  1. Automatic Cost Tracking: Use @cost_tracking_decorator for seamless monitoring

  2. Manual Cost Monitoring: Fine-grained control with manual monitoring functions

  3. Multi-Cloud Support: Compare costs across AWS, GCP, Azure, and more

  4. Regional Pricing: Find the most cost-effective regions

  5. Spot/Preemptible Savings: Up to 80% cost reduction

  6. Batch Job Estimation: Plan and budget for large-scale processing

  7. Optimization Recommendations: Workload-specific cost-saving strategies

Best Practices

  • Always use cost tracking decorators for production workloads

  • Compare prices across providers and regions

  • Leverage spot/preemptible instances for fault-tolerant workloads

  • Use reserved instances for predictable, long-running workloads

  • Monitor costs continuously and set up budget alerts

  • Implement auto-scaling to match resources to demand

Next Steps

  1. Integrate cost monitoring into your existing workflows

  2. Set up budget alerts and cost anomaly detection

  3. Experiment with different instance types and pricing models

  4. Implement cost optimization recommendations

  5. Create cost dashboards for stakeholder visibility


Remember: Every dollar saved on cloud costs is a dollar that can be invested in innovation!