Kubernetes Tutorial


This tutorial demonstrates how to use Clustrix with Kubernetes clusters for containerized distributed computing.

Prerequisites

  • Access to a Kubernetes cluster

  • kubectl configured for your cluster

  • Clustrix installed with Kubernetes support: pip install clustrix[kubernetes]
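
Before running the cells below, it can help to confirm the first two prerequisites from Python. The following is a minimal sketch using only the standard library. The helper name `check_k8s_prerequisites` is hypothetical and not part of Clustrix; it only checks that `kubectl` is on the PATH and that a kubeconfig file exists at the given path, and it does not contact the cluster.

```python
import os
import shutil


def check_k8s_prerequisites(kubeconfig="~/.kube/config"):
    """Report whether kubectl and a kubeconfig file can be found locally.

    Hypothetical helper for illustration; it does not contact the cluster.
    """
    kubeconfig_path = os.path.expanduser(kubeconfig)
    return {
        "kubectl_on_path": shutil.which("kubectl") is not None,
        "kubeconfig_path": kubeconfig_path,
        "kubeconfig_exists": os.path.isfile(kubeconfig_path),
    }


status = check_k8s_prerequisites()
for name, value in status.items():
    print(f"{name}: {value}")
```

If either check reports `False`, fix the local setup first; a passing check still does not guarantee that the cluster itself is reachable.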

[ ]:
# Install Clustrix with Kubernetes support (uncomment if needed)
# !pip install clustrix[kubernetes]

import clustrix
from clustrix import cluster, configure
import numpy as np

Kubernetes Configuration

Configure Clustrix for your Kubernetes cluster:

[ ]:
# Configure for Kubernetes cluster
configure(
    cluster_type="kubernetes",

    # Kubernetes-specific settings
    k8s_namespace="default",              # Kubernetes namespace
    k8s_config_file="~/.kube/config",     # Path to kubeconfig

    # Default resource requirements
    default_cores=2,
    default_memory="4Gi",                 # Kubernetes format (Gi, Mi)
    default_cpu_limit=4,                  # CPU limit (can be > cores)
    default_memory_limit="8Gi",           # Memory limit

    # Container settings
    container_image="python:3.11-slim",  # Base Python image
    image_pull_policy="IfNotPresent",     # Image pull policy

    # Job settings
    job_ttl_seconds=3600,                 # Job cleanup after 1 hour
    backoff_limit=3,                      # Retry failed jobs up to 3 times

    # Cleanup
    cleanup_on_success=True,
    max_parallel_jobs=20
)

print("Kubernetes cluster configured successfully!")
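
The memory settings above use Kubernetes quantity strings, where `Ki`, `Mi`, `Gi` are powers of two and `k`, `M`, `G` are powers of ten. Kubernetes also requires a container's memory request not to exceed its limit. The sketch below converts such strings to bytes and checks that relationship. The helper names are hypothetical, it covers only the common suffixes, and it assumes that `default_memory` acts as the request and `default_memory_limit` as the limit.

```python
# Binary (power-of-two) and decimal suffixes used by Kubernetes quantities.
# Only the most common suffixes are covered in this sketch.
_SUFFIXES = {
    "Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40,
    "k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12,
}


def memory_to_bytes(quantity):
    """Convert a quantity such as "4Gi" or "512Mi" to a number of bytes."""
    for suffix, factor in _SUFFIXES.items():
        if quantity.endswith(suffix):
            return int(float(quantity[: -len(suffix)]) * factor)
    return int(quantity)  # A plain number is already in bytes.


def check_request_within_limit(request, limit):
    """Return True if the memory request does not exceed the limit."""
    return memory_to_bytes(request) <= memory_to_bytes(limit)


print(memory_to_bytes("4Gi"))
print(check_request_within_limit("4Gi", "8Gi"))
```

Running a check like this before submitting jobs catches mismatched values (for example a 12Gi request with an 8Gi limit) locally rather than at pod creation time.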

Example 1: Containerized Machine Learning

Train machine learning models in Kubernetes pods:

[ ]:
@cluster(
    cores=4,
    memory="8Gi",
    cpu_limit=6,
    memory_limit="12Gi",
    container_image="python:3.11",
    job_name="ml-training"  # Custom job name
)
def distributed_ml_training(model_type="random_forest", n_estimators=200, dataset_size=50000):
    """
    Distributed machine learning training in Kubernetes.
    """
    import numpy as np
    import os
    import json
    from datetime import datetime

    # Install required packages within the container (raises if pip fails)
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "scikit-learn", "pandas", "numpy"])

    from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
    from sklearn.svm import SVC
    from sklearn.neural_network import MLPClassifier
    from sklearn.datasets import make_classification
    from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
    from sklearn.preprocessing import StandardScaler
    import pandas as pd

    print(f"Starting ML training: {model_type}, {n_estimators} estimators, {dataset_size:,} samples")
    print(f"Pod started at: {datetime.now()}")

    # Generate synthetic dataset
    print("Generating synthetic dataset...")
    X, y = make_classification(
        n_samples=dataset_size,
        n_features=50,
        n_informative=30,
        n_redundant=10,
        n_classes=3,
        n_clusters_per_class=2,
        flip_y=0.05,  # Add some noise
        random_state=42
    )

    # Split the data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Feature scaling for SVM and MLP
    if model_type in ['svm', 'mlp']:
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)

    print(f"Dataset: {X_train.shape[0]:,} training, {X_test.shape[0]:,} test samples")

    # Model selection and configuration
    models = {
        'random_forest': {
            'model': RandomForestClassifier,
            'params': {
                'n_estimators': n_estimators,
                'max_depth': 20,
                'min_samples_split': 5,
                'min_samples_leaf': 2,
                'n_jobs': -1,
                'random_state': 42
            },
            'param_grid': {
                'max_depth': [15, 20, 25],
                'min_samples_split': [2, 5, 10]
            }
        },
        'gradient_boosting': {
            'model': GradientBoostingClassifier,
            'params': {
                'n_estimators': n_estimators,
                'learning_rate': 0.1,
                'max_depth': 6,
                'random_state': 42
            },
            'param_grid': {
                'learning_rate': [0.05, 0.1, 0.2],
                'max_depth': [4, 6, 8]
            }
        },
        'svm': {
            'model': SVC,
            'params': {
                'kernel': 'rbf',
                'C': 1.0,
                'gamma': 'scale',
                'random_state': 42
            },
            'param_grid': {
                'C': [0.1, 1.0, 10.0],
                'gamma': ['scale', 'auto']
            }
        },
        'mlp': {
            'model': MLPClassifier,
            'params': {
                'hidden_layer_sizes': (100, 50),
                'activation': 'relu',
                'solver': 'adam',
                'alpha': 0.0001,
                'max_iter': 1000,
                'random_state': 42
            },
            'param_grid': {
                'hidden_layer_sizes': [(50,), (100,), (100, 50)],
                'alpha': [0.0001, 0.001, 0.01]
            }
        }
    }

    if model_type not in models:
        model_type = 'random_forest'  # Default fallback

    model_config = models[model_type]

    # Train base model
    print(f"Training {model_type} model...")
    start_time = datetime.now()

    base_model = model_config['model'](**model_config['params'])
    base_model.fit(X_train, y_train)

    training_time = (datetime.now() - start_time).total_seconds()
    print(f"Base model training completed in {training_time:.2f} seconds")

    # Base model evaluation
    y_pred = base_model.predict(X_test)
    base_accuracy = accuracy_score(y_test, y_pred)
    base_precision = precision_score(y_test, y_pred, average='weighted')
    base_recall = recall_score(y_test, y_pred, average='weighted')
    base_f1 = f1_score(y_test, y_pred, average='weighted')

    print(f"Base model performance: Accuracy={base_accuracy:.4f}")

    # Cross-validation
    print("Performing cross-validation...")
    cv_scores = cross_val_score(base_model, X_train, y_train, cv=5, n_jobs=-1)

    # Hyperparameter optimization
    print("Optimizing hyperparameters...")
    grid_search = GridSearchCV(
        model_config['model'](**model_config['params']),  # Keep base settings (e.g. random_state) during the search
        model_config['param_grid'],
        cv=3,
        scoring='accuracy',
        n_jobs=-1,
        verbose=0
    )

    grid_search.fit(X_train, y_train)
    best_model = grid_search.best_estimator_

    # Best model evaluation
    y_pred_best = best_model.predict(X_test)
    best_accuracy = accuracy_score(y_test, y_pred_best)
    best_precision = precision_score(y_test, y_pred_best, average='weighted')
    best_recall = recall_score(y_test, y_pred_best, average='weighted')
    best_f1 = f1_score(y_test, y_pred_best, average='weighted')

    print(f"Optimized model performance: Accuracy={best_accuracy:.4f}")

    # Feature importance (if available)
    feature_importance = None
    if hasattr(best_model, 'feature_importances_'):
        feature_importance = best_model.feature_importances_.tolist()
        top_features = sorted(enumerate(feature_importance),
                            key=lambda x: x[1], reverse=True)[:10]
        print(f"Top 5 features: {[f'Feature_{i}' for i, _ in top_features[:5]]}")

    # Model complexity analysis
    def analyze_model_complexity(model, model_type):
        complexity_metrics = {}

        if model_type == 'random_forest':
            complexity_metrics = {
                'n_estimators': model.n_estimators,
                'max_depth': model.max_depth,
                'total_nodes': sum(tree.tree_.node_count for tree in model.estimators_),
                'avg_depth': np.mean([tree.tree_.max_depth for tree in model.estimators_])
            }
        elif model_type == 'gradient_boosting':
            complexity_metrics = {
                'n_estimators': model.n_estimators,
                'max_depth': model.max_depth,
                'learning_rate': model.learning_rate,
                'total_nodes': sum(tree[0].tree_.node_count for tree in model.estimators_)
            }
        elif model_type == 'svm':
            complexity_metrics = {
                'n_support_vectors': int(model.n_support_.sum()),  # Plain int for serialization
                'kernel': model.kernel,
                'C': model.C,
                'gamma': model.gamma
            }
        elif model_type == 'mlp':
            complexity_metrics = {
                'hidden_layers': len(model.hidden_layer_sizes),
                'total_parameters': sum(layer.size for layer in model.coefs_) +
                                  sum(layer.size for layer in model.intercepts_),
                'n_iterations': model.n_iter_,
                'loss': model.loss_
            }

        return complexity_metrics

    complexity_metrics = analyze_model_complexity(best_model, model_type)

    # Compile results
    training_results = {
        'model_info': {
            'model_type': model_type,
            'dataset_size': dataset_size,
            'n_features': X.shape[1],
            'n_classes': len(np.unique(y)),
            'training_samples': X_train.shape[0],
            'test_samples': X_test.shape[0]
        },
        'training_metrics': {
            'training_time_seconds': training_time,
            'hyperparameter_optimization': True,
            'cross_validation_folds': 5
        },
        'base_model_performance': {
            'accuracy': base_accuracy,
            'precision': base_precision,
            'recall': base_recall,
            'f1_score': base_f1
        },
        'optimized_model_performance': {
            'accuracy': best_accuracy,
            'precision': best_precision,
            'recall': best_recall,
            'f1_score': best_f1,
            'improvement_over_base': best_accuracy - base_accuracy
        },
        'cross_validation': {
            'cv_scores': cv_scores.tolist(),
            'cv_mean': np.mean(cv_scores),
            'cv_std': np.std(cv_scores)
        },
        'best_hyperparameters': grid_search.best_params_,
        'model_complexity': complexity_metrics,
        'feature_importance': feature_importance,
        'kubernetes_info': {
            'pod_name': os.environ.get('HOSTNAME', 'unknown'),
            'namespace': os.environ.get('KUBERNETES_NAMESPACE', 'default'),
            'completion_time': datetime.now().isoformat()
        }
    }

    return training_results

# Run ML training in Kubernetes
ml_results = distributed_ml_training(
    model_type="random_forest",
    n_estimators=150,
    dataset_size=30000
)

print(f"\nMACHINE LEARNING TRAINING COMPLETE")
model_info = ml_results['model_info']
print(f"Model: {model_info['model_type']}")
print(f"Dataset: {model_info['dataset_size']:,} samples, {model_info['n_features']} features")

base_perf = ml_results['base_model_performance']
opt_perf = ml_results['optimized_model_performance']
print(f"\nPerformance Comparison:")
print(f"  Base model accuracy: {base_perf['accuracy']:.4f}")
print(f"  Optimized accuracy: {opt_perf['accuracy']:.4f}")
print(f"  Improvement: {opt_perf['improvement_over_base']:+.4f}")

cv = ml_results['cross_validation']
print(f"\nCross-validation: {cv['cv_mean']:.4f} ± {cv['cv_std']:.4f}")

k8s_info = ml_results['kubernetes_info']
print(f"\nKubernetes Info:")
print(f"  Pod: {k8s_info['pod_name']}")
print(f"  Namespace: {k8s_info['namespace']}")
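
The hyperparameter search in Example 1 is likely to account for most of the job's runtime, so it is worth estimating how many model fits it triggers before choosing `cores` and `memory`. The sketch below counts fits for an exhaustive grid search: one fit per parameter combination per cross-validation fold, plus one final refit on the full training set (scikit-learn's default `refit=True`). The helper name `count_grid_search_fits` is hypothetical.

```python
import math


def count_grid_search_fits(param_grid, cv, refit=True):
    """Number of model fits an exhaustive grid search performs."""
    # Every combination of parameter values is one candidate.
    n_candidates = math.prod(len(values) for values in param_grid.values())
    # Each candidate is fitted once per fold; refit adds one final fit.
    return n_candidates * cv + (1 if refit else 0)


# The random forest grid from Example 1, searched with cv=3.
random_forest_grid = {
    "max_depth": [15, 20, 25],
    "min_samples_split": [2, 5, 10],
}
print(count_grid_search_fits(random_forest_grid, cv=3))
```

For the random forest grid this gives 9 candidates times 3 folds plus 1 refit, so 28 fits in addition to the base model and its 5-fold cross-validation.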

Example 2: Distributed Data Processing

Process large datasets using Kubernetes job parallelization:

[ ]:
@cluster(
    cores=6,
    memory="12Gi",
    cpu_limit=8,
    memory_limit="16Gi",
    parallel=True,  # Enable automatic parallelization
    job_name="data-processing",
    parallelism=3,  # Run up to 3 pods simultaneously
    completions=10  # Total number of completions needed
)
def distributed_data_analysis(data_chunks=100, chunk_size=10000):
    """
    Distributed data analysis across multiple Kubernetes pods.
    """
    import numpy as np
    import os
    import json
    from datetime import datetime, timedelta
    import random
    import math

    # Install required packages (raises if pip fails)
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pandas", "scipy", "numpy"])

    import pandas as pd
    from scipy import stats

    print(f"Starting distributed data analysis: {data_chunks} chunks of {chunk_size:,} records each")
    print(f"Total data points: {data_chunks * chunk_size:,}")

    def generate_synthetic_timeseries_data(chunk_id, chunk_size):
        """Generate synthetic time-series data for analysis"""
        np.random.seed(chunk_id * 123)  # Reproducible but different per chunk

        # Generate hourly timestamps (chunk_size hours, starting 10 days later for each chunk)
        start_date = datetime(2023, 1, 1) + timedelta(days=chunk_id * 10)
        timestamps = [start_date + timedelta(hours=i) for i in range(chunk_size)]

        # Generate multiple correlated time series
        base_trend = np.linspace(100, 200, chunk_size)  # Long-term trend
        seasonal = 20 * np.sin(2 * np.pi * np.arange(chunk_size) / (24 * 7))  # Weekly seasonality
        daily = 10 * np.sin(2 * np.pi * np.arange(chunk_size) / 24)  # Daily pattern

        # Add different noise patterns
        noise = np.random.normal(0, 5, chunk_size)

        # Primary metric (e.g., web traffic, sales, etc.)
        primary_metric = base_trend + seasonal + daily + noise
        primary_metric = np.maximum(0, primary_metric)  # Ensure non-negative

        # Secondary metrics correlated with primary
        secondary_metric = primary_metric * 0.7 + np.random.normal(0, 3, chunk_size)
        tertiary_metric = primary_metric * 1.2 + np.random.normal(10, 8, chunk_size)

        # Categorical data
        categories = ['A', 'B', 'C', 'D', 'E']
        category_weights = [0.3, 0.25, 0.2, 0.15, 0.1]
        categories_data = np.random.choice(categories, chunk_size, p=category_weights)

        # Geographic regions
        regions = ['North', 'South', 'East', 'West', 'Central']
        region_weights = [0.2, 0.2, 0.25, 0.2, 0.15]
        regions_data = np.random.choice(regions, chunk_size, p=region_weights)

        # Create DataFrame
        data = pd.DataFrame({
            'timestamp': timestamps,
            'primary_metric': primary_metric,
            'secondary_metric': secondary_metric,
            'tertiary_metric': tertiary_metric,
            'category': categories_data,
            'region': regions_data,
            'chunk_id': chunk_id
        })

        return data

    def analyze_chunk_statistics(chunk_data):
        """Comprehensive statistical analysis of a data chunk"""
        numeric_cols = ['primary_metric', 'secondary_metric', 'tertiary_metric']

        statistics = {}

        # Basic descriptive statistics
        for col in numeric_cols:
            series = chunk_data[col]
            statistics[col] = {
                'count': len(series),
                'mean': float(np.mean(series)),
                'median': float(np.median(series)),
                'std': float(np.std(series)),
                'min': float(np.min(series)),
                'max': float(np.max(series)),
                'q25': float(np.percentile(series, 25)),
                'q75': float(np.percentile(series, 75)),
                'skewness': float(stats.skew(series)),
                'kurtosis': float(stats.kurtosis(series))
            }

        # Correlation analysis
        correlation_matrix = chunk_data[numeric_cols].corr()
        statistics['correlations'] = {
            'primary_secondary': float(correlation_matrix.loc['primary_metric', 'secondary_metric']),
            'primary_tertiary': float(correlation_matrix.loc['primary_metric', 'tertiary_metric']),
            'secondary_tertiary': float(correlation_matrix.loc['secondary_metric', 'tertiary_metric'])
        }

        # Categorical analysis
        category_stats = chunk_data['category'].value_counts()
        region_stats = chunk_data['region'].value_counts()

        statistics['categorical'] = {
            'category_distribution': category_stats.to_dict(),
            'region_distribution': region_stats.to_dict(),
            'category_entropy': float(-sum(p * np.log2(p) for p in category_stats / len(chunk_data) if p > 0)),
            'region_entropy': float(-sum(p * np.log2(p) for p in region_stats / len(chunk_data) if p > 0))
        }

        # Time-based analysis
        chunk_data['hour'] = chunk_data['timestamp'].dt.hour
        chunk_data['day_of_week'] = chunk_data['timestamp'].dt.dayofweek

        hourly_pattern = chunk_data.groupby('hour')['primary_metric'].mean()
        daily_pattern = chunk_data.groupby('day_of_week')['primary_metric'].mean()

        statistics['temporal'] = {
            'hourly_peak': int(hourly_pattern.idxmax()),
            'hourly_trough': int(hourly_pattern.idxmin()),
            'hourly_variation': float(hourly_pattern.std()),
            'daily_peak': int(daily_pattern.idxmax()),  # 0=Monday, 6=Sunday
            'daily_variation': float(daily_pattern.std())
        }

        # Anomaly detection (simple threshold-based)
        for col in numeric_cols:
            series = chunk_data[col]
            q1, q3 = np.percentile(series, [25, 75])
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr

            outliers = series[(series < lower_bound) | (series > upper_bound)]
            statistics[col]['outliers'] = {
                'count': len(outliers),
                'percentage': float(len(outliers) / len(series) * 100),
                'lower_bound': float(lower_bound),
                'upper_bound': float(upper_bound)
            }

        return statistics

    def detect_patterns_and_trends(chunk_data):
        """Advanced pattern detection and trend analysis"""
        patterns = {}

        # Trend analysis using linear regression
        time_index = np.arange(len(chunk_data))

        for col in ['primary_metric', 'secondary_metric', 'tertiary_metric']:
            slope, intercept, r_value, p_value, std_err = stats.linregress(time_index, chunk_data[col])

            patterns[f'{col}_trend'] = {
                'slope': float(slope),
                'r_squared': float(r_value ** 2),
                'p_value': float(p_value),
                'trend_direction': 'increasing' if slope > 0 else 'decreasing',
                'trend_strength': 'strong' if abs(r_value) > 0.7 else 'moderate' if abs(r_value) > 0.3 else 'weak'
            }

        # Seasonality detection (simplified)
        primary_hourly = chunk_data.groupby(chunk_data['timestamp'].dt.hour)['primary_metric'].mean()
        hourly_variation = primary_hourly.std() / primary_hourly.mean()

        patterns['seasonality'] = {
            'hourly_coefficient_of_variation': float(hourly_variation),
            'has_daily_pattern': bool(hourly_variation > 0.15),  # Threshold for daily seasonality
            'peak_hours': [int(hour) for hour in primary_hourly.nlargest(3).index.tolist()],
            'trough_hours': [int(hour) for hour in primary_hourly.nsmallest(3).index.tolist()]
        }

        # Change point detection (simplified)
        def detect_change_points(series, window=100):
            if len(series) < 2 * window:
                return []

            change_points = []
            for i in range(window, len(series) - window):
                before = series[i-window:i]
                after = series[i:i+window]

                # Statistical test for difference in means
                t_stat, p_val = stats.ttest_ind(before, after)
                if p_val < 0.01:  # Significant change
                    change_points.append(i)

            return change_points

        change_points = detect_change_points(chunk_data['primary_metric'].values)
        patterns['change_points'] = {
            'detected_points': len(change_points),
            'positions': change_points[:5] if change_points else [],  # First 5
            'has_significant_changes': len(change_points) > 0
        }

        return patterns

    # Process chunks (this loop will be automatically parallelized)
    chunk_results = []

    for chunk_id in range(data_chunks):
        if chunk_id % 10 == 0:
            print(f"Processing chunk {chunk_id + 1}/{data_chunks}...")

        # Generate data for this chunk
        chunk_data = generate_synthetic_timeseries_data(chunk_id, chunk_size)

        # Analyze the chunk
        chunk_stats = analyze_chunk_statistics(chunk_data)
        chunk_patterns = detect_patterns_and_trends(chunk_data)

        chunk_result = {
            'chunk_id': chunk_id,
            'chunk_size': len(chunk_data),
            'statistics': chunk_stats,
            'patterns': chunk_patterns,
            'processing_timestamp': datetime.now().isoformat()
        }

        chunk_results.append(chunk_result)

    # Aggregate results across all chunks
    def aggregate_chunk_results(chunk_results):
        """Aggregate statistics across all processed chunks"""

        total_records = sum(chunk['chunk_size'] for chunk in chunk_results)

        # Aggregate basic statistics
        metrics = ['primary_metric', 'secondary_metric', 'tertiary_metric']
        aggregated_stats = {}

        for metric in metrics:
            means = [chunk['statistics'][metric]['mean'] for chunk in chunk_results]
            stds = [chunk['statistics'][metric]['std'] for chunk in chunk_results]

            aggregated_stats[metric] = {
                'global_mean': float(np.mean(means)),
                'mean_std': float(np.std(means)),
                'avg_within_chunk_std': float(np.mean(stds)),
                'total_variation': float(np.std(means) + np.mean(stds))
            }

        # Aggregate patterns
        trend_directions = {}
        for metric in metrics:
            directions = [chunk['patterns'][f'{metric}_trend']['trend_direction']
                         for chunk in chunk_results]
            trend_directions[metric] = {
                'increasing_chunks': directions.count('increasing'),
                'decreasing_chunks': directions.count('decreasing'),
                'dominant_trend': 'increasing' if directions.count('increasing') > directions.count('decreasing') else 'decreasing'
            }

        # Aggregate seasonality
        seasonal_chunks = sum(1 for chunk in chunk_results
                            if chunk['patterns']['seasonality']['has_daily_pattern'])

        # Aggregate change points
        total_change_points = sum(chunk['patterns']['change_points']['detected_points']
                                for chunk in chunk_results)

        aggregated_results = {
            'processing_summary': {
                'total_chunks': len(chunk_results),
                'total_records': total_records,
                'avg_records_per_chunk': total_records / len(chunk_results),
                'processing_completed': datetime.now().isoformat()
            },
            'aggregated_statistics': aggregated_stats,
            'global_patterns': {
                'trend_analysis': trend_directions,
                'seasonality': {
                    'chunks_with_daily_patterns': seasonal_chunks,
                    'percentage_seasonal': float(seasonal_chunks / len(chunk_results) * 100)
                },
                'change_points': {
                    'total_detected': total_change_points,
                    'avg_per_chunk': float(total_change_points / len(chunk_results))
                }
            },
            'data_quality': {
                'chunks_processed': len(chunk_results),
                'processing_success_rate': 100.0,  # All chunks processed successfully
                'data_consistency_score': float(np.mean([chunk['statistics']['primary_metric']['std']
                                                       for chunk in chunk_results]) /
                                               np.std([chunk['statistics']['primary_metric']['mean']
                                                      for chunk in chunk_results])) if len(chunk_results) > 1 else 1.0
            },
            'kubernetes_execution': {
                'pod_hostname': os.environ.get('HOSTNAME', 'unknown'),
                'parallel_execution': True,
                'chunk_distribution': 'automatic_parallelization'
            }
        }

        return aggregated_results

    final_results = aggregate_chunk_results(chunk_results)
    final_results['individual_chunks'] = chunk_results[:5]  # Include first 5 for inspection

    return final_results

# Run distributed data analysis
data_results = distributed_data_analysis(data_chunks=50, chunk_size=5000)

print(f"\nDISTRIBUTED DATA ANALYSIS COMPLETE")
summary = data_results['processing_summary']
print(f"Chunks processed: {summary['total_chunks']}")
print(f"Total records: {summary['total_records']:,}")
print(f"Avg records per chunk: {summary['avg_records_per_chunk']:,.0f}")

patterns = data_results['global_patterns']
print(f"\nGlobal Patterns:")
print(f"  Chunks with daily seasonality: {patterns['seasonality']['chunks_with_daily_patterns']} ({patterns['seasonality']['percentage_seasonal']:.1f}%)")
print(f"  Total change points detected: {patterns['change_points']['total_detected']}")
print(f"  Average change points per chunk: {patterns['change_points']['avg_per_chunk']:.2f}")

quality = data_results['data_quality']
print(f"\nData Quality:")
print(f"  Processing success rate: {quality['processing_success_rate']:.1f}%")
print(f"  Data consistency score: {quality['data_consistency_score']:.3f}")

k8s_exec = data_results['kubernetes_execution']
print(f"\nKubernetes Execution:")
print(f"  Pod hostname: {k8s_exec['pod_hostname']}")
print(f"  Parallel execution: {k8s_exec['parallel_execution']}")
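
The aggregation step in Example 2 summarizes chunks by averaging their means and standard deviations, which is only an approximation of the statistics of the full dataset. When each chunk reports its count, mean, and standard deviation, the exact global values can be recovered with the law of total variance. The sketch below shows this with the standard library only; `combine_chunk_stats` is a hypothetical helper, not part of Clustrix, and it assumes population standard deviations (the default of `np.std`).

```python
import math
import statistics


def combine_chunk_stats(chunk_stats):
    """Combine per-chunk (count, mean, std) tuples into a global mean and std.

    Uses population statistics (ddof=0), matching np.std's default.
    """
    total = sum(n for n, _, _ in chunk_stats)
    global_mean = sum(n * mean for n, mean, _ in chunk_stats) / total
    # Law of total variance: within-chunk variance plus between-chunk spread.
    global_var = sum(
        n * (std**2 + (mean - global_mean) ** 2) for n, mean, std in chunk_stats
    ) / total
    return global_mean, math.sqrt(global_var)


# Two small chunks standing in for the per-pod results.
chunk_a = [1.0, 2.0, 3.0]
chunk_b = [4.0, 5.0, 6.0, 7.0]
summaries = [
    (len(chunk), statistics.fmean(chunk), statistics.pstdev(chunk))
    for chunk in (chunk_a, chunk_b)
]
combined_mean, combined_std = combine_chunk_stats(summaries)
print(combined_mean, combined_std)
```

Because only three numbers per chunk are needed, this kind of aggregation works without moving the raw records between pods.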

Example 3: Fault-Tolerant Scientific Computing

Demonstrate Kubernetes’ fault tolerance and job retry capabilities:

[ ]:
@cluster(
    cores=4,
    memory="8Gi",
    cpu_limit=6,
    memory_limit="12Gi",
    backoff_limit=5,  # Retry up to 5 times on failure
    restart_policy="OnFailure",
    job_name="fault-tolerant-computation"
)
def fault_tolerant_monte_carlo(n_simulations=1000000, failure_probability=0.1, checkpoint_interval=100000):
    """
    Fault-tolerant Monte Carlo simulation with checkpointing.
    """
    import numpy as np
    import os
    import json
    import pickle
    import random
    import time
    from datetime import datetime

    print(f"Starting fault-tolerant Monte Carlo: {n_simulations:,} simulations")
    print(f"Failure probability: {failure_probability}, Checkpoint interval: {checkpoint_interval:,}")

    # Simulate random failures for demonstration
    def simulate_random_failure():
        if random.random() < failure_probability:
            failure_types = [
                "Simulated network timeout",
                "Simulated memory pressure",
                "Simulated compute node failure",
                "Simulated resource exhaustion"
            ]
            failure_type = random.choice(failure_types)
            print(f"WARNING: {failure_type} - continuing with fault tolerance...")
            time.sleep(2)  # Simulate recovery time
            return True
        return False

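    # Checkpoint management
    # Note: /tmp is on the container's ephemeral filesystem, so this file is
    # lost when the container is restarted. Point it at a mounted volume if
    # checkpoints must survive a retry.
    checkpoint_file = "/tmp/monte_carlo_checkpoint.pkl"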

    def save_checkpoint(iteration, results, random_state):
        """Save current progress to checkpoint"""
        checkpoint_data = {
            'iteration': iteration,
            'results': results,
            'random_state': random_state,
            'timestamp': datetime.now().isoformat()
        }

        try:
            with open(checkpoint_file, 'wb') as f:
                pickle.dump(checkpoint_data, f)
            print(f"Checkpoint saved at iteration {iteration:,}")
        except Exception as e:
            print(f"Failed to save checkpoint: {e}")

    def load_checkpoint():
        """Load progress from checkpoint if available"""
        if os.path.exists(checkpoint_file):
            try:
                with open(checkpoint_file, 'rb') as f:
                    checkpoint_data = pickle.load(f)
                print(f"Checkpoint loaded from iteration {checkpoint_data['iteration']:,}")
                return checkpoint_data
            except Exception as e:
                print(f"Failed to load checkpoint: {e}")
        return None

    # Monte Carlo simulation functions
    def estimate_pi_sample():
        """Single sample for pi estimation"""
        x, y = np.random.random(2)
        return 1 if x*x + y*y <= 1 else 0

    def option_pricing_sample(S0=100, K=105, T=1, r=0.05, sigma=0.2):
        """Single Monte Carlo sample for option pricing"""
        # Geometric Brownian Motion
        dt = T
        z = np.random.standard_normal()
        ST = S0 * np.exp((r - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * z)
        payoff = max(ST - K, 0)  # Call option payoff
        return payoff * np.exp(-r * T)  # Discounted payoff

    def portfolio_var_sample(returns_mean=0.08, returns_std=0.2, portfolio_value=1000000):
        """Single sample for portfolio Value at Risk calculation"""
        daily_return = np.random.normal(returns_mean/252, returns_std/np.sqrt(252))
        portfolio_change = portfolio_value * daily_return
        return portfolio_change

    def percolation_sample(grid_size=50, p=0.593):
        """Single sample for percolation theory"""
        # Simplified 2D percolation
        grid = np.random.random((grid_size, grid_size)) < p
        # Check if there's a path from top to bottom (simplified)
        # This is a very simplified percolation check
        top_row = grid[0, :]
        bottom_row = grid[-1, :]
        return 1 if np.any(top_row) and np.any(bottom_row) else 0

    # Load checkpoint if available
    checkpoint = load_checkpoint()
    if checkpoint:
        start_iteration = checkpoint['iteration']
        pi_samples = checkpoint['results']['pi_samples']
        option_prices = checkpoint['results']['option_prices']
        portfolio_changes = checkpoint['results']['portfolio_changes']
        percolation_samples = checkpoint['results']['percolation_samples']
        # Restore random state
        np.random.set_state(checkpoint['random_state'])
        print(f"Resuming from iteration {start_iteration:,}")
    else:
        start_iteration = 0
        pi_samples = []
        option_prices = []
        portfolio_changes = []
        percolation_samples = []

    # Main simulation loop with fault tolerance
    failure_count = 0
    successful_simulations = len(pi_samples)  # Count only samples actually collected (also after a resume)

    for i in range(start_iteration, n_simulations):
        if i % max(1, n_simulations // 20) == 0:  # max() avoids modulo by zero for small runs
            print(f"Progress: {i:,}/{n_simulations:,} ({100*i/n_simulations:.1f}%)")

        # Simulate potential failures
        if simulate_random_failure():
            failure_count += 1
            continue  # Skip this iteration but continue

        # Perform Monte Carlo samples
        try:
            pi_sample = estimate_pi_sample()
            option_price = option_pricing_sample()
            portfolio_change = portfolio_var_sample()
            percolation = percolation_sample()

            pi_samples.append(pi_sample)
            option_prices.append(option_price)
            portfolio_changes.append(portfolio_change)
            percolation_samples.append(percolation)

            successful_simulations += 1

        except Exception as e:
            print(f"Simulation error at iteration {i}: {e}")
            failure_count += 1
            continue

        # Checkpoint periodically
        if (i + 1) % checkpoint_interval == 0:
            results = {
                'pi_samples': pi_samples,
                'option_prices': option_prices,
                'portfolio_changes': portfolio_changes,
                'percolation_samples': percolation_samples
            }
            save_checkpoint(i + 1, results, np.random.get_state())

    # Final calculations
    print(f"Simulation completed. Successful: {successful_simulations:,}, Failures: {failure_count}")

    # Pi estimation
    pi_estimate = 4 * np.mean(pi_samples) if pi_samples else 0
    pi_error = abs(pi_estimate - np.pi) if pi_samples else 0
    pi_confidence_interval = 1.96 * np.sqrt(np.var(pi_samples) / len(pi_samples)) if len(pi_samples) > 1 else 0

    # Option pricing
    option_price_mean = np.mean(option_prices) if option_prices else 0
    option_price_std = np.std(option_prices) if len(option_prices) > 1 else 0
    option_confidence_interval = 1.96 * option_price_std / np.sqrt(len(option_prices)) if len(option_prices) > 1 else 0

    # Portfolio VaR (95% confidence)
    if portfolio_changes:
        portfolio_changes_sorted = sorted(portfolio_changes)
        tail_index = int(0.05 * len(portfolio_changes))
        var_95 = portfolio_changes_sorted[tail_index]
        # Average at least one value so the tail is never empty for small sample counts
        expected_shortfall = np.mean(portfolio_changes_sorted[:max(1, tail_index)])
    else:
        var_95 = 0
        expected_shortfall = 0

    # Percolation probability
    percolation_probability = np.mean(percolation_samples) if percolation_samples else 0

    # Cleanup checkpoint file
    try:
        os.remove(checkpoint_file)
        print("Checkpoint file cleaned up")
    except OSError:
        pass  # The checkpoint file may not exist if no checkpoint was written

    fault_tolerant_results = {
        'simulation_parameters': {
            'total_simulations_requested': n_simulations,
            'successful_simulations': successful_simulations,
            'simulated_failures': failure_count,
            'success_rate': successful_simulations / n_simulations if n_simulations > 0 else 0,
            'checkpoint_interval': checkpoint_interval
        },
        'pi_estimation': {
            'estimate': pi_estimate,
            'true_value': float(np.pi),
            'absolute_error': pi_error,
            'relative_error_percent': (pi_error / np.pi) * 100,
            'confidence_interval_95': pi_confidence_interval * 4,  # Scale for pi
            'samples_used': len(pi_samples)
        },
        'option_pricing': {
            'estimated_price': option_price_mean,
            'price_std_dev': option_price_std,
            'confidence_interval_95': option_confidence_interval,
            'samples_used': len(option_prices)
        },
        'portfolio_risk': {
            'value_at_risk_95': var_95,
            'expected_shortfall': expected_shortfall,
            'daily_volatility': np.std(portfolio_changes) if len(portfolio_changes) > 1 else 0,
            'samples_used': len(portfolio_changes)
        },
        'percolation_analysis': {
            'percolation_probability': percolation_probability,
            'theoretical_threshold': 0.593,  # 2D percolation threshold
            'samples_used': len(percolation_samples)
        },
        'fault_tolerance': {
            'checkpoint_saves': successful_simulations // checkpoint_interval,
            'recovery_successful': checkpoint is not None,
            'resilience_score': (successful_simulations / (successful_simulations + failure_count)) if (successful_simulations + failure_count) > 0 else 0
        },
        'kubernetes_info': {
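            'pod_name': os.environ.get('HOSTNAME', 'unknown'),  # HOSTNAME defaults to the pod name
            # RESTART_COUNT is not set by Kubernetes itself; this stays 0 unless the pod spec injects it
            'restart_count': int(os.environ.get('RESTART_COUNT', '0')),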
            'completion_time': datetime.now().isoformat()
        }
    }

    return fault_tolerant_results

# Run fault-tolerant Monte Carlo simulation
mc_results = fault_tolerant_monte_carlo(
    n_simulations=500000,
    failure_probability=0.05,  # 5% chance of simulated failure
    checkpoint_interval=50000
)

print("\nFAULT-TOLERANT MONTE CARLO COMPLETE")
sim_params = mc_results['simulation_parameters']
print(f"Requested simulations: {sim_params['total_simulations_requested']:,}")
print(f"Successful simulations: {sim_params['successful_simulations']:,}")
print(f"Simulated failures: {sim_params['simulated_failures']}")
print(f"Success rate: {sim_params['success_rate']*100:.1f}%")

pi_est = mc_results['pi_estimation']
print("\nPi Estimation:")
print(f"  Estimate: {pi_est['estimate']:.6f}")
print(f"  True value: {pi_est['true_value']:.6f}")
print(f"  Error: {pi_est['relative_error_percent']:.4f}%")

option = mc_results['option_pricing']
print("\nOption Pricing:")
print(f"  Estimated price: ${option['estimated_price']:.2f}")
print(f"  Standard deviation: ${option['price_std_dev']:.2f}")

risk = mc_results['portfolio_risk']
print("\nPortfolio Risk:")
print(f"  VaR (95%): ${risk['value_at_risk_95']:,.0f}")
print(f"  Expected shortfall: ${risk['expected_shortfall']:,.0f}")

fault_tol = mc_results['fault_tolerance']
print("\nFault Tolerance:")
print(f"  Checkpoints saved: {fault_tol['checkpoint_saves']}")
print(f"  Resilience score: {fault_tol['resilience_score']:.3f}")

k8s_info = mc_results['kubernetes_info']
print("\nKubernetes Info:")
print(f"  Pod: {k8s_info['pod_name']}")
print(f"  Restart count: {k8s_info['restart_count']}")
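
The tail-risk figures reported above can be reproduced outside the cluster. The following is a minimal standalone sketch of the same sort-and-index approach, with a guard so the averaged tail is never empty. The function name `var_and_expected_shortfall` and the `tail` parameter are illustrative assumptions and are not part of Clustrix.

```python
import numpy as np


def var_and_expected_shortfall(changes, tail=0.05):
    """Return (VaR, expected shortfall) for the lower `tail` fraction of `changes`."""
    ordered = np.sort(np.asarray(changes, dtype=float))
    if ordered.size == 0:
        return 0.0, 0.0
    cutoff = int(tail * ordered.size)
    # VaR is the value sitting at the tail boundary of the sorted changes
    var = ordered[cutoff]
    # Expected shortfall averages the values below the boundary (at least one value)
    shortfall = ordered[:max(1, cutoff)].mean()
    return float(var), float(shortfall)


# 100 evenly spaced portfolio changes from -50 to 49
print(var_and_expected_shortfall(list(range(-50, 50))))  # (-45.0, -48.0)
```

With fewer than 20 samples the 5% cutoff rounds down to zero, which is why the sketch always averages at least one value.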

Kubernetes Resource Management and Best Practices

[ ]:
# Example Kubernetes resource configurations
def get_kubernetes_resource_examples():
    """
    Examples of resource configurations for different workload types.
    """

    examples = {
        'cpu_intensive': {
            'cores': 8,
            'memory': '16Gi',
            'cpu_limit': 8,
            'memory_limit': '20Gi'
        },
        'memory_intensive': {
            'cores': 4,
            'memory': '32Gi',
            'cpu_limit': 6,
            'memory_limit': '40Gi'
        },
        'ml_training': {
            'cores': 6,
            'memory': '24Gi',
            'cpu_limit': 8,
            'memory_limit': '32Gi'
        }
    }

    return examples

# Example usage:
# resources = get_kubernetes_resource_examples()
# print(f"Available resource patterns: {list(resources.keys())}")

Clustrix Kubernetes Configuration Examples

Basic Computation

Use case: Simple mathematical computations

@cluster(
    cores=2,
    memory="4Gi",
    cpu_limit=3,
    memory_limit="6Gi",
    container_image="python:3.11-slim"
)

ML Training

Use case: Machine learning model training with fault tolerance

@cluster(
    cores=8,
    memory="32Gi",
    cpu_limit=12,
    memory_limit="40Gi",
    container_image="python:3.11",
    job_name="ml-training",
    backoff_limit=3
)

Parallel Processing

Use case: Embarrassingly parallel data processing

@cluster(
    cores=4,
    memory="16Gi",
    parallel=True,
    parallelism=5,
    completions=20,
    job_name="parallel-processing"
)

Fault Tolerant

Use case: Long-running computations with automatic retry

@cluster(
    cores=6,
    memory="24Gi",
    backoff_limit=5,
    restart_policy="OnFailure",
    job_ttl_seconds=7200,
    active_deadline_seconds=3600
)

Kubernetes Job Patterns

Single Job

  • Description: Single pod, run-to-completion

  • Best for: One-off computations, small datasets

  • Parameters:

    • completions: 1

    • parallelism: 1

    • backoff_limit: 3

Parallel Job

  • Description: Multiple pods running simultaneously

  • Best for: Independent parallel tasks, embarrassingly parallel problems

  • Parameters:

    • completions: 10

    • parallelism: 5

    • backoff_limit: 2

Queue Job

  • Description: Work queue pattern with multiple workers

  • Best for: Dynamic workloads, task queues, streaming data

  • Parameters:

    • completions: None (no fixed completion count)

    • parallelism: 3

    • backoff_limit: 5

Indexed Job

  • Description: Jobs with completion index for task assignment

  • Best for: Parameter sweeps, data partitioning, batch processing

  • Parameters:

    • completion_mode: Indexed

    • completions: 20

    • parallelism: 4
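
The four patterns above differ only in a handful of Job spec fields. The sketch below collects them using the field names from the Kubernetes `batch/v1` Job spec (`completions`, `parallelism`, `backoffLimit`, `completionMode`). How Clustrix translates its own keyword arguments into these fields is an assumption here, and `build_job_spec` and `items_for_index` are hypothetical helpers written for illustration only.

```python
import os


def build_job_spec(pattern):
    """Return the pattern-specific part of a batch/v1 Job spec as a dict."""
    specs = {
        "single": {"completions": 1, "parallelism": 1, "backoffLimit": 3},
        "parallel": {"completions": 10, "parallelism": 5, "backoffLimit": 2},
        # Work queue: completions is left unset, so the workers decide when the job is done
        "queue": {"parallelism": 3, "backoffLimit": 5},
        "indexed": {"completionMode": "Indexed", "completions": 20, "parallelism": 4},
    }
    return dict(specs[pattern])


def items_for_index(items, index, completions):
    """Pick the strided slice of `items` that one indexed pod should process."""
    return items[index::completions]


# In an Indexed Job, Kubernetes sets JOB_COMPLETION_INDEX in each pod (0 .. completions-1).
# Outside a cluster the variable is missing, so this falls back to index 0.
index = int(os.environ.get("JOB_COMPLETION_INDEX", "0"))
spec = build_job_spec("indexed")
my_items = items_for_index(list(range(100)), index, spec["completions"])
print(f"Pod index {index} processes {len(my_items)} of 100 items")
```

Striding by the completion count gives every pod a disjoint share of the input without any coordination between pods, which is what makes the indexed pattern a good fit for parameter sweeps.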

Kubernetes Resource Management Guidelines

CPU Intensive Workloads

  • Description: Mathematical computations, simulations, optimization

  • Resource ratio: cores ≈ cpu_limit, memory moderate

  • Example configuration:

    • cores: 8

    • memory: 16Gi

    • cpu_limit: 8

    • memory_limit: 20Gi

  • Use cases: Monte Carlo simulations, Genetic algorithms, Scientific computing

Memory Intensive Workloads

  • Description: Large dataset processing, in-memory analytics

  • Resource ratio: memory >> cores, higher memory limits

  • Example configuration:

    • cores: 4

    • memory: 32Gi

    • cpu_limit: 6

    • memory_limit: 40Gi

  • Use cases: Big data processing, Large ML models, Genomics analysis

I/O Intensive Workloads

  • Description: File processing, database operations, network I/O

  • Resource ratio: moderate cores and memory, focus on concurrency

  • Example configuration:

    • cores: 2

    • memory: 8Gi

    • cpu_limit: 4

    • memory_limit: 12Gi

  • Use cases: Data ingestion, ETL pipelines, Web scraping

ML Training Workloads

  • Description: Machine learning model training

  • Resource ratio: balanced cores and memory, burst capacity

  • Example configuration:

    • cores: 6

    • memory: 24Gi

    • cpu_limit: 8

    • memory_limit: 32Gi

  • Use cases: Deep learning, Model hyperparameter tuning, Feature engineering
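
Each example configuration above pairs a request with a limit. The sketch below shows one way such a pairing could be turned into the `resources` section of a container spec and checked before submission, since Kubernetes rejects a request that exceeds its limit. It assumes that `cores` and `memory` act as requests and that `cpu_limit` and `memory_limit` act as limits; `memory_to_bytes` and `to_container_resources` are hypothetical helpers, not Clustrix functions.

```python
import re

# Binary suffixes used by Kubernetes memory quantities
_UNITS = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}


def memory_to_bytes(quantity):
    """Convert a binary-suffixed quantity such as '16Gi' to bytes."""
    match = re.fullmatch(r"(\d+)(Ki|Mi|Gi|Ti)", quantity)
    if match is None:
        raise ValueError(f"Unsupported memory quantity: {quantity!r}")
    return int(match.group(1)) * _UNITS[match.group(2)]


def to_container_resources(cores, memory, cpu_limit, memory_limit):
    """Build a container `resources` dict and check that requests fit within limits."""
    if cpu_limit < cores:
        raise ValueError("cpu_limit must be >= cores")
    if memory_to_bytes(memory_limit) < memory_to_bytes(memory):
        raise ValueError("memory_limit must be >= memory")
    return {
        "requests": {"cpu": str(cores), "memory": memory},
        "limits": {"cpu": str(cpu_limit), "memory": memory_limit},
    }


# The CPU-intensive example: request equals limit for CPU, with 4Gi of memory headroom
print(to_container_resources(cores=8, memory="16Gi", cpu_limit=8, memory_limit="20Gi"))
```

The parser only handles whole-number binary quantities, which covers every value used in this tutorial; decimal suffixes such as `M` or `G` would need extra cases.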

Kubernetes Cluster Monitoring

[ ]:
def check_kubernetes_cluster_status():
    """
    Check Kubernetes cluster status and resources.
    Note: This requires kubectl to be configured properly.
    """
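    import shlex
    import subprocess

    def run_kubectl_command(cmd):
        """Run a kubectl command and return its output, or an "Error: ..." string."""
        try:
            result = subprocess.run(
                ["kubectl", *shlex.split(cmd)],  # Argument list avoids shell=True
                capture_output=True,
                text=True,
                timeout=30
            )
            if result.returncode == 0:
                # kubectl reports "No resources found" on stderr, so fall back to it
                return result.stdout.strip() or result.stderr.strip()
            else:
                return f"Error: {result.stderr.strip()}"
        except subprocess.TimeoutExpired:
            return "Error: Command timed out"
        except Exception as e:
            # Includes FileNotFoundError when kubectl is not installed
            return f"Error: {str(e)}"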

    print("Kubernetes Cluster Status Check:")
    print("=" * 40)

    # Check cluster info
    print("\n1. Cluster Info:")
    cluster_info = run_kubectl_command("cluster-info")
    if "Error" not in cluster_info:
        lines = cluster_info.split('\n')[:3]  # First 3 lines
        for line in lines:
            print(f"   {line}")
    else:
        print(f"   {cluster_info}")

    # Check nodes
    print("\n2. Node Status:")
    nodes = run_kubectl_command("get nodes -o wide")
    if "Error" not in nodes:
        lines = nodes.split('\n')[:6]  # Header + first 5 nodes
        for line in lines:
            print(f"   {line}")
    else:
        print(f"   {nodes}")

    # Check namespaces
    print("\n3. Namespaces:")
    namespaces = run_kubectl_command("get namespaces")
    if "Error" not in namespaces:
        lines = namespaces.split('\n')[:8]  # Header + first 7 namespaces
        for line in lines:
            print(f"   {line}")
    else:
        print(f"   {namespaces}")

    # Check current context
    print("\n4. Current Context:")
    context = run_kubectl_command("config current-context")
    print(f"   {context}")

    # Check resource quotas
    print("\n5. Resource Quotas (default namespace):")
    quotas = run_kubectl_command("get resourcequota -n default")
    if "No resources found" in quotas:
        print("   No resource quotas configured")
    else:
        print(f"   {quotas}")

    # Check running jobs
    print("\n6. Running Jobs (default namespace):")
    jobs = run_kubectl_command("get jobs -n default")
    if "No resources found" in jobs:
        print("   No jobs currently running")
    else:
        lines = jobs.split('\n')[:6]  # Header + first 5 jobs
        for line in lines:
            print(f"   {line}")

    # Check running pods
    print("\n7. Running Pods (default namespace):")
    pods = run_kubectl_command("get pods -n default")
    if "No resources found" in pods:
        print("   No pods currently running")
    else:
        lines = pods.split('\n')[:6]  # Header + first 5 pods
        for line in lines:
            print(f"   {line}")

    # Check node resource usage
    print("\n8. Node Resource Usage:")
    top_nodes = run_kubectl_command("top nodes")
    if "Error" not in top_nodes and "not available" not in top_nodes:
        lines = top_nodes.split('\n')[:6]  # Header + first 5 nodes
        for line in lines:
            print(f"   {line}")
    else:
        print("   Resource metrics not available (metrics-server may not be installed)")

# Check cluster status
try:
    check_kubernetes_cluster_status()
except Exception as e:
    print(f"Failed to check Kubernetes cluster status: {e}")
    print("Make sure kubectl is installed and configured for your cluster")
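
Matching on table text works for a quick look, but structured output is easier to check programmatically. The sketch below counts Ready nodes from the JSON that `kubectl get nodes -o json` prints, where each item carries a `status.conditions` list. The helper name `count_ready_nodes` and the trimmed sample document are illustrative assumptions; real output contains many more fields.

```python
import json


def count_ready_nodes(nodes_json):
    """Return (ready, total) from the text of `kubectl get nodes -o json`."""
    items = json.loads(nodes_json).get("items", [])
    ready = 0
    for node in items:
        conditions = node.get("status", {}).get("conditions", [])
        # A node is schedulable-healthy when its "Ready" condition has status "True"
        if any(c.get("type") == "Ready" and c.get("status") == "True" for c in conditions):
            ready += 1
    return ready, len(items)


# Trimmed sample in the shape kubectl returns
sample = json.dumps({"items": [
    {"metadata": {"name": "node-a"},
     "status": {"conditions": [{"type": "Ready", "status": "True"}]}},
    {"metadata": {"name": "node-b"},
     "status": {"conditions": [{"type": "Ready", "status": "False"}]}},
]})

print(count_ready_nodes(sample))  # (1, 2)
```

On a live cluster the input would be the standard output of the `kubectl get nodes -o json` command rather than the sample string.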

Summary

This tutorial covered Kubernetes usage with Clustrix:

  1. Kubernetes Configuration - Setting up Clustrix for container-based computing

  2. Machine Learning Training - Distributed ML workflows in pods

  3. Data Processing - Large-scale data analysis with automatic parallelization

  4. Fault Tolerance - Robust computing with checkpointing and retry mechanisms

  5. Resource Management - Intelligent resource allocation and limits

  6. Job Patterns - Different Kubernetes job execution patterns

  7. Cluster Monitoring - Status checking and resource monitoring

Key Kubernetes Advantages:

  • Containerization: Consistent execution environments across clusters

  • Scalability: Run many pods in parallel and scale with workload demand where the cluster has autoscaling enabled

  • Fault Tolerance: Built-in restart and retry mechanisms

  • Resource Management: Fine-grained CPU and memory control

  • Isolation: Secure, isolated execution environments

  • Portability: Run on any Kubernetes cluster (cloud or on-premises)

Best Practices:

  • Resource Limits: Always set both requests and limits for predictable scheduling

  • Container Images: Use specific, lightweight base images for faster startup

  • Job Patterns: Choose appropriate job patterns for your workload type

  • Fault Tolerance: Implement checkpointing for long-running computations

  • Monitoring: Check cluster health and resource usage regularly

  • Cleanup: Set TTL for automatic job cleanup to prevent resource buildup

Kubernetes-Specific Features:

  • cpu_limit and memory_limit: Resource limits for burst capacity

  • backoff_limit: Automatic retry on failures

  • parallelism and completions: Parallel job execution control

  • job_ttl_seconds: Automatic cleanup of completed jobs

  • restart_policy: Pod restart behavior on failure

  • active_deadline_seconds: Maximum job runtime limit

Next Steps:

For more information, visit the Clustrix Documentation.