Cloud Cost Monitoring and Optimization
This tutorial demonstrates Clustrix’s comprehensive cost monitoring features for cloud platforms. Learn how to track expenses, optimize resource usage, and make informed decisions about cloud infrastructure.
Overview
Clustrix provides built-in cost monitoring for multiple cloud platforms:
Amazon Web Services (AWS): EC2, ECS, Batch, Lambda, SageMaker
Google Cloud Platform (GCP): Compute Engine, GKE, Cloud Batch, Vertex AI
Microsoft Azure: Virtual Machines, AKS, Batch, ML Compute
Lambda Cloud: GPU instances for ML workloads
Hugging Face Spaces: Inference endpoints and Spaces hardware
Key Features
Automatic Cost Tracking: Decorator-based cost monitoring
Real-time Pricing: Up-to-date pricing information
Regional Comparisons: Find the most cost-effective regions
Optimization Recommendations: Automatic suggestions for cost savings
Multi-cloud Support: Compare costs across different providers
Installation
Install Clustrix with cost monitoring support:
[ ]:
# Install Clustrix
!pip install clustrix
# Import cost monitoring functions
from clustrix import (
    cost_tracking_decorator,
    get_cost_monitor,
    start_cost_monitoring,
    generate_cost_report,
    get_pricing_info,
)
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
Basic Cost Monitoring
Getting Pricing Information
[ ]:
# Get pricing information for different cloud providers.
# Each slice shows the first 10 entries returned, not a ranking.
print("=== AWS EC2 Pricing (First 10 Instance Types) ===")
aws_pricing = get_pricing_info('aws')
for instance_type, price in list(aws_pricing.items())[:10]:
    print(f"{instance_type:20} ${price:.4f}/hour")
print("\n=== GCP Compute Engine Pricing (First 10 Instance Types) ===")
gcp_pricing = get_pricing_info('gcp')
for instance_type, price in list(gcp_pricing.items())[:10]:
    print(f"{instance_type:20} ${price:.4f}/hour")
print("\n=== Azure VM Pricing (First 10 Instance Types) ===")
azure_pricing = get_pricing_info('azure')
for instance_type, price in list(azure_pricing.items())[:10]:
    print(f"{instance_type:20} ${price:.4f}/hour")
print("\nTotal instance types available:")
print(f" AWS: {len(aws_pricing)}")
print(f" GCP: {len(gcp_pricing)}")
print(f" Azure: {len(azure_pricing)}")
Manual Cost Monitoring
[ ]:
# Example: Manual cost monitoring for a computation
def simulate_computation(duration_seconds=5):
    """Simulate a computation that takes some time."""
    start_time = time.time()
    # Simulate CPU-intensive work
    result = 0
    while time.time() - start_time < duration_seconds:
        result += np.random.random((1000, 1000)).sum()
    return result
# Monitor cost for AWS
print("=== AWS Cost Monitoring Example ===")
monitor = start_cost_monitoring('aws')
# Run computation
result = simulate_computation(3)
# Generate cost report
cost_report = generate_cost_report('aws', 't3.medium', duration_seconds=3)
print(f"Instance Type: {cost_report['instance_type']}")
print(f"Duration: {cost_report['duration_seconds']} seconds")
print(f"Hourly Rate: ${cost_report['cost_estimate']['hourly_rate']:.4f}")
print(f"Estimated Cost: ${cost_report['cost_estimate']['estimated_cost']:.6f}")
# Compare costs across providers for same duration
print("\n=== Cost Comparison Across Providers (3 seconds) ===")
providers_and_instances = [
    ('aws', 't3.medium'),
    ('gcp', 'n2-standard-2'),
    ('azure', 'Standard_D2s_v3')
]
for provider, instance in providers_and_instances:
    report = generate_cost_report(provider, instance, duration_seconds=3)
    print(f"{provider.upper():5} {instance:20} ${report['cost_estimate']['estimated_cost']:.6f}")
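The estimated cost in each report presumably comes from prorating the hourly rate over the run time. The sketch below shows that arithmetic under the assumption of simple per-second proration with no minimum billing increment. The function name `prorated_cost` and the example rate are illustrative assumptions, not values or APIs taken from Clustrix.

```python
def prorated_cost(hourly_rate: float, duration_seconds: float) -> float:
    """Cost in USD of running for duration_seconds at hourly_rate (USD/hour).

    Assumes linear per-second proration; real providers may round up
    to a minimum billing increment.
    """
    if hourly_rate < 0 or duration_seconds < 0:
        raise ValueError("hourly_rate and duration_seconds must be non-negative")
    return hourly_rate * duration_seconds / 3600.0


example_rate = 0.0416  # hypothetical USD/hour, for illustration only
print(f"3 seconds at ${example_rate}/hour costs ${prorated_cost(example_rate, 3):.6f}")
```

For runs of a few seconds the result is a tiny fraction of a cent, which is why the reports above print six decimal places.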
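Automatic Cost Tracking with Decorators
The easiest way to track costs is to wrap a function with @cost_tracking_decorator, passing the provider and instance type. As the calls below show, a decorated function returns a dictionary with 'success', 'result', and 'cost_report' keys instead of the function's bare return value: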
[ ]:
# Example 1: AWS Cost Tracking
@cost_tracking_decorator('aws', 't3.xlarge')
def aws_ml_training():
    """Example ML training with automatic AWS cost tracking."""
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.datasets import make_classification
    from sklearn.model_selection import train_test_split
    import time
    # Generate dataset
    X, y = make_classification(n_samples=10000, n_features=20, n_classes=3, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    # Train model
    start_time = time.time()
    model = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
    model.fit(X_train, y_train)
    training_time = time.time() - start_time
    # Evaluate
    accuracy = model.score(X_test, y_test)
    return {
        'accuracy': accuracy,
        'training_time': training_time,
        'samples_trained': len(X_train)
    }
# Example 2: GCP Cost Tracking
@cost_tracking_decorator('gcp', 'a2-highgpu-1g')
def gcp_gpu_computation():
    """Example GPU computation with automatic GCP cost tracking."""
    import numpy as np
    import time
    start_time = time.time()
    # Stand-in for GPU-intensive work: NumPy matrix products (these run on the CPU)
    matrices = []
    for i in range(10):
        A = np.random.rand(1000, 1000)
        B = np.random.rand(1000, 1000)
        C = np.dot(A, B)
        matrices.append(C)
    result = np.mean([m.sum() for m in matrices])
    computation_time = time.time() - start_time
    return {
        'result': result,
        'computation_time': computation_time,
        'matrices_processed': len(matrices)
    }
# Example 3: Azure Cost Tracking
@cost_tracking_decorator('azure', 'Standard_NC6')
def azure_deep_learning():
    """Example deep learning with automatic Azure cost tracking."""
    import numpy as np
    import time
    # Simulate neural network training
    start_time = time.time()
    # Simulate epochs
    losses = []
    for epoch in range(5):
        # Simulate batch processing
        batch_losses = []
        for batch in range(100):
            # Simulate forward and backward pass
            loss = np.random.exponential(1.0) * np.exp(-epoch * 0.1)
            batch_losses.append(loss)
        epoch_loss = np.mean(batch_losses)
        losses.append(epoch_loss)
    training_time = time.time() - start_time
    return {
        'final_loss': losses[-1],
        'all_losses': losses,
        'training_time': training_time,
        'epochs': len(losses)
    }
# Run examples and display costs
print("=== Running Cost-Tracked Functions ===")
# AWS Example
print("\n1. AWS ML Training:")
aws_result = aws_ml_training()
if aws_result['success']:
    print(f" ✓ Accuracy: {aws_result['result']['accuracy']:.4f}")
    print(f" ✓ Duration: {aws_result['cost_report']['duration_seconds']:.2f}s")
    print(f" 💰 Cost: ${aws_result['cost_report']['cost_estimate']['estimated_cost']:.6f}")
# GCP Example
print("\n2. GCP GPU Computation:")
gcp_result = gcp_gpu_computation()
if gcp_result['success']:
    print(f" ✓ Matrices Processed: {gcp_result['result']['matrices_processed']}")
    print(f" ✓ Duration: {gcp_result['cost_report']['duration_seconds']:.2f}s")
    print(f" 💰 Cost: ${gcp_result['cost_report']['cost_estimate']['estimated_cost']:.6f}")
# Azure Example
print("\n3. Azure Deep Learning:")
azure_result = azure_deep_learning()
if azure_result['success']:
    print(f" ✓ Final Loss: {azure_result['result']['final_loss']:.4f}")
    print(f" ✓ Duration: {azure_result['cost_report']['duration_seconds']:.2f}s")
    print(f" 💰 Cost: ${azure_result['cost_report']['cost_estimate']['estimated_cost']:.6f}")
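To make the wrapped return value less opaque, here is a minimal sketch of how a decorator of this kind could be built. It is not Clustrix's implementation: the name `sketch_cost_tracker`, the `error` key, and the hourly rate are hypothetical, and only the 'success', 'result', and 'cost_report' keys mirror what the examples above read.

```python
import functools
import time


def sketch_cost_tracker(hourly_rate):
    """Hypothetical stand-in for a cost-tracking decorator (not Clustrix's code)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
                success, error = True, None
            except Exception as exc:
                # Report the failure in the returned dict instead of raising
                result, success, error = None, False, str(exc)
            duration = time.perf_counter() - start
            return {
                "success": success,
                "result": result,
                "error": error,  # assumed key, for illustration only
                "cost_report": {
                    "duration_seconds": duration,
                    "cost_estimate": {
                        "hourly_rate": hourly_rate,
                        "estimated_cost": hourly_rate * duration / 3600.0,
                    },
                },
            }
        return wrapper
    return decorator


@sketch_cost_tracker(hourly_rate=0.1664)  # hypothetical USD/hour
def add(a, b):
    return a + b


outcome = add(2, 3)
print(outcome["success"], outcome["result"])
```

Because the wrapper accepts `*args` and `**kwargs`, a decorator written this way also works on methods, where `self` arrives as the first positional argument.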
Advanced Cost Analysis
Regional Pricing Comparison
[ ]:
# AWS Regional Pricing Comparison
aws_monitor = get_cost_monitor('aws')
print("=== AWS Regional Pricing Comparison (t3.large) ===")
instance_type = 't3.large'
regions = ['us-east-1', 'us-west-2', 'eu-west-1', 'ap-southeast-1', 'sa-east-1']
regional_prices = []
for region in regions:
    pricing = aws_monitor.get_region_pricing(region)
    if instance_type in pricing:
        price = pricing[instance_type]
        regional_prices.append((region, price))
        print(f"{region:15} ${price:.4f}/hour")
# Find cheapest and most expensive regions (skipped if no region lists this type)
if regional_prices:
    regional_prices.sort(key=lambda x: x[1])
    print(f"\nCheapest: {regional_prices[0][0]} (${regional_prices[0][1]:.4f}/hour)")
    print(f"Most Expensive: {regional_prices[-1][0]} (${regional_prices[-1][1]:.4f}/hour)")
    savings = (1 - regional_prices[0][1] / regional_prices[-1][1]) * 100
    print(f"Potential Savings: {savings:.1f}%")
# GCP Regional Pricing Comparison
gcp_monitor = get_cost_monitor('gcp')
print("\n=== GCP Regional Pricing Comparison (n2-standard-4) ===")
gcp_regional_pricing = gcp_monitor.get_region_pricing_comparison('n2-standard-4')
for region, pricing in list(gcp_regional_pricing.items())[:5]:
    print(f"{region:20} On-Demand: ${pricing['on_demand_hourly']:.4f}/hr, "
          f"Preemptible: ${pricing['preemptible_hourly']:.4f}/hr")
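The cheapest-versus-priciest comparison above can be factored into a small reusable helper. This is a sketch only: `summarize_regional_prices` is a hypothetical name, and the prices in the example are made-up numbers rather than real AWS or GCP rates.

```python
def summarize_regional_prices(prices):
    """Return (cheapest_region, priciest_region, savings_pct).

    prices maps region name -> hourly price in USD. savings_pct is how much
    cheaper the cheapest region is relative to the most expensive one.
    """
    if not prices:
        raise ValueError("prices must contain at least one region")
    cheapest = min(prices, key=prices.get)
    priciest = max(prices, key=prices.get)
    high = prices[priciest]
    savings_pct = (1 - prices[cheapest] / high) * 100 if high > 0 else 0.0
    return cheapest, priciest, savings_pct


# Hypothetical prices, for illustration only
example_prices = {"us-east-1": 0.08, "eu-west-1": 0.09, "sa-east-1": 0.10}
cheapest, priciest, savings_pct = summarize_regional_prices(example_prices)
print(f"{cheapest} is {savings_pct:.1f}% cheaper than {priciest}")
```

Raising on an empty mapping makes the "instance type not offered anywhere" case explicit instead of failing later with an index error.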
Spot/Preemptible Instance Savings
[ ]:
# Compare on-demand vs spot/preemptible pricing
print("=== On-Demand vs Spot/Preemptible Pricing Comparison ===")
# AWS Spot Instances
print("\nAWS Spot Instances:")
aws_instances = ['t3.large', 'm5.xlarge', 'c5.2xlarge', 'r5.large']
for instance in aws_instances:
    on_demand = aws_monitor.estimate_cost(instance, 1.0)
    spot = aws_monitor.estimate_cost(instance, 1.0, use_spot=True)
    savings = (1 - spot.hourly_rate / on_demand.hourly_rate) * 100
    print(f"{instance:15} On-Demand: ${on_demand.hourly_rate:.4f}/hr, "
          f"Spot: ${spot.hourly_rate:.4f}/hr ({savings:.0f}% savings)")
# GCP Preemptible VMs
print("\nGCP Preemptible VMs:")
gcp_instances = ['n2-standard-4', 'c2-standard-4', 'n2-highmem-4', 'a2-highgpu-1g']
for instance in gcp_instances:
    on_demand = gcp_monitor.estimate_cost(instance, 1.0)
    preemptible = gcp_monitor.estimate_cost(instance, 1.0, use_preemptible=True)
    savings = (1 - preemptible.hourly_rate / on_demand.hourly_rate) * 100
    print(f"{instance:20} On-Demand: ${on_demand.hourly_rate:.4f}/hr, "
          f"Preemptible: ${preemptible.hourly_rate:.4f}/hr ({savings:.0f}% savings)")
# Azure Spot VMs
azure_monitor = get_cost_monitor('azure')
print("\nAzure Spot VMs:")
azure_instances = ['Standard_D4s_v3', 'Standard_E4s_v3', 'Standard_F4s_v2']
for instance in azure_instances:
    on_demand = azure_monitor.estimate_cost(instance, 1.0)
    spot = azure_monitor.estimate_cost(instance, 1.0, use_spot=True)
    savings = (1 - spot.hourly_rate / on_demand.hourly_rate) * 100
    print(f"{instance:20} On-Demand: ${on_demand.hourly_rate:.4f}/hr, "
          f"Spot: ${spot.hourly_rate:.4f}/hr ({savings:.0f}% savings)")
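Headline spot discounts ignore the work that is lost and repeated when an instance is reclaimed. The sketch below folds that in with a deliberately simple model: interruptions are assumed to add a fixed fraction of extra runtime (`rework_fraction`). The function name, the model, and the rates are all illustrative assumptions, not something Clustrix computes.

```python
def compare_spot_to_on_demand(on_demand_rate, spot_rate, job_hours, rework_fraction):
    """Compare job cost on spot vs on-demand under a simple rework model.

    rework_fraction is the assumed extra runtime caused by interruptions,
    e.g. 0.25 means the job effectively runs 25% longer on spot.
    Returns (spot_is_cheaper, on_demand_cost, spot_cost).
    """
    if min(on_demand_rate, spot_rate, job_hours, rework_fraction) < 0:
        raise ValueError("all inputs must be non-negative")
    on_demand_cost = on_demand_rate * job_hours
    spot_cost = spot_rate * job_hours * (1 + rework_fraction)
    return spot_cost < on_demand_cost, on_demand_cost, spot_cost


# Hypothetical rates: $1.00/hour on-demand, $0.30/hour spot, 10-hour job
cheaper, od_cost, sp_cost = compare_spot_to_on_demand(1.0, 0.30, 10, rework_fraction=0.5)
print(f"spot cheaper: {cheaper} (on-demand ${od_cost:.2f}, spot ${sp_cost:.2f})")
```

Under this model spot stops paying off once the rework fraction exceeds `on_demand_rate / spot_rate - 1`, which is one way to see why checkpointing matters.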
Batch Job Cost Estimation
[ ]:
# Estimate costs for batch processing jobs
def estimate_batch_job_costs(job_config):
    """Estimate costs for a batch processing job across multiple providers."""
    results = {}
    # AWS Batch
    aws_batch_cost = aws_monitor.estimate_batch_cost(
        job_name=job_config['name'],
        machine_type=job_config['aws_instance'],
        instance_count=job_config['instance_count'],
        estimated_duration_hours=job_config['duration_hours']
    )
    results['aws'] = aws_batch_cost
    # GCP Batch
    gcp_batch_cost = gcp_monitor.estimate_batch_cost(
        job_name=job_config['name'],
        machine_type=job_config['gcp_instance'],
        instance_count=job_config['instance_count'],
        estimated_duration_hours=job_config['duration_hours']
    )
    results['gcp'] = gcp_batch_cost
    # Azure Batch
    azure_batch_cost = azure_monitor.estimate_batch_cost(
        job_name=job_config['name'],
        machine_type=job_config['azure_instance'],
        instance_count=job_config['instance_count'],
        estimated_duration_hours=job_config['duration_hours']
    )
    results['azure'] = azure_batch_cost
    return results
# Example batch job configuration
batch_job = {
    'name': 'large-scale-data-processing',
    'instance_count': 50,
    'duration_hours': 4.5,
    'aws_instance': 'c5.4xlarge',
    'gcp_instance': 'c2-standard-16',
    'azure_instance': 'Standard_F16s_v2'
}
print("=== Batch Job Cost Estimation ===")
print(f"Job: {batch_job['name']}")
print(f"Instances: {batch_job['instance_count']}")
print(f"Duration: {batch_job['duration_hours']} hours\n")
batch_costs = estimate_batch_job_costs(batch_job)
for provider, cost_info in batch_costs.items():
    print(f"{provider.upper()}:")
    print(f" Instance Type: {cost_info['machine_type']}")
    print(f" Total Compute Hours: {cost_info['total_compute_hours']}")
    print(f" Estimated Cost: ${cost_info['estimated_cost']:.2f}")
    print(f" Cost per Instance-Hour: ${cost_info['cost_per_instance_hour']:.4f}")
    print()
# Find most cost-effective provider
cheapest = min(batch_costs.items(), key=lambda x: x[1]['estimated_cost'])
print(f"Most cost-effective: {cheapest[0].upper()} (${cheapest[1]['estimated_cost']:.2f})")
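The fields printed above suggest a straightforward calculation: instance count times duration gives total compute hours, which is then multiplied by the hourly rate. A minimal sketch of that arithmetic follows, using the job size from the example (50 instances for 4.5 hours) and a hypothetical rate. `batch_cost` is an illustrative helper, not a Clustrix function, and it ignores storage, data transfer, and any batch-service fees.

```python
def batch_cost(instance_count, duration_hours, hourly_rate):
    """Estimate compute cost for a batch job with identical instances."""
    if instance_count < 0 or duration_hours < 0 or hourly_rate < 0:
        raise ValueError("inputs must be non-negative")
    total_compute_hours = instance_count * duration_hours
    return {
        "total_compute_hours": total_compute_hours,
        "estimated_cost": total_compute_hours * hourly_rate,
        "cost_per_instance_hour": hourly_rate,
    }


# 50 instances x 4.5 hours at a hypothetical $0.68/hour
estimate = batch_cost(instance_count=50, duration_hours=4.5, hourly_rate=0.68)
print(f"{estimate['total_compute_hours']} compute hours, "
      f"${estimate['estimated_cost']:.2f} estimated")
```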
Cost Optimization Strategies
Sustained Use and Reserved Instance Analysis
[ ]:
# AWS Reserved Instance Savings
print("=== AWS Reserved Instance Savings Analysis ===")
instance_type = 'm5.xlarge'
monthly_hours = 720 # Full month
# Calculate costs for different commitment levels
on_demand_monthly = aws_monitor.estimate_cost(instance_type, monthly_hours).total_cost
ri_1yr_no_upfront = on_demand_monthly * 0.62 # ~38% discount
ri_3yr_no_upfront = on_demand_monthly * 0.50 # ~50% discount
ri_3yr_all_upfront = on_demand_monthly * 0.38 # ~62% discount
print(f"Instance Type: {instance_type}")
print(f"Monthly Usage: {monthly_hours} hours\n")
print(f"On-Demand: ${on_demand_monthly:.2f}/month")
print(f"1-Year RI (No Up): ${ri_1yr_no_upfront:.2f}/month (38% savings)")
print(f"3-Year RI (No Up): ${ri_3yr_no_upfront:.2f}/month (50% savings)")
print(f"3-Year RI (All Up): ${ri_3yr_all_upfront:.2f}/month (62% savings)")
# GCP Sustained Use Discounts
print("\n=== GCP Sustained Use Discount Analysis ===")
usage_levels = [25, 50, 75, 100] # Percentage of month
for usage_pct in usage_levels:
    hours = (usage_pct / 100) * monthly_hours
    discount_info = gcp_monitor.estimate_sustained_use_discount(hours)
    base_cost = gcp_monitor.estimate_cost('n2-standard-4', hours).total_cost
    discounted_cost = base_cost * (1 - discount_info['discount_percentage'] / 100)
    print(f"{usage_pct}% usage ({hours:.0f} hours): "
          f"{discount_info['discount_percentage']:.0f}% discount, "
          f"${base_cost:.2f} → ${discounted_cost:.2f}")
# Azure Reserved Instance Analysis
print("\n=== Azure Reserved Instance Savings ===")
azure_instance = 'Standard_D4s_v3'
azure_on_demand = azure_monitor.estimate_cost(azure_instance, monthly_hours).total_cost
print(f"Instance Type: {azure_instance}")
print(f"On-Demand: ${azure_on_demand:.2f}/month")
print(f"1-Year Reserved: ${azure_on_demand * 0.62:.2f}/month (38% savings)")
print(f"3-Year Reserved: ${azure_on_demand * 0.42:.2f}/month (58% savings)")
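The reserved-instance figures above assume the machine runs all 720 hours. A reservation is paid for whether or not the instance is used, so it only beats on-demand above a certain utilization. The sketch below computes that break-even point, assuming the reservation is billed for every hour of the month at the discounted rate. `reserved_break_even_hours` is a hypothetical helper, and the 38% and 50% discounts are the approximate figures used in the cell above.

```python
def reserved_break_even_hours(discount_fraction, hours_in_month=720):
    """Monthly usage (hours) above which a reservation is cheaper than on-demand.

    On-demand cost:  rate * hours_used
    Reserved cost:   rate * (1 - discount_fraction) * hours_in_month
    Setting them equal gives hours_used = (1 - discount_fraction) * hours_in_month.
    """
    if not 0 <= discount_fraction < 1:
        raise ValueError("discount_fraction must be in [0, 1)")
    return (1 - discount_fraction) * hours_in_month


for label, discount in [("1-year, ~38% off", 0.38), ("3-year, ~50% off", 0.50)]:
    hours = reserved_break_even_hours(discount)
    print(f"{label}: break-even at {hours:.1f} hours/month "
          f"({hours / 720:.0%} utilization)")
```

The hourly rate cancels out of the comparison, so the break-even point depends only on the discount.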
Workload-Specific Recommendations
[ ]:
def get_cost_optimization_recommendations(workload_type, requirements):
    """Get cost optimization recommendations based on workload characteristics."""
    recommendations = []
    if workload_type == 'batch_processing':
        recommendations.extend([
            "Use spot/preemptible instances for up to 80% savings",
            "Implement checkpointing to handle instance termination",
            "Consider time-flexible scheduling for lowest spot prices",
            "Use auto-scaling to optimize resource utilization"
        ])
    elif workload_type == 'ml_training':
        recommendations.extend([
            "Use GPU instances only when necessary",
            "Consider using preemptible GPUs for experimentation",
            "Implement gradient checkpointing for long training runs",
            "Use mixed precision training to reduce memory usage"
        ])
    elif workload_type == 'web_service':
        recommendations.extend([
            "Use reserved instances for predictable base load",
            "Implement auto-scaling for variable traffic",
            "Consider serverless options for sporadic workloads",
            "Use CDN to reduce compute requirements"
        ])
    elif workload_type == 'data_processing':
        recommendations.extend([
            "Use memory-optimized instances for in-memory processing",
            "Consider data locality to reduce transfer costs",
            "Implement data compression to reduce storage costs",
            "Use lifecycle policies to archive old data"
        ])
    # Add requirement-specific recommendations
    if requirements.get('fault_tolerant', False):
        recommendations.append("Leverage spot/preemptible instances aggressively")
    if requirements.get('gpu_required', False):
        recommendations.append("Compare GPU instance prices across regions and providers")
    if requirements.get('long_running', False):
        recommendations.append("Use reserved instances or committed use discounts")
    return recommendations
# Example workload analysis
print("=== Workload-Specific Cost Optimization Recommendations ===")
workloads = [
    {
        'type': 'batch_processing',
        'name': 'Nightly Data Pipeline',
        'requirements': {'fault_tolerant': True, 'gpu_required': False}
    },
    {
        'type': 'ml_training',
        'name': 'Deep Learning Model Training',
        'requirements': {'gpu_required': True, 'long_running': True}
    },
    {
        'type': 'web_service',
        'name': 'API Backend Service',
        'requirements': {'fault_tolerant': False, 'long_running': True}
    }
]
for workload in workloads:
    print(f"\n{workload['name']} ({workload['type']}):")
    recommendations = get_cost_optimization_recommendations(
        workload['type'],
        workload['requirements']
    )
    for i, rec in enumerate(recommendations, 1):
        print(f" {i}. {rec}")
Visualizing Cost Data
Cost Comparison Charts
[ ]:
# Create cost comparison visualizations
import matplotlib.pyplot as plt
import numpy as np
# Prepare data for visualization
providers = ['AWS', 'GCP', 'Azure']
instance_types = {
    'AWS': ['t3.medium', 't3.large', 't3.xlarge', 'm5.large', 'm5.xlarge'],
    'GCP': ['n2-standard-2', 'n2-standard-4', 'n2-standard-8', 'n2-standard-16', 'n2-standard-32'],
    'Azure': ['Standard_D2s_v3', 'Standard_D4s_v3', 'Standard_D8s_v3', 'Standard_D16s_v3', 'Standard_D32s_v3']
}
# Collect pricing data
pricing_data = {}
for provider in providers:
    monitor = get_cost_monitor(provider.lower())
    prices = []
    for instance in instance_types[provider]:
        cost_estimate = monitor.estimate_cost(instance, 1.0)
        prices.append(cost_estimate.hourly_rate)
    pricing_data[provider] = prices
# Create comparison chart
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# Bar chart comparison
x = np.arange(len(instance_types['AWS']))
width = 0.25
for i, provider in enumerate(providers):
    ax1.bar(x + i*width, pricing_data[provider], width, label=provider)
ax1.set_xlabel('Instance Size')
ax1.set_ylabel('Cost per Hour ($)')
ax1.set_title('Cloud Provider Cost Comparison by Instance Size')
ax1.set_xticks(x + width)
ax1.set_xticklabels(['Small', 'Medium', 'Large', 'XLarge', '2XLarge'])
ax1.legend()
ax1.grid(True, alpha=0.3)
# Spot vs On-Demand savings visualization (hard-coded example percentages)
spot_savings = {
    'AWS': [65, 70, 72, 68, 71],
    'GCP': [60, 65, 68, 70, 72],
    'Azure': [58, 62, 65, 67, 70]
}
for provider in providers:
    # Plot against the shared size positions in x so all providers share one axis;
    # plotting against each provider's instance names would create 15 categories
    ax2.plot(x, spot_savings[provider],
             marker='o', linewidth=2, markersize=8, label=provider)
ax2.set_xlabel('Instance Size')
ax2.set_ylabel('Spot/Preemptible Savings (%)')
ax2.set_title('Spot Instance Savings by Provider')
ax2.legend()
ax2.grid(True, alpha=0.3)
# Fix the tick positions before assigning labels so the two stay in sync
ax2.set_xticks(x)
ax2.set_xticklabels(['Small', 'Medium', 'Large', 'XLarge', '2XLarge'])
plt.tight_layout()
plt.show()
# Monthly cost projection
fig, ax = plt.subplots(figsize=(10, 6))
hours_per_day = np.arange(1, 25)
days_per_month = 30
for provider in providers:
    monitor = get_cost_monitor(provider.lower())
    instance = instance_types[provider][2]  # Large instance
    monthly_costs = []
    for hours in hours_per_day:
        total_hours = hours * days_per_month
        cost = monitor.estimate_cost(instance, total_hours).total_cost
        monthly_costs.append(cost)
    ax.plot(hours_per_day, monthly_costs, marker='o', label=f'{provider} ({instance})')
ax.set_xlabel('Hours per Day')
ax.set_ylabel('Monthly Cost ($)')
ax.set_title('Monthly Cost Projection by Daily Usage')
ax.legend()
ax.grid(True, alpha=0.3)
# Add cost threshold lines
budget_levels = [100, 500, 1000, 2000]
for budget in budget_levels:
    ax.axhline(y=budget, color='red', linestyle='--', alpha=0.5)
    ax.text(24.5, budget, f'${budget}', va='center')
plt.tight_layout()
plt.show()
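The budget lines in the projection chart can also be read numerically: for a given monthly budget and hourly rate, how many hours per day can an instance run? The sketch below inverts the same `hours * days_per_month * rate` relationship the chart plots. `max_daily_hours` is a hypothetical helper and the rates are illustrative.

```python
def max_daily_hours(monthly_budget, hourly_rate, days_per_month=30):
    """Largest daily runtime (hours, capped at 24) that stays within monthly_budget."""
    if hourly_rate <= 0 or days_per_month <= 0:
        raise ValueError("hourly_rate and days_per_month must be positive")
    if monthly_budget < 0:
        raise ValueError("monthly_budget must be non-negative")
    hours = monthly_budget / (hourly_rate * days_per_month)
    return min(24.0, hours)


# Hypothetical: $100/month budget on a $0.50/hour instance
print(f"{max_daily_hours(100, 0.50):.2f} hours per day")
```

A result of 24.0 means the budget line sits above the whole curve, so the instance can run continuously.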
Best Practices for Cost Optimization
1. Choose the Right Instance Type
Match instance specifications to workload requirements
Avoid over-provisioning resources
Use burstable instances for variable workloads
2. Leverage Spot/Preemptible Instances
Use for fault-tolerant batch processing
Implement checkpointing for long-running jobs
Mix on-demand and spot for reliability
3. Optimize for Your Usage Pattern
Reserved instances for steady-state workloads
Auto-scaling for variable demand
Scheduled scaling for predictable patterns
4. Monitor and Alert
Set up budget alerts
Use Clustrix cost tracking decorators
Regular cost reviews and optimization
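A budget alert can be as simple as a running total compared against a limit. The sketch below is one possible shape for it, assuming costs are fed in by hand (for example from the estimated cost in a cost report). `BudgetTracker`, its method names, and the 80% warning threshold are illustrative choices, not part of Clustrix.

```python
class BudgetTracker:
    """Accumulate costs and classify spend against a budget limit."""

    def __init__(self, limit, warn_fraction=0.8):
        if limit <= 0:
            raise ValueError("limit must be positive")
        if not 0 < warn_fraction <= 1:
            raise ValueError("warn_fraction must be in (0, 1]")
        self.limit = limit
        self.warn_fraction = warn_fraction
        self.total = 0.0
        self.history = []  # list of (label, cost) tuples

    def record(self, label, cost):
        """Add a cost and return 'ok', 'warning', or 'exceeded'."""
        if cost < 0:
            raise ValueError("cost must be non-negative")
        self.total += cost
        self.history.append((label, cost))
        return self.status()

    def status(self):
        if self.total > self.limit:
            return "exceeded"
        if self.total >= self.warn_fraction * self.limit:
            return "warning"
        return "ok"


tracker = BudgetTracker(limit=1.00)
for step, cost in [("preprocess", 0.50), ("train", 0.35), ("evaluate", 0.20)]:
    print(f"{step:12} -> {tracker.record(step, cost)}")
```

Returning a status instead of raising leaves the decision to the caller, who can log a warning, send a notification, or stop launching new work.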
5. Multi-Cloud Strategy
Compare prices across providers
Use each cloud’s strengths
Avoid vendor lock-in
Real-World Example: Cost-Optimized ML Pipeline
[ ]:
# Complete cost-optimized ML pipeline example
class CostOptimizedMLPipeline:
    """Example of a cost-aware ML pipeline using Clustrix."""

    def __init__(self, budget_limit=100.0):
        self.budget_limit = budget_limit
        self.total_cost = 0.0
        self.cost_history = []

    @cost_tracking_decorator('aws', 't3.medium')
    def preprocess_data(self, data_size_gb):
        """Preprocess data on cost-effective instances."""
        import time
        processing_time = data_size_gb * 0.5  # Simulate processing
        time.sleep(min(processing_time, 2))  # Cap at 2 seconds for demo
        return {'processed_records': data_size_gb * 1000000}

    @cost_tracking_decorator('aws', 'p3.2xlarge')
    def train_model(self, model_type='small'):
        """Train model on GPU instances."""
        import time
        training_times = {'small': 1, 'medium': 2, 'large': 3}
        time.sleep(training_times.get(model_type, 1))
        return {'model_accuracy': 0.85 + np.random.random() * 0.1}

    @cost_tracking_decorator('aws', 't3.small')
    def evaluate_model(self, test_size):
        """Evaluate model on small instances."""
        import time
        time.sleep(0.5)
        return {'test_accuracy': 0.82 + np.random.random() * 0.1}

    def run_pipeline(self, data_size_gb=10, model_type='small'):
        """Run complete pipeline with cost tracking."""
        print(f"Starting ML Pipeline (Budget: ${self.budget_limit})")
        results = {}
        # Step 1: Preprocess data
        print("\n1. Preprocessing data...")
        preprocess_result = self.preprocess_data(data_size_gb)
        if preprocess_result['success']:
            cost = preprocess_result['cost_report']['cost_estimate']['estimated_cost']
            self.total_cost += cost
            self.cost_history.append(('preprocessing', cost))
            print(f" ✓ Processed {preprocess_result['result']['processed_records']:,} records")
            print(f" 💰 Cost: ${cost:.4f} (Total: ${self.total_cost:.4f})")
        # Check budget
        if self.total_cost > self.budget_limit:
            print(f"\n❌ Budget exceeded! Stopping pipeline.")
            return results
        # Step 2: Train model
        print("\n2. Training model...")
        train_result = self.train_model(model_type)
        if train_result['success']:
            cost = train_result['cost_report']['cost_estimate']['estimated_cost']
            self.total_cost += cost
            self.cost_history.append(('training', cost))
            print(f" ✓ Model accuracy: {train_result['result']['model_accuracy']:.4f}")
            print(f" 💰 Cost: ${cost:.4f} (Total: ${self.total_cost:.4f})")
        # Check budget
        if self.total_cost > self.budget_limit:
            print(f"\n❌ Budget exceeded! Stopping pipeline.")
            return results
        # Step 3: Evaluate model
        print("\n3. Evaluating model...")
        eval_result = self.evaluate_model(1000)
        if eval_result['success']:
            cost = eval_result['cost_report']['cost_estimate']['estimated_cost']
            self.total_cost += cost
            self.cost_history.append(('evaluation', cost))
            print(f" ✓ Test accuracy: {eval_result['result']['test_accuracy']:.4f}")
            print(f" 💰 Cost: ${cost:.4f} (Total: ${self.total_cost:.4f})")
        # Summary
        print("\n=== Pipeline Summary ===")
        print(f"Total Cost: ${self.total_cost:.4f}")
        print(f"Budget Remaining: ${self.budget_limit - self.total_cost:.4f}")
        print("\nCost Breakdown:")
        for step, cost in self.cost_history:
            pct = (cost / self.total_cost) * 100
            print(f" {step:15} ${cost:.4f} ({pct:.1f}%)")
        return {
            'total_cost': self.total_cost,
            'cost_history': self.cost_history,
            'under_budget': self.total_cost <= self.budget_limit
        }
# Run the cost-optimized pipeline
pipeline = CostOptimizedMLPipeline(budget_limit=0.10) # $0.10 budget for demo
results = pipeline.run_pipeline(data_size_gb=5, model_type='small')
print("\n✅ Pipeline completed successfully!" if results.get('under_budget', False)
      else "\n⚠️ Pipeline stopped due to budget constraints.")
Summary
This tutorial covered comprehensive cost monitoring and optimization with Clustrix:
Key Features Demonstrated
Automatic Cost Tracking: Use @cost_tracking_decorator for seamless monitoring
Manual Cost Monitoring: Fine-grained control with manual monitoring functions
Multi-Cloud Support: Compare costs across AWS, GCP, Azure, and more
Regional Pricing: Find the most cost-effective regions
Spot/Preemptible Savings: Up to 80% cost reduction
Batch Job Estimation: Plan and budget for large-scale processing
Optimization Recommendations: Workload-specific cost-saving strategies
Best Practices
Always use cost tracking decorators for production workloads
Compare prices across providers and regions
Leverage spot/preemptible instances for fault-tolerant workloads
Use reserved instances for predictable, long-running workloads
Monitor costs continuously and set up budget alerts
Implement auto-scaling to match resources to demand
Next Steps
Integrate cost monitoring into your existing workflows
Set up budget alerts and cost anomaly detection
Experiment with different instance types and pricing models
Implement cost optimization recommendations
Create cost dashboards for stakeholder visibility
Resources
Remember: Every dollar saved on cloud costs is a dollar that can be invested in innovation!