Kubernetes Tutorial
This tutorial demonstrates how to use Clustrix with Kubernetes clusters for containerized distributed computing.
Prerequisites
Access to a Kubernetes cluster
kubectl configured for your cluster
Clustrix installed with Kubernetes support:
pip install "clustrix[kubernetes]"
# Install Clustrix with Kubernetes support (uncomment if needed)
# !pip install "clustrix[kubernetes]"
import clustrix
from clustrix import cluster, configure
import numpy as np
Kubernetes Configuration
Configure Clustrix for your Kubernetes cluster:
# Configure for Kubernetes cluster
configure(
    cluster_type="kubernetes",
    # Kubernetes-specific settings
    k8s_namespace="default",              # Kubernetes namespace
    k8s_config_file="~/.kube/config",     # Path to kubeconfig
    # Default resource requirements
    default_cores=2,
    default_memory="4Gi",                 # Kubernetes format (Gi, Mi)
    default_cpu_limit=4,                  # CPU limit (can be > cores)
    default_memory_limit="8Gi",           # Memory limit
    # Container settings
    container_image="python:3.11-slim",   # Base Python image
    image_pull_policy="IfNotPresent",     # Image pull policy
    # Job settings
    job_ttl_seconds=3600,                 # Job cleanup after 1 hour
    backoff_limit=3,                      # Retry failed jobs up to 3 times
    # Cleanup
    cleanup_on_success=True,
    max_parallel_jobs=20
)
print("Kubernetes cluster configured successfully!")
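The memory settings above use Kubernetes quantity strings such as "4Gi" and "8Gi". As a rough sketch of what those suffixes mean, the helper below converts a quantity to bytes and checks that a request does not exceed its limit. The names `parse_memory` and `check_request_within_limit` are hypothetical and not part of Clustrix, and only a subset of the Kubernetes suffixes is handled.

```python
import re

# Binary (power-of-two) and decimal suffixes used by Kubernetes memory quantities.
# Only a subset is covered here; this is an illustrative helper, not a full parser.
_SUFFIXES = {
    "Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4,
    "k": 1000, "M": 1000**2, "G": 1000**3, "T": 1000**4,
    "": 1,
}


def parse_memory(quantity: str) -> int:
    """Convert a quantity such as '4Gi' or '512Mi' to a number of bytes."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)([A-Za-z]*)", quantity.strip())
    if match is None or match.group(2) not in _SUFFIXES:
        raise ValueError(f"Unsupported memory quantity: {quantity!r}")
    return int(float(match.group(1)) * _SUFFIXES[match.group(2)])


def check_request_within_limit(request: str, limit: str) -> bool:
    """Return True when the requested memory fits inside the limit."""
    return parse_memory(request) <= parse_memory(limit)


print(parse_memory("4Gi"))                       # bytes requested by default_memory
print(check_request_within_limit("4Gi", "8Gi"))  # request vs. default_memory_limit
```

Running a check like this before submitting a job can catch a request that is larger than its limit, which Kubernetes would reject.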
Example 1: Containerized Machine Learning
Train machine learning models in Kubernetes pods:
@cluster(
    cores=4,
    memory="8Gi",
    cpu_limit=6,
    memory_limit="12Gi",
    container_image="python:3.11",
    job_name="ml-training"  # Custom job name
)
def distributed_ml_training(model_type="random_forest", n_estimators=200, dataset_size=50000):
    """
    Distributed machine learning training in Kubernetes.
    """
    import numpy as np
    import os
    import json
    import subprocess
    import sys
    from datetime import datetime

    # Install required packages within the container, using the interpreter that runs this job.
    # For repeated runs, an image with these packages preinstalled avoids this step.
    subprocess.run(
        [sys.executable, "-m", "pip", "install", "scikit-learn", "pandas", "numpy"],
        check=True,
    )

    from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
    from sklearn.svm import SVC
    from sklearn.neural_network import MLPClassifier
    from sklearn.datasets import make_classification
    from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
    from sklearn.preprocessing import StandardScaler
    import pandas as pd

    print(f"Starting ML training: {model_type}, {n_estimators} estimators, {dataset_size:,} samples")
    print(f"Pod started at: {datetime.now()}")
    # Generate synthetic dataset
    print("Generating synthetic dataset...")
    X, y = make_classification(
        n_samples=dataset_size,
        n_features=50,
        n_informative=30,
        n_redundant=10,
        n_classes=3,
        n_clusters_per_class=2,
        flip_y=0.05,  # Add some noise
        random_state=42
    )

    # Split the data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Feature scaling for SVM and MLP
    if model_type in ['svm', 'mlp']:
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)

    print(f"Dataset: {X_train.shape[0]:,} training, {X_test.shape[0]:,} test samples")
    # Model selection and configuration
    models = {
        'random_forest': {
            'model': RandomForestClassifier,
            'params': {
                'n_estimators': n_estimators,
                'max_depth': 20,
                'min_samples_split': 5,
                'min_samples_leaf': 2,
                'n_jobs': -1,
                'random_state': 42
            },
            'param_grid': {
                'max_depth': [15, 20, 25],
                'min_samples_split': [2, 5, 10]
            }
        },
        'gradient_boosting': {
            'model': GradientBoostingClassifier,
            'params': {
                'n_estimators': n_estimators,
                'learning_rate': 0.1,
                'max_depth': 6,
                'random_state': 42
            },
            'param_grid': {
                'learning_rate': [0.05, 0.1, 0.2],
                'max_depth': [4, 6, 8]
            }
        },
        'svm': {
            'model': SVC,
            'params': {
                'kernel': 'rbf',
                'C': 1.0,
                'gamma': 'scale',
                'random_state': 42
            },
            'param_grid': {
                'C': [0.1, 1.0, 10.0],
                'gamma': ['scale', 'auto']
            }
        },
        'mlp': {
            'model': MLPClassifier,
            'params': {
                'hidden_layer_sizes': (100, 50),
                'activation': 'relu',
                'solver': 'adam',
                'alpha': 0.0001,
                'max_iter': 1000,
                'random_state': 42
            },
            'param_grid': {
                'hidden_layer_sizes': [(50,), (100,), (100, 50)],
                'alpha': [0.0001, 0.001, 0.01]
            }
        }
    }

    if model_type not in models:
        model_type = 'random_forest'  # Default fallback
    model_config = models[model_type]
    # Train base model
    print(f"Training {model_type} model...")
    start_time = datetime.now()
    base_model = model_config['model'](**model_config['params'])
    base_model.fit(X_train, y_train)
    training_time = (datetime.now() - start_time).total_seconds()
    print(f"Base model training completed in {training_time:.2f} seconds")

    # Base model evaluation
    y_pred = base_model.predict(X_test)
    base_accuracy = accuracy_score(y_test, y_pred)
    base_precision = precision_score(y_test, y_pred, average='weighted')
    base_recall = recall_score(y_test, y_pred, average='weighted')
    base_f1 = f1_score(y_test, y_pred, average='weighted')
    print(f"Base model performance: Accuracy={base_accuracy:.4f}")

    # Cross-validation
    print("Performing cross-validation...")
    cv_scores = cross_val_score(base_model, X_train, y_train, cv=5, n_jobs=-1)

    # Hyperparameter optimization
    # Start from the base parameters so that n_estimators and random_state carry over;
    # the grid then overrides only the parameters being searched.
    print("Optimizing hyperparameters...")
    grid_search = GridSearchCV(
        model_config['model'](**model_config['params']),
        model_config['param_grid'],
        cv=3,
        scoring='accuracy',
        n_jobs=-1,
        verbose=0
    )
    grid_search.fit(X_train, y_train)
    best_model = grid_search.best_estimator_

    # Best model evaluation
    y_pred_best = best_model.predict(X_test)
    best_accuracy = accuracy_score(y_test, y_pred_best)
    best_precision = precision_score(y_test, y_pred_best, average='weighted')
    best_recall = recall_score(y_test, y_pred_best, average='weighted')
    best_f1 = f1_score(y_test, y_pred_best, average='weighted')
    print(f"Optimized model performance: Accuracy={best_accuracy:.4f}")

    # Feature importance (if available)
    feature_importance = None
    if hasattr(best_model, 'feature_importances_'):
        feature_importance = best_model.feature_importances_.tolist()
        top_features = sorted(enumerate(feature_importance),
                              key=lambda x: x[1], reverse=True)[:10]
        print(f"Top 5 features: {[f'Feature_{i}' for i, _ in top_features[:5]]}")
    # Model complexity analysis
    def analyze_model_complexity(model, model_type):
        complexity_metrics = {}
        if model_type == 'random_forest':
            complexity_metrics = {
                'n_estimators': model.n_estimators,
                'max_depth': model.max_depth,
                'total_nodes': sum(tree.tree_.node_count for tree in model.estimators_),
                'avg_depth': np.mean([tree.tree_.max_depth for tree in model.estimators_])
            }
        elif model_type == 'gradient_boosting':
            complexity_metrics = {
                'n_estimators': model.n_estimators,
                'max_depth': model.max_depth,
                'learning_rate': model.learning_rate,
                'total_nodes': sum(tree[0].tree_.node_count for tree in model.estimators_)
            }
        elif model_type == 'svm':
            complexity_metrics = {
                'n_support_vectors': int(model.n_support_.sum()),  # Plain int rather than a NumPy scalar
                'kernel': model.kernel,
                'C': model.C,
                'gamma': model.gamma
            }
        elif model_type == 'mlp':
            complexity_metrics = {
                'hidden_layers': len(model.hidden_layer_sizes),
                'total_parameters': sum(layer.size for layer in model.coefs_) +
                                    sum(layer.size for layer in model.intercepts_),
                'n_iterations': model.n_iter_,
                'loss': model.loss_
            }
        return complexity_metrics

    complexity_metrics = analyze_model_complexity(best_model, model_type)
    # Compile results
    training_results = {
        'model_info': {
            'model_type': model_type,
            'dataset_size': dataset_size,
            'n_features': X.shape[1],
            'n_classes': len(np.unique(y)),
            'training_samples': X_train.shape[0],
            'test_samples': X_test.shape[0]
        },
        'training_metrics': {
            'training_time_seconds': training_time,
            'hyperparameter_optimization': True,
            'cross_validation_folds': 5
        },
        'base_model_performance': {
            'accuracy': base_accuracy,
            'precision': base_precision,
            'recall': base_recall,
            'f1_score': base_f1
        },
        'optimized_model_performance': {
            'accuracy': best_accuracy,
            'precision': best_precision,
            'recall': best_recall,
            'f1_score': best_f1,
            'improvement_over_base': best_accuracy - base_accuracy
        },
        'cross_validation': {
            'cv_scores': cv_scores.tolist(),
            'cv_mean': np.mean(cv_scores),
            'cv_std': np.std(cv_scores)
        },
        'best_hyperparameters': grid_search.best_params_,
        'model_complexity': complexity_metrics,
        'feature_importance': feature_importance,
        'kubernetes_info': {
            'pod_name': os.environ.get('HOSTNAME', 'unknown'),
            # Kubernetes does not set KUBERNETES_NAMESPACE by default; without it this falls back to 'default'
            'namespace': os.environ.get('KUBERNETES_NAMESPACE', 'default'),
            'completion_time': datetime.now().isoformat()
        }
    }
    return training_results
# Run ML training in Kubernetes
ml_results = distributed_ml_training(
    model_type="random_forest",
    n_estimators=150,
    dataset_size=30000
)

print(f"\nMACHINE LEARNING TRAINING COMPLETE")
model_info = ml_results['model_info']
print(f"Model: {model_info['model_type']}")
print(f"Dataset: {model_info['dataset_size']:,} samples, {model_info['n_features']} features")

base_perf = ml_results['base_model_performance']
opt_perf = ml_results['optimized_model_performance']
print(f"\nPerformance Comparison:")
print(f" Base model accuracy: {base_perf['accuracy']:.4f}")
print(f" Optimized accuracy: {opt_perf['accuracy']:.4f}")
print(f" Improvement: {opt_perf['improvement_over_base']:+.4f}")  # Signed, so a regression prints as negative

cv = ml_results['cross_validation']
print(f"\nCross-validation: {cv['cv_mean']:.4f} ± {cv['cv_std']:.4f}")

k8s_info = ml_results['kubernetes_info']
print(f"\nKubernetes Info:")
print(f" Pod: {k8s_info['pod_name']}")
print(f" Namespace: {k8s_info['namespace']}")
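The example reports the pod name and namespace from environment variables. Kubernetes sets `HOSTNAME` to the pod name by default, but it does not set a namespace variable on its own. One way to recover the namespace, sketched below, is to read the file that Kubernetes mounts alongside the service-account token. The helper name `get_pod_info` is hypothetical, and the sketch assumes the service-account mount has not been disabled for the pod.

```python
import os
from pathlib import Path

# Default location where Kubernetes mounts the pod's namespace next to the
# service-account token (absent if token automounting is disabled).
NAMESPACE_FILE = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"


def get_pod_info(environ=None, namespace_file=NAMESPACE_FILE):
    """Return the pod name and namespace, with fallbacks when run outside a pod."""
    environ = os.environ if environ is None else environ
    path = Path(namespace_file)
    if path.is_file():
        namespace = path.read_text().strip()
    else:
        # Fall back to the variable used in the example, then to 'default'.
        namespace = environ.get("KUBERNETES_NAMESPACE", "default")
    return {
        "pod_name": environ.get("HOSTNAME", "unknown"),
        "namespace": namespace,
    }


print(get_pod_info())
```

Outside a cluster the function still returns a dictionary, using whatever `HOSTNAME` the local machine reports (or 'unknown') and the fallback namespace.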
Example 2: Distributed Data Processing
Process large datasets using Kubernetes job parallelization:
@cluster(
    cores=6,
    memory="12Gi",
    cpu_limit=8,
    memory_limit="16Gi",
    parallel=True,  # Enable automatic parallelization
    job_name="data-processing",
    parallelism=3,  # Run up to 3 pods simultaneously
    completions=10  # Total number of completions needed
)
def distributed_data_analysis(data_chunks=100, chunk_size=10000):
    """
    Distributed data analysis across multiple Kubernetes pods.
    """
    import numpy as np
    import os
    import json
    import subprocess
    import sys
    from datetime import datetime, timedelta
    import random
    import math

    # Install required packages, using the interpreter that runs this job
    subprocess.run(
        [sys.executable, "-m", "pip", "install", "pandas", "scipy", "numpy"],
        check=True,
    )

    import pandas as pd
    from scipy import stats

    print(f"Starting distributed data analysis: {data_chunks} chunks of {chunk_size:,} records each")
    print(f"Total data points: {data_chunks * chunk_size:,}")
    def generate_synthetic_timeseries_data(chunk_id, chunk_size):
        """Generate synthetic time-series data for analysis"""
        np.random.seed(chunk_id * 123)  # Reproducible but different per chunk

        # Generate hourly timestamps: chunk_size hours per chunk, with chunk start dates 10 days apart
        start_date = datetime(2023, 1, 1) + timedelta(days=chunk_id * 10)
        timestamps = [start_date + timedelta(hours=i) for i in range(chunk_size)]

        # Generate multiple correlated time series
        base_trend = np.linspace(100, 200, chunk_size)  # Long-term trend
        seasonal = 20 * np.sin(2 * np.pi * np.arange(chunk_size) / (24 * 7))  # Weekly seasonality
        daily = 10 * np.sin(2 * np.pi * np.arange(chunk_size) / 24)  # Daily pattern

        # Add different noise patterns
        noise = np.random.normal(0, 5, chunk_size)

        # Primary metric (e.g., web traffic, sales, etc.)
        primary_metric = base_trend + seasonal + daily + noise
        primary_metric = np.maximum(0, primary_metric)  # Ensure non-negative

        # Secondary metrics correlated with primary
        secondary_metric = primary_metric * 0.7 + np.random.normal(0, 3, chunk_size)
        tertiary_metric = primary_metric * 1.2 + np.random.normal(10, 8, chunk_size)

        # Categorical data
        categories = ['A', 'B', 'C', 'D', 'E']
        category_weights = [0.3, 0.25, 0.2, 0.15, 0.1]
        categories_data = np.random.choice(categories, chunk_size, p=category_weights)

        # Geographic regions
        regions = ['North', 'South', 'East', 'West', 'Central']
        region_weights = [0.2, 0.2, 0.25, 0.2, 0.15]
        regions_data = np.random.choice(regions, chunk_size, p=region_weights)

        # Create DataFrame
        data = pd.DataFrame({
            'timestamp': timestamps,
            'primary_metric': primary_metric,
            'secondary_metric': secondary_metric,
            'tertiary_metric': tertiary_metric,
            'category': categories_data,
            'region': regions_data,
            'chunk_id': chunk_id
        })
        return data
    def analyze_chunk_statistics(chunk_data):
        """Comprehensive statistical analysis of a data chunk"""
        numeric_cols = ['primary_metric', 'secondary_metric', 'tertiary_metric']
        statistics = {}

        # Basic descriptive statistics
        for col in numeric_cols:
            series = chunk_data[col]
            statistics[col] = {
                'count': len(series),
                'mean': float(np.mean(series)),
                'median': float(np.median(series)),
                'std': float(np.std(series)),
                'min': float(np.min(series)),
                'max': float(np.max(series)),
                'q25': float(np.percentile(series, 25)),
                'q75': float(np.percentile(series, 75)),
                'skewness': float(stats.skew(series)),
                'kurtosis': float(stats.kurtosis(series))
            }

        # Correlation analysis
        correlation_matrix = chunk_data[numeric_cols].corr()
        statistics['correlations'] = {
            'primary_secondary': float(correlation_matrix.loc['primary_metric', 'secondary_metric']),
            'primary_tertiary': float(correlation_matrix.loc['primary_metric', 'tertiary_metric']),
            'secondary_tertiary': float(correlation_matrix.loc['secondary_metric', 'tertiary_metric'])
        }

        # Categorical analysis
        category_stats = chunk_data['category'].value_counts()
        region_stats = chunk_data['region'].value_counts()
        statistics['categorical'] = {
            'category_distribution': category_stats.to_dict(),
            'region_distribution': region_stats.to_dict(),
            'category_entropy': float(-sum(p * np.log2(p) for p in category_stats / len(chunk_data) if p > 0)),
            'region_entropy': float(-sum(p * np.log2(p) for p in region_stats / len(chunk_data) if p > 0))
        }

        # Time-based analysis
        chunk_data['hour'] = chunk_data['timestamp'].dt.hour
        chunk_data['day_of_week'] = chunk_data['timestamp'].dt.dayofweek
        hourly_pattern = chunk_data.groupby('hour')['primary_metric'].mean()
        daily_pattern = chunk_data.groupby('day_of_week')['primary_metric'].mean()
        statistics['temporal'] = {
            'hourly_peak': int(hourly_pattern.idxmax()),
            'hourly_trough': int(hourly_pattern.idxmin()),
            'hourly_variation': float(hourly_pattern.std()),
            'daily_peak': int(daily_pattern.idxmax()),  # 0=Monday, 6=Sunday
            'daily_variation': float(daily_pattern.std())
        }

        # Anomaly detection (simple threshold-based)
        for col in numeric_cols:
            series = chunk_data[col]
            q1, q3 = np.percentile(series, [25, 75])
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            outliers = series[(series < lower_bound) | (series > upper_bound)]
            statistics[col]['outliers'] = {
                'count': len(outliers),
                'percentage': float(len(outliers) / len(series) * 100),
                'lower_bound': float(lower_bound),
                'upper_bound': float(upper_bound)
            }
        return statistics
    def detect_patterns_and_trends(chunk_data):
        """Advanced pattern detection and trend analysis"""
        patterns = {}

        # Trend analysis using linear regression
        time_index = np.arange(len(chunk_data))
        for col in ['primary_metric', 'secondary_metric', 'tertiary_metric']:
            slope, intercept, r_value, p_value, std_err = stats.linregress(time_index, chunk_data[col])
            patterns[f'{col}_trend'] = {
                'slope': float(slope),
                'r_squared': float(r_value ** 2),
                'p_value': float(p_value),
                'trend_direction': 'increasing' if slope > 0 else 'decreasing',
                'trend_strength': 'strong' if abs(r_value) > 0.7 else 'moderate' if abs(r_value) > 0.3 else 'weak'
            }

        # Seasonality detection (simplified)
        primary_hourly = chunk_data.groupby(chunk_data['timestamp'].dt.hour)['primary_metric'].mean()
        hourly_variation = primary_hourly.std() / primary_hourly.mean()
        patterns['seasonality'] = {
            'hourly_coefficient_of_variation': float(hourly_variation),
            'has_daily_pattern': bool(hourly_variation > 0.15),  # Threshold for daily seasonality; plain bool, not a NumPy scalar
            'peak_hours': [int(hour) for hour in primary_hourly.nlargest(3).index.tolist()],
            'trough_hours': [int(hour) for hour in primary_hourly.nsmallest(3).index.tolist()]
        }

        # Change point detection (simplified)
        def detect_change_points(series, window=100):
            if len(series) < 2 * window:
                return []
            change_points = []
            for i in range(window, len(series) - window):
                before = series[i-window:i]
                after = series[i:i+window]
                # Statistical test for difference in means
                t_stat, p_val = stats.ttest_ind(before, after)
                if p_val < 0.01:  # Significant change
                    change_points.append(i)
            return change_points

        change_points = detect_change_points(chunk_data['primary_metric'].values)
        patterns['change_points'] = {
            'detected_points': len(change_points),
            'positions': change_points[:5] if change_points else [],  # First 5
            'has_significant_changes': len(change_points) > 0
        }
        return patterns
    # Process chunks (this loop will be automatically parallelized)
    chunk_results = []
    for chunk_id in range(data_chunks):
        if chunk_id % 10 == 0:
            print(f"Processing chunk {chunk_id + 1}/{data_chunks}...")

        # Generate data for this chunk
        chunk_data = generate_synthetic_timeseries_data(chunk_id, chunk_size)

        # Analyze the chunk
        chunk_stats = analyze_chunk_statistics(chunk_data)
        chunk_patterns = detect_patterns_and_trends(chunk_data)

        chunk_result = {
            'chunk_id': chunk_id,
            'chunk_size': len(chunk_data),
            'statistics': chunk_stats,
            'patterns': chunk_patterns,
            'processing_timestamp': datetime.now().isoformat()
        }
        chunk_results.append(chunk_result)
    # Aggregate results across all chunks
    def aggregate_chunk_results(chunk_results):
        """Aggregate statistics across all processed chunks"""
        total_records = sum(chunk['chunk_size'] for chunk in chunk_results)

        # Aggregate basic statistics
        metrics = ['primary_metric', 'secondary_metric', 'tertiary_metric']
        aggregated_stats = {}
        for metric in metrics:
            means = [chunk['statistics'][metric]['mean'] for chunk in chunk_results]
            stds = [chunk['statistics'][metric]['std'] for chunk in chunk_results]
            aggregated_stats[metric] = {
                'global_mean': float(np.mean(means)),
                'mean_std': float(np.std(means)),
                'avg_within_chunk_std': float(np.mean(stds)),
                'total_variation': float(np.std(means) + np.mean(stds))
            }

        # Aggregate patterns
        trend_directions = {}
        for metric in metrics:
            directions = [chunk['patterns'][f'{metric}_trend']['trend_direction']
                          for chunk in chunk_results]
            trend_directions[metric] = {
                'increasing_chunks': directions.count('increasing'),
                'decreasing_chunks': directions.count('decreasing'),
                'dominant_trend': 'increasing' if directions.count('increasing') > directions.count('decreasing') else 'decreasing'
            }

        # Aggregate seasonality
        seasonal_chunks = sum(1 for chunk in chunk_results
                              if chunk['patterns']['seasonality']['has_daily_pattern'])

        # Aggregate change points
        total_change_points = sum(chunk['patterns']['change_points']['detected_points']
                                  for chunk in chunk_results)

        aggregated_results = {
            'processing_summary': {
                'total_chunks': len(chunk_results),
                'total_records': total_records,
                'avg_records_per_chunk': total_records / len(chunk_results),
                'processing_completed': datetime.now().isoformat()
            },
            'aggregated_statistics': aggregated_stats,
            'global_patterns': {
                'trend_analysis': trend_directions,
                'seasonality': {
                    'chunks_with_daily_patterns': seasonal_chunks,
                    'percentage_seasonal': float(seasonal_chunks / len(chunk_results) * 100)
                },
                'change_points': {
                    'total_detected': total_change_points,
                    'avg_per_chunk': float(total_change_points / len(chunk_results))
                }
            },
            'data_quality': {
                'chunks_processed': len(chunk_results),
                'processing_success_rate': 100.0,  # All chunks processed successfully
                'data_consistency_score': float(np.mean([chunk['statistics']['primary_metric']['std']
                                                         for chunk in chunk_results]) /
                                                np.std([chunk['statistics']['primary_metric']['mean']
                                                        for chunk in chunk_results])) if len(chunk_results) > 1 else 1.0
            },
            'kubernetes_execution': {
                'pod_hostname': os.environ.get('HOSTNAME', 'unknown'),
                'parallel_execution': True,
                'chunk_distribution': 'automatic_parallelization'
            }
        }
        return aggregated_results

    final_results = aggregate_chunk_results(chunk_results)
    final_results['individual_chunks'] = chunk_results[:5]  # Include first 5 for inspection
    return final_results
# Run distributed data analysis
data_results = distributed_data_analysis(data_chunks=50, chunk_size=5000)

print(f"\nDISTRIBUTED DATA ANALYSIS COMPLETE")
summary = data_results['processing_summary']
print(f"Chunks processed: {summary['total_chunks']}")
print(f"Total records: {summary['total_records']:,}")
print(f"Avg records per chunk: {summary['avg_records_per_chunk']:,.0f}")

patterns = data_results['global_patterns']
print(f"\nGlobal Patterns:")
print(f" Chunks with daily seasonality: {patterns['seasonality']['chunks_with_daily_patterns']} ({patterns['seasonality']['percentage_seasonal']:.1f}%)")
print(f" Total change points detected: {patterns['change_points']['total_detected']}")
print(f" Average change points per chunk: {patterns['change_points']['avg_per_chunk']:.2f}")

quality = data_results['data_quality']
print(f"\nData Quality:")
print(f" Processing success rate: {quality['processing_success_rate']:.1f}%")
print(f" Data consistency score: {quality['data_consistency_score']:.3f}")

k8s_exec = data_results['kubernetes_execution']
print(f"\nKubernetes Execution:")
print(f" Pod hostname: {k8s_exec['pod_hostname']}")
print(f" Parallel execution: {k8s_exec['parallel_execution']}")
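The aggregation step in this example summarizes chunks by averaging their means and standard deviations. When an exact overall mean and standard deviation are wanted, per-chunk summaries can be combined without revisiting the raw data. The sketch below shows that calculation in plain Python. The function name `combine_chunk_stats` is hypothetical, and it assumes each chunk reports a population standard deviation, which is what `np.std` returns by default.

```python
import math


def combine_chunk_stats(chunks):
    """Combine per-chunk (count, mean, population_std) tuples into global values.

    Global variance = weighted within-chunk variance
                    + weighted squared distance of each chunk mean from the global mean.
    """
    chunks = list(chunks)
    total = sum(n for n, _, _ in chunks)
    if total == 0:
        raise ValueError("Cannot combine statistics of zero records")
    global_mean = sum(n * mean for n, mean, _ in chunks) / total
    global_var = sum(
        n * (std ** 2 + (mean - global_mean) ** 2) for n, mean, std in chunks
    ) / total
    return total, global_mean, math.sqrt(global_var)


# Two chunks of different sizes: (count, mean, population std)
count, mean, std = combine_chunk_stats([(4, 2.5, 1.118033988749895), (2, 15.0, 5.0)])
print(count, mean, std)
```

Because each chunk is weighted by its record count, the result matches what a single pass over all records would give, even when chunks differ in size.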
Example 3: Fault-Tolerant Scientific Computing
Demonstrate Kubernetes' fault tolerance and job retry capabilities:
@cluster(
    cores=4,
    memory="8Gi",
    cpu_limit=6,
    memory_limit="12Gi",
    backoff_limit=5,  # Retry up to 5 times on failure
    restart_policy="OnFailure",
    job_name="fault-tolerant-computation"
)
def fault_tolerant_monte_carlo(n_simulations=1000000, failure_probability=0.1, checkpoint_interval=100000):
    """
    Fault-tolerant Monte Carlo simulation with checkpointing.
    """
    import numpy as np
    import os
    import json
    import pickle
    import random
    import time
    from datetime import datetime

    print(f"Starting fault-tolerant Monte Carlo: {n_simulations:,} simulations")
    print(f"Failure probability: {failure_probability}, Checkpoint interval: {checkpoint_interval:,}")

    # Simulate random failures for demonstration
    def simulate_random_failure():
        if random.random() < failure_probability:
            failure_types = [
                "Simulated network timeout",
                "Simulated memory pressure",
                "Simulated compute node failure",
                "Simulated resource exhaustion"
            ]
            failure_type = random.choice(failure_types)
            print(f"WARNING: {failure_type} - continuing with fault tolerance...")
            # Simulate recovery time. This adds 2 s per simulated failure, so total
            # runtime grows with n_simulations * failure_probability.
            time.sleep(2)
            return True
        return False
    # Checkpoint management
    # Note: files written inside a container are lost when the container is restarted.
    # For checkpoints to survive a retry, point this path at a mounted volume.
    checkpoint_file = "/tmp/monte_carlo_checkpoint.pkl"

    def save_checkpoint(iteration, results, random_state):
        """Save current progress to checkpoint"""
        checkpoint_data = {
            'iteration': iteration,
            'results': results,
            'random_state': random_state,
            'timestamp': datetime.now().isoformat()
        }
        try:
            with open(checkpoint_file, 'wb') as f:
                pickle.dump(checkpoint_data, f)
            print(f"Checkpoint saved at iteration {iteration:,}")
        except Exception as e:
            print(f"Failed to save checkpoint: {e}")

    def load_checkpoint():
        """Load progress from checkpoint if available"""
        if os.path.exists(checkpoint_file):
            try:
                with open(checkpoint_file, 'rb') as f:
                    checkpoint_data = pickle.load(f)
                print(f"Checkpoint loaded from iteration {checkpoint_data['iteration']:,}")
                return checkpoint_data
            except Exception as e:
                print(f"Failed to load checkpoint: {e}")
        return None
    # Monte Carlo simulation functions
    def estimate_pi_sample():
        """Single sample for pi estimation"""
        x, y = np.random.random(2)
        return 1 if x*x + y*y <= 1 else 0

    def option_pricing_sample(S0=100, K=105, T=1, r=0.05, sigma=0.2):
        """Single Monte Carlo sample for option pricing"""
        # Geometric Brownian Motion
        dt = T
        z = np.random.standard_normal()
        ST = S0 * np.exp((r - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * z)
        payoff = max(ST - K, 0)  # Call option payoff
        return payoff * np.exp(-r * T)  # Discounted payoff

    def portfolio_var_sample(returns_mean=0.08, returns_std=0.2, portfolio_value=1000000):
        """Single sample for portfolio Value at Risk calculation"""
        daily_return = np.random.normal(returns_mean/252, returns_std/np.sqrt(252))
        portfolio_change = portfolio_value * daily_return
        return portfolio_change

    def percolation_sample(grid_size=50, p=0.593):
        """Single sample for percolation theory"""
        # Simplified 2D percolation
        grid = np.random.random((grid_size, grid_size)) < p
        # Check if there's a path from top to bottom (simplified)
        # This is a very simplified percolation check
        top_row = grid[0, :]
        bottom_row = grid[-1, :]
        return 1 if np.any(top_row) and np.any(bottom_row) else 0
    # Load checkpoint if available
    checkpoint = load_checkpoint()
    if checkpoint:
        start_iteration = checkpoint['iteration']
        pi_samples = checkpoint['results']['pi_samples']
        option_prices = checkpoint['results']['option_prices']
        portfolio_changes = checkpoint['results']['portfolio_changes']
        percolation_samples = checkpoint['results']['percolation_samples']
        # Restore random state
        np.random.set_state(checkpoint['random_state'])
        print(f"Resuming from iteration {start_iteration:,}")
    else:
        start_iteration = 0
        pi_samples = []
        option_prices = []
        portfolio_changes = []
        percolation_samples = []
    # Main simulation loop with fault tolerance
    failure_count = 0
    successful_simulations = start_iteration
    progress_step = max(1, n_simulations // 20)  # Avoids a modulo by zero when n_simulations < 20
    for i in range(start_iteration, n_simulations):
        if i % progress_step == 0:
            print(f"Progress: {i:,}/{n_simulations:,} ({100*i/n_simulations:.1f}%)")

        # Simulate potential failures
        if simulate_random_failure():
            failure_count += 1
            continue  # Skip this iteration but continue

        # Perform Monte Carlo samples
        try:
            pi_sample = estimate_pi_sample()
            option_price = option_pricing_sample()
            portfolio_change = portfolio_var_sample()
            percolation = percolation_sample()
            pi_samples.append(pi_sample)
            option_prices.append(option_price)
            portfolio_changes.append(portfolio_change)
            percolation_samples.append(percolation)
            successful_simulations += 1
        except Exception as e:
            print(f"Simulation error at iteration {i}: {e}")
            failure_count += 1
            continue

        # Checkpoint periodically
        if (i + 1) % checkpoint_interval == 0:
            results = {
                'pi_samples': pi_samples,
                'option_prices': option_prices,
                'portfolio_changes': portfolio_changes,
                'percolation_samples': percolation_samples
            }
            save_checkpoint(i + 1, results, np.random.get_state())
    # Final calculations
    print(f"Simulation completed. Successful: {successful_simulations:,}, Failures: {failure_count}")

    # Pi estimation
    pi_estimate = 4 * np.mean(pi_samples) if pi_samples else 0
    pi_error = abs(pi_estimate - np.pi) if pi_samples else 0
    pi_confidence_interval = 1.96 * np.sqrt(np.var(pi_samples) / len(pi_samples)) if len(pi_samples) > 1 else 0

    # Option pricing
    option_price_mean = np.mean(option_prices) if option_prices else 0
    option_price_std = np.std(option_prices) if len(option_prices) > 1 else 0
    option_confidence_interval = 1.96 * option_price_std / np.sqrt(len(option_prices)) if len(option_prices) > 1 else 0

    # Portfolio VaR (95% confidence)
    if portfolio_changes:
        portfolio_changes_sorted = sorted(portfolio_changes)
        var_95 = portfolio_changes_sorted[int(0.05 * len(portfolio_changes))]
        expected_shortfall = np.mean(portfolio_changes_sorted[:int(0.05 * len(portfolio_changes))])
    else:
        var_95 = 0
        expected_shortfall = 0

    # Percolation probability
    percolation_probability = np.mean(percolation_samples) if percolation_samples else 0

    # Cleanup checkpoint file
    try:
        os.remove(checkpoint_file)
        print("Checkpoint file cleaned up")
    except OSError:
        # No checkpoint file to remove, or it could not be deleted
        pass
    fault_tolerant_results = {
        'simulation_parameters': {
            'total_simulations_requested': n_simulations,
            'successful_simulations': successful_simulations,
            'simulated_failures': failure_count,
            'success_rate': successful_simulations / n_simulations if n_simulations > 0 else 0,
            'checkpoint_interval': checkpoint_interval
        },
        'pi_estimation': {
            'estimate': pi_estimate,
            'true_value': float(np.pi),
            'absolute_error': pi_error,
            'relative_error_percent': (pi_error / np.pi) * 100,
            'confidence_interval_95': pi_confidence_interval * 4,  # Scale for pi
            'samples_used': len(pi_samples)
        },
        'option_pricing': {
            'estimated_price': option_price_mean,
            'price_std_dev': option_price_std,
            'confidence_interval_95': option_confidence_interval,
            'samples_used': len(option_prices)
        },
        'portfolio_risk': {
            'value_at_risk_95': var_95,
            'expected_shortfall': expected_shortfall,
            'daily_volatility': np.std(portfolio_changes) if len(portfolio_changes) > 1 else 0,
            'samples_used': len(portfolio_changes)
        },
        'percolation_analysis': {
            'percolation_probability': percolation_probability,
            'theoretical_threshold': 0.593,  # 2D percolation threshold
            'samples_used': len(percolation_samples)
        },
        'fault_tolerance': {
            'checkpoint_saves': successful_simulations // checkpoint_interval,
            'recovery_successful': checkpoint is not None,
            'resilience_score': (successful_simulations / (successful_simulations + failure_count)) if (successful_simulations + failure_count) > 0 else 0
        },
        'kubernetes_info': {
            'pod_name': os.environ.get('HOSTNAME', 'unknown'),
            # Kubernetes does not set RESTART_COUNT itself; this stays 0 unless the job defines it
            'restart_count': int(os.environ.get('RESTART_COUNT', '0')),
            'completion_time': datetime.now().isoformat()
        }
    }
    return fault_tolerant_results
# Run fault-tolerant Monte Carlo simulation
mc_results = fault_tolerant_monte_carlo(
    n_simulations=500000,
    failure_probability=0.05,  # 5% chance of simulated failure
    checkpoint_interval=50000
)

print(f"\nFAULT-TOLERANT MONTE CARLO COMPLETE")
sim_params = mc_results['simulation_parameters']
print(f"Requested simulations: {sim_params['total_simulations_requested']:,}")
print(f"Successful simulations: {sim_params['successful_simulations']:,}")
print(f"Simulated failures: {sim_params['simulated_failures']}")
print(f"Success rate: {sim_params['success_rate']*100:.1f}%")

pi_est = mc_results['pi_estimation']
print(f"\nPi Estimation:")
print(f" Estimate: {pi_est['estimate']:.6f}")
print(f" True value: {pi_est['true_value']:.6f}")
print(f" Error: {pi_est['relative_error_percent']:.4f}%")

option = mc_results['option_pricing']
print(f"\nOption Pricing:")
print(f" Estimated price: ${option['estimated_price']:.2f}")
print(f" Standard deviation: ${option['price_std_dev']:.2f}")

risk = mc_results['portfolio_risk']
print(f"\nPortfolio Risk:")
print(f" VaR (95%): ${risk['value_at_risk_95']:,.0f}")
print(f" Expected shortfall: ${risk['expected_shortfall']:,.0f}")

fault_tol = mc_results['fault_tolerance']
print(f"\nFault Tolerance:")
print(f" Checkpoints saved: {fault_tol['checkpoint_saves']}")
print(f" Resilience score: {fault_tol['resilience_score']:.3f}")

k8s_info = mc_results['kubernetes_info']
print(f"\nKubernetes Info:")
print(f" Pod: {k8s_info['pod_name']}")
print(f" Restart count: {k8s_info['restart_count']}")
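The checkpointing in this example writes the pickle file in place, so a pod that is killed mid-write can leave a truncated checkpoint behind. A common way to avoid that, sketched below, is to write to a temporary file in the same directory and then rename it over the target. The function names `save_checkpoint_atomic` and `load_checkpoint` are illustrative and not part of Clustrix.

```python
import os
import pickle
import tempfile


def save_checkpoint_atomic(path, data):
    """Write a checkpoint so readers see either the old file or the complete new one."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            pickle.dump(data, f)
            f.flush()
            os.fsync(f.fileno())  # Push the bytes to disk before renaming
        os.replace(tmp_path, path)  # Atomic when both paths are on the same filesystem
    except BaseException:
        # Remove the partial temporary file and re-raise the original error
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_checkpoint(path):
    """Return the stored checkpoint, or None when no checkpoint exists."""
    if not os.path.exists(path):
        return None
    with open(path, "rb") as f:
        return pickle.load(f)


# Usage with a throwaway directory standing in for a mounted checkpoint volume
with tempfile.TemporaryDirectory() as workdir:
    ckpt = os.path.join(workdir, "monte_carlo_checkpoint.pkl")
    save_checkpoint_atomic(ckpt, {"iteration": 50000, "pi_samples": [1, 0, 1]})
    print(load_checkpoint(ckpt)["iteration"])
```

For the checkpoint to help after a retry, the directory would also need to live on storage that outlasts the container, such as a mounted volume.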
Kubernetes Resource Management and Best Practices
# Example Kubernetes resource configurations
def get_kubernetes_resource_examples():
    """
    Examples of resource configurations for different workload types.
    """
    examples = {
        'cpu_intensive': {
            'cores': 8,
            'memory': '16Gi',
            'cpu_limit': 8,
            'memory_limit': '20Gi'
        },
        'memory_intensive': {
            'cores': 4,
            'memory': '32Gi',
            'cpu_limit': 6,
            'memory_limit': '40Gi'
        },
        'ml_training': {
            'cores': 6,
            'memory': '24Gi',
            'cpu_limit': 8,
            'memory_limit': '32Gi'
        }
    }
    return examples

# Example usage:
# resources = get_kubernetes_resource_examples()
# print(f"Available resource patterns: {list(resources.keys())}")
Clustrix Kubernetes Configuration Examples
Basic Computation
Use case: Simple mathematical computations
@cluster(
    cores=2,
    memory="4Gi",
    cpu_limit=3,
    memory_limit="6Gi",
    container_image="python:3.11-slim"
)
ML Training
Use case: Machine learning model training with fault tolerance
@cluster(
    cores=8,
    memory="32Gi",
    cpu_limit=12,
    memory_limit="40Gi",
    container_image="python:3.11",
    job_name="ml-training",
    backoff_limit=3
)
Parallel Processing
Use case: Embarrassingly parallel data processing
@cluster(
    cores=4,
    memory="16Gi",
    parallel=True,
    parallelism=5,
    completions=20,
    job_name="parallel-processing"
)
Fault Tolerant
Use case: Long-running computations with automatic retry
@cluster(
    cores=6,
    memory="24Gi",
    backoff_limit=5,
    restart_policy="OnFailure",
    job_ttl_seconds=7200,
    active_deadline_seconds=3600
)
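When choosing `backoff_limit` together with `active_deadline_seconds`, it helps to estimate how much time retries alone can consume. The Kubernetes Job documentation describes failed pods being recreated with an exponential back-off delay (10 s, 20 s, 40 s, ...) capped at six minutes. The sketch below reproduces that schedule as plain arithmetic. The function name `backoff_delays` is hypothetical, and real timing depends on the cluster's Job controller, so treat the numbers as rough estimates.

```python
def backoff_delays(backoff_limit, base_seconds=10, cap_seconds=360):
    """Approximate delay before each retry: doubling from base_seconds, capped at cap_seconds."""
    return [min(base_seconds * 2 ** attempt, cap_seconds) for attempt in range(backoff_limit)]


delays = backoff_delays(5)
print(delays)       # delay before each of the 5 retries, in seconds
print(sum(delays))  # total time spent waiting between retries
```

Comparing the total wait against `active_deadline_seconds` shows how much of the deadline is left for the computation itself after several failures.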
Kubernetes Job Patterns
Single Job
Description: Single pod, run-to-completion
Best for: One-off computations, small datasets
Parameters:
completions: 1
parallelism: 1
backoff_limit: 3
Parallel Job
Description: Multiple pods running simultaneously
Best for: Independent parallel tasks, embarrassingly parallel problems
Parameters:
completions: 10
parallelism: 5
backoff_limit: 2
Queue Job
Description: Work queue pattern with multiple workers
Best for: Dynamic workloads, task queues, streaming data
Parameters:
completions: None (no fixed completion count)
parallelism: 3
backoff_limit: 5
Indexed Job
Description: Jobs with completion index for task assignment
Best for: Parameter sweeps, data partitioning, batch processing
Parameters:
completion_mode: Indexed
completions: 20
parallelism: 4
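In Indexed completion mode, Kubernetes assigns each pod a completion index from 0 to `completions - 1` and exposes it inside the container as the `JOB_COMPLETION_INDEX` environment variable. The sketch below shows one way a worker could use that index to pick its share of a parameter sweep. The function `shard_for_index` and the sample learning rates are hypothetical, and the tutorial does not say how Clustrix passes the index to a decorated function, so reading the environment variable directly is an assumption.

```python
import os


def shard_for_index(items, index, completions):
    """Return the slice of `items` owned by completion index `index`.

    Items are dealt out round-robin, so shard sizes differ by at most one.
    """
    if not 0 <= index < completions:
        raise ValueError(f"index {index} outside 0..{completions - 1}")
    return items[index::completions]


# Kubernetes sets JOB_COMPLETION_INDEX in pods of an Indexed Job;
# default to 0 so the sketch also runs outside a cluster.
index = int(os.environ.get("JOB_COMPLETION_INDEX", "0"))
completions = 20  # matches the Indexed Job parameters above

# Hypothetical sweep: 50 learning rates split across 20 completions.
learning_rates = [round(0.001 * (i + 1), 3) for i in range(50)]
my_rates = shard_for_index(learning_rates, index, completions)
print(f"pod {index} handles {len(my_rates)} of {len(learning_rates)} values")
```

Round-robin slicing keeps the shards balanced without any coordination between pods, which suits the independent tasks this pattern targets.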
Kubernetes Resource Management Guidelines
CPU Intensive Workloads
Description: Mathematical computations, simulations, optimization
Resource ratio: cores ≈ cpu_limit, memory moderate
Example configuration:
cores: 8
memory: 16Gi
cpu_limit: 8
memory_limit: 20Gi
Use cases: Monte Carlo simulations, Genetic algorithms, Scientific computing
Memory Intensive Workloads
Description: Large dataset processing, in-memory analytics
Resource ratio: memory >> cores, higher memory limits
Example configuration:
cores: 4
memory: 32Gi
cpu_limit: 6
memory_limit: 40Gi
Use cases: Big data processing, Large ML models, Genomics analysis
I/O Intensive Workloads
Description: File processing, database operations, network I/O
Resource ratio: moderate cores and memory, focus on concurrency
Example configuration:
cores: 2
memory: 8Gi
cpu_limit: 4
memory_limit: 12Gi
Use cases: Data ingestion, ETL pipelines, Web scraping
ML Training Workloads
Description: Machine learning model training
Resource ratio: balanced cores and memory, burst capacity
Example configuration:
cores: 6
memory: 24Gi
cpu_limit: 8
memory_limit: 32Gi
Use cases: Deep learning, Model hyperparameter tuning, Feature engineering
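Kubernetes rejects a pod whose limit is lower than its request, so it can be useful to sanity-check a configuration before submitting it. The sketch below parses memory quantities such as `16Gi` and compares requests against limits. The helpers `parse_memory` and `check_resources` are hypothetical and not part of Clustrix, and only a subset of the Kubernetes quantity suffixes is handled.

```python
# Subset of Kubernetes quantity suffixes: binary (Ki, Mi, ...) and decimal (k, M, ...).
_MEMORY_UNITS = {
    "Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40,
    "k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12,
}


def parse_memory(quantity):
    """Convert a memory quantity such as '16Gi' to a number of bytes."""
    # Try two-letter suffixes first so 'Gi' is not mistaken for 'G'.
    for suffix in sorted(_MEMORY_UNITS, key=len, reverse=True):
        if quantity.endswith(suffix):
            return int(float(quantity[: -len(suffix)]) * _MEMORY_UNITS[suffix])
    return int(quantity)  # no suffix: plain byte count


def check_resources(cores, memory, cpu_limit, memory_limit):
    """Return a list of problems; an empty list means requests fit within limits."""
    problems = []
    if cpu_limit < cores:
        problems.append(f"cpu_limit {cpu_limit} is below the request of {cores} cores")
    if parse_memory(memory_limit) < parse_memory(memory):
        problems.append(f"memory_limit {memory_limit} is below the request of {memory}")
    return problems


# The memory-intensive configuration from the guidelines above.
config = {"cores": 4, "memory": "32Gi", "cpu_limit": 6, "memory_limit": "40Gi"}
print(check_resources(**config))

# A deliberately inconsistent configuration: both limits are below the requests.
for problem in check_resources(cores=8, memory="16Gi", cpu_limit=4, memory_limit="8Gi"):
    print(problem)
```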
Kubernetes Cluster Monitoring
[ ]:
def check_kubernetes_cluster_status():
    """
    Check Kubernetes cluster status and resources.

    Note: This requires kubectl to be configured properly.
    """
    import shlex
    import subprocess

    def run_kubectl_command(cmd):
        """Run a kubectl command and return its stdout, or an 'Error: ...' string."""
        try:
            result = subprocess.run(
                ["kubectl", *shlex.split(cmd)],  # argument list, no shell needed
                capture_output=True,
                text=True,
                timeout=30
            )
            if result.returncode == 0:
                return result.stdout.strip()
            else:
                return f"Error: {result.stderr.strip()}"
        except subprocess.TimeoutExpired:
            return "Error: Command timed out"
        except Exception as e:
            return f"Error: {str(e)}"

    print("Kubernetes Cluster Status Check:")
    print("=" * 40)

    # Check cluster info
    print("\n1. Cluster Info:")
    cluster_info = run_kubectl_command("cluster-info")
    if not cluster_info.startswith("Error:"):
        lines = cluster_info.split('\n')[:3]  # First 3 lines
        for line in lines:
            print(f" {line}")
    else:
        print(f" {cluster_info}")

    # Check nodes
    print("\n2. Node Status:")
    nodes = run_kubectl_command("get nodes -o wide")
    if not nodes.startswith("Error:"):
        lines = nodes.split('\n')[:6]  # Header + first 5 nodes
        for line in lines:
            print(f" {line}")
    else:
        print(f" {nodes}")

    # Check namespaces
    print("\n3. Namespaces:")
    namespaces = run_kubectl_command("get namespaces")
    if not namespaces.startswith("Error:"):
        lines = namespaces.split('\n')[:8]  # Header + first 7 namespaces
        for line in lines:
            print(f" {line}")
    else:
        print(f" {namespaces}")

    # Check current context
    print("\n4. Current Context:")
    context = run_kubectl_command("config current-context")
    print(f" {context}")

    # kubectl writes "No resources found" to stderr with exit code 0,
    # so an empty stdout also means there is nothing to list.

    # Check resource quotas
    print("\n5. Resource Quotas (default namespace):")
    quotas = run_kubectl_command("get resourcequota -n default")
    if not quotas or "No resources found" in quotas:
        print(" No resource quotas configured")
    else:
        print(f" {quotas}")

    # Check jobs
    print("\n6. Running Jobs (default namespace):")
    jobs = run_kubectl_command("get jobs -n default")
    if not jobs or "No resources found" in jobs:
        print(" No jobs currently running")
    else:
        lines = jobs.split('\n')[:6]  # Header + first 5 jobs
        for line in lines:
            print(f" {line}")

    # Check pods
    print("\n7. Running Pods (default namespace):")
    pods = run_kubectl_command("get pods -n default")
    if not pods or "No resources found" in pods:
        print(" No pods currently running")
    else:
        lines = pods.split('\n')[:6]  # Header + first 5 pods
        for line in lines:
            print(f" {line}")

    # Check node resource usage
    print("\n8. Node Resource Usage:")
    top_nodes = run_kubectl_command("top nodes")
    if not top_nodes.startswith("Error:") and "not available" not in top_nodes:
        lines = top_nodes.split('\n')[:6]  # Header + first 5 nodes
        for line in lines:
            print(f" {line}")
    else:
        print(" Resource metrics not available (metrics-server may not be installed)")


# Check cluster status
try:
    check_kubernetes_cluster_status()
except Exception as e:
    print(f"Failed to check Kubernetes cluster status: {e}")
    print("Make sure kubectl is installed and configured for your cluster")
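The status check above prints kubectl's human-readable tables. For scripted checks, the JSON output (`-o json`) is usually easier to work with. The sketch below summarizes node readiness from the output of `kubectl get nodes -o json`. The function name `summarize_node_readiness` is hypothetical, and the sample payload is hand-written and trimmed to the fields the function reads so that the example runs without a cluster.

```python
import json


def summarize_node_readiness(nodes_json):
    """Map node name -> True/False from `kubectl get nodes -o json` output."""
    readiness = {}
    for node in json.loads(nodes_json).get("items", []):
        name = node["metadata"]["name"]
        conditions = node.get("status", {}).get("conditions", [])
        # A node is ready when its "Ready" condition has status "True".
        readiness[name] = any(
            c.get("type") == "Ready" and c.get("status") == "True"
            for c in conditions
        )
    return readiness


# Hand-written sample, not real cluster output.
sample = json.dumps({
    "items": [
        {"metadata": {"name": "node-a"},
         "status": {"conditions": [{"type": "Ready", "status": "True"}]}},
        {"metadata": {"name": "node-b"},
         "status": {"conditions": [{"type": "Ready", "status": "Unknown"}]}},
    ]
})

for name, ready in summarize_node_readiness(sample).items():
    print(f"{name}: {'Ready' if ready else 'NotReady'}")
```

Against a live cluster, the string passed in would be the stdout of `kubectl get nodes -o json` rather than the sample.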
Summary
This tutorial covered Kubernetes usage with Clustrix:
Kubernetes Configuration - Setting up Clustrix for container-based computing
Machine Learning Training - Distributed ML workflows in pods
Data Processing - Large-scale data analysis with automatic parallelization
Fault Tolerance - Robust computing with checkpointing and retry mechanisms
Resource Management - Intelligent resource allocation and limits
Job Patterns - Different Kubernetes job execution patterns
Cluster Monitoring - Status checking and resource monitoring
Key Kubernetes Advantages:
Containerization: Consistent execution environments across clusters
Scalability: Automatic scaling based on workload demands
Fault Tolerance: Built-in restart and retry mechanisms
Resource Management: Fine-grained CPU and memory control
Isolation: Secure, isolated execution environments
Portability: Run on any Kubernetes cluster (cloud or on-premises)
Best Practices:
Resource Limits: Always set both requests and limits for predictable scheduling
Container Images: Use specific, lightweight base images for faster startup
Job Patterns: Choose appropriate job patterns for your workload type
Fault Tolerance: Implement checkpointing for long-running computations
Monitoring: Check cluster health and resource usage regularly
Cleanup: Set TTL for automatic job cleanup to prevent resource buildup
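The checkpointing practice listed above can be sketched with the standard library alone. The example below saves progress atomically and resumes from the last saved step after a restart. All function names and the checkpoint path are hypothetical. In a real Job the checkpoint file would need to live on storage that survives pod restarts, such as a persistent volume, which is not shown here.

```python
import json
import os
import tempfile


def save_checkpoint(path, state):
    """Write state atomically so a crash never leaves a half-written file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as handle:
        json.dump(state, handle)
    os.replace(tmp_path, path)  # atomic rename within one filesystem


def load_checkpoint(path, default):
    """Return the saved state, or `default` when no checkpoint exists yet."""
    try:
        with open(path) as handle:
            return json.load(handle)
    except FileNotFoundError:
        return default


def run_with_checkpoints(path, total_steps, checkpoint_every=100):
    """Sum 0..total_steps-1, resuming from the last checkpoint if one exists."""
    state = load_checkpoint(path, {"step": 0, "total": 0})
    for step in range(state["step"], total_steps):
        state["total"] += step  # stand-in for one unit of real work
        state["step"] = step + 1
        if state["step"] % checkpoint_every == 0:
            save_checkpoint(path, state)
    save_checkpoint(path, state)
    return state["total"]


# A temporary directory stands in for persistent storage in this sketch.
checkpoint_path = os.path.join(tempfile.mkdtemp(), "progress.json")
print(run_with_checkpoints(checkpoint_path, total_steps=1000))
```

If a pod is retried under `backoff_limit`, a second call with the same path picks up from the last saved step instead of starting over.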
Kubernetes-Specific Features:
``cpu_limit`` and ``memory_limit``: Resource limits for burst capacity
``backoff_limit``: Automatic retry on failures
``parallelism`` and ``completions``: Parallel job execution control
``job_ttl_seconds``: Automatic cleanup of completed jobs
``restart_policy``: Pod restart behavior on failure
``active_deadline_seconds``: Maximum job runtime limit
Next Steps:
Compare with SLURM Tutorial for HPC-style clusters
Explore PBS Tutorial for traditional batch systems
Try SSH Tutorial for simple remote execution
Check the Configuration Guide for advanced settings
For more information, visit the Clustrix Documentation.