{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Kubernetes Tutorial\n", "\n", "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/ContextLab/clustrix/blob/master/docs/notebooks/kubernetes_tutorial.ipynb)\n", "\n", "This tutorial demonstrates how to use Clustrix with Kubernetes clusters for containerized distributed computing.\n", "\n", "## Prerequisites\n", "\n", "- Access to a Kubernetes cluster\n", "- kubectl configured for your cluster\n", "- Clustrix installed with Kubernetes support: `pip install clustrix[kubernetes]`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Install Clustrix with Kubernetes support (uncomment if needed)\n", "# !pip install clustrix[kubernetes]\n", "\n", "import clustrix\n", "from clustrix import cluster, configure\n", "import numpy as np" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Kubernetes Configuration\n", "\n", "Configure Clustrix for your Kubernetes cluster:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Configure for Kubernetes cluster\n", "configure(\n", "    cluster_type=\"kubernetes\",\n", "\n", "    # Kubernetes-specific settings\n", "    k8s_namespace=\"default\",  # Kubernetes namespace\n", "    k8s_config_file=\"~/.kube/config\",  # Path to kubeconfig\n", "\n", "    # Default resource requirements\n", "    default_cores=2,\n", "    default_memory=\"4Gi\",  # Kubernetes format (Gi, Mi)\n", "    default_cpu_limit=4,  # CPU limit (can be > cores)\n", "    default_memory_limit=\"8Gi\",  # Memory limit\n", "\n", "    # Container settings\n", "    container_image=\"python:3.11-slim\",  # Base Python image\n", "    image_pull_policy=\"IfNotPresent\",  # Image pull policy\n", "\n", "    # Job settings\n", "    job_ttl_seconds=3600,  # Job cleanup after 1 hour\n", "    backoff_limit=3,  # Retry failed jobs up to 3 times\n", "\n", "    # Cleanup\n", "    
cleanup_on_success=True,\n", "    max_parallel_jobs=20\n", ")\n", "\n", "print(\"Kubernetes cluster configured successfully!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example 1: Containerized Machine Learning\n", "\n", "Train machine learning models in Kubernetes pods:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@cluster(\n", "    cores=4,\n", "    memory=\"8Gi\",\n", "    cpu_limit=6,\n", "    memory_limit=\"12Gi\",\n", "    container_image=\"python:3.11\",\n", "    job_name=\"ml-training\"  # Custom job name\n", ")\n", "def distributed_ml_training(model_type=\"random_forest\", n_estimators=200, dataset_size=50000):\n", "    \"\"\"\n", "    Distributed machine learning training in Kubernetes.\n", "    \"\"\"\n", "    import os\n", "    import json\n", "    import subprocess\n", "    import sys\n", "    from datetime import datetime\n", "\n", "    # Install required packages inside the container before importing them;\n", "    # check_call raises CalledProcessError if pip fails instead of failing silently\n", "    subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"--quiet\",\n", "                           \"scikit-learn\", \"pandas\", \"numpy\"])\n", "\n", "    import numpy as np\n", "    from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier\n", "    from sklearn.svm import SVC\n", "    from sklearn.neural_network import MLPClassifier\n", "    from sklearn.datasets import make_classification\n", "    from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV\n", "    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score\n", "    from sklearn.preprocessing import StandardScaler\n", "    import pandas as pd\n", "\n", "    print(f\"Starting ML training: {model_type}, {n_estimators} estimators, {dataset_size:,} samples\")\n", "    print(f\"Pod started at: {datetime.now()}\")\n", "\n", "    # Generate synthetic dataset\n", "    print(\"Generating synthetic dataset...\")\n", "    X, y = make_classification(\n", "        n_samples=dataset_size,\n", "        n_features=50,\n", "        n_informative=30,\n", "        n_redundant=10,\n", "        n_classes=3,\n", "        n_clusters_per_class=2,\n", "        flip_y=0.05,  # Fraction of samples whose class is assigned at random (label noise)\n", "        random_state=42\n", "    )\n", "\n", "    # 
Split the data\n", "    X_train, X_test, y_train, y_test = train_test_split(\n", "        X, y, test_size=0.2, random_state=42, stratify=y\n", "    )\n", "\n", "    # Feature scaling for SVM and MLP\n", "    if model_type in ['svm', 'mlp']:\n", "        scaler = StandardScaler()\n", "        X_train = scaler.fit_transform(X_train)\n", "        X_test = scaler.transform(X_test)\n", "\n", "    print(f\"Dataset: {X_train.shape[0]:,} training, {X_test.shape[0]:,} test samples\")\n", "\n", "    # Model selection and configuration\n", "    models = {\n", "        'random_forest': {\n", "            'model': RandomForestClassifier,\n", "            'params': {\n", "                'n_estimators': n_estimators,\n", "                'max_depth': 20,\n", "                'min_samples_split': 5,\n", "                'min_samples_leaf': 2,\n", "                'n_jobs': -1,\n", "                'random_state': 42\n", "            },\n", "            'param_grid': {\n", "                'max_depth': [15, 20, 25],\n", "                'min_samples_split': [2, 5, 10]\n", "            }\n", "        },\n", "        'gradient_boosting': {\n", "            'model': GradientBoostingClassifier,\n", "            'params': {\n", "                'n_estimators': n_estimators,\n", "                'learning_rate': 0.1,\n", "                'max_depth': 6,\n", "                'random_state': 42\n", "            },\n", "            'param_grid': {\n", "                'learning_rate': [0.05, 0.1, 0.2],\n", "                'max_depth': [4, 6, 8]\n", "            }\n", "        },\n", "        'svm': {\n", "            'model': SVC,\n", "            'params': {\n", "                'kernel': 'rbf',\n", "                'C': 1.0,\n", "                'gamma': 'scale',\n", "                'random_state': 42\n", "            },\n", "            'param_grid': {\n", "                'C': [0.1, 1.0, 10.0],\n", "                'gamma': ['scale', 'auto']\n", "            }\n", "        },\n", "        'mlp': {\n", "            'model': MLPClassifier,\n", "            'params': {\n", "                'hidden_layer_sizes': (100, 50),\n", "                'activation': 'relu',\n", "                'solver': 'adam',\n", "                'alpha': 0.0001,\n", "                'max_iter': 1000,\n", "                'random_state': 42\n", "            },\n", "            'param_grid': {\n", "                'hidden_layer_sizes': [(50,), (100,), (100, 50)],\n", "                'alpha': [0.0001, 0.001, 0.01]\n", "            }\n", "        }\n", "    }\n", "\n", "    if model_type not in models:\n", "        model_type = 'random_forest'  # Default fallback\n", "\n", "    model_config = models[model_type]\n", "\n", "    # Train base model\n", "    
print(f\"Training {model_type} model...\")\n", "    start_time = datetime.now()\n", "\n", "    base_model = model_config['model'](**model_config['params'])\n", "    base_model.fit(X_train, y_train)\n", "\n", "    training_time = (datetime.now() - start_time).total_seconds()\n", "    print(f\"Base model training completed in {training_time:.2f} seconds\")\n", "\n", "    # Base model evaluation\n", "    y_pred = base_model.predict(X_test)\n", "    base_accuracy = accuracy_score(y_test, y_pred)\n", "    base_precision = precision_score(y_test, y_pred, average='weighted')\n", "    base_recall = recall_score(y_test, y_pred, average='weighted')\n", "    base_f1 = f1_score(y_test, y_pred, average='weighted')\n", "\n", "    print(f\"Base model performance: Accuracy={base_accuracy:.4f}\")\n", "\n", "    # Cross-validation\n", "    print(\"Performing cross-validation...\")\n", "    cv_scores = cross_val_score(base_model, X_train, y_train, cv=5, n_jobs=-1)\n", "\n", "    # Hyperparameter optimization. Start from the base parameters so that settings\n", "    # outside the grid (e.g. n_estimators) are kept; GridSearchCV only overrides\n", "    # the keys listed in param_grid\n", "    print(\"Optimizing hyperparameters...\")\n", "    grid_search = GridSearchCV(\n", "        model_config['model'](**model_config['params']),\n", "        model_config['param_grid'],\n", "        cv=3,\n", "        scoring='accuracy',\n", "        n_jobs=-1,\n", "        verbose=0\n", "    )\n", "\n", "    grid_search.fit(X_train, y_train)\n", "    best_model = grid_search.best_estimator_\n", "\n", "    # Best model evaluation\n", "    y_pred_best = best_model.predict(X_test)\n", "    best_accuracy = accuracy_score(y_test, y_pred_best)\n", "    best_precision = precision_score(y_test, y_pred_best, average='weighted')\n", "    best_recall = recall_score(y_test, y_pred_best, average='weighted')\n", "    best_f1 = f1_score(y_test, y_pred_best, average='weighted')\n", "\n", "    print(f\"Optimized model performance: Accuracy={best_accuracy:.4f}\")\n", "\n", "    # Feature importance (if available)\n", "    feature_importance = None\n", "    if hasattr(best_model, 'feature_importances_'):\n", "        feature_importance = best_model.feature_importances_.tolist()\n", "        top_features = sorted(enumerate(feature_importance),\n", "                              
key=lambda x: x[1], reverse=True)[:10]\n", "        print(f\"Top 5 features: {[f'Feature_{i}' for i, _ in top_features[:5]]}\")\n", "\n", "    # Model complexity analysis\n", "    def analyze_model_complexity(model, model_type):\n", "        complexity_metrics = {}\n", "\n", "        if model_type == 'random_forest':\n", "            complexity_metrics = {\n", "                'n_estimators': model.n_estimators,\n", "                'max_depth': model.max_depth,\n", "                'total_nodes': sum(tree.tree_.node_count for tree in model.estimators_),\n", "                'avg_depth': float(np.mean([tree.tree_.max_depth for tree in model.estimators_]))\n", "            }\n", "        elif model_type == 'gradient_boosting':\n", "            complexity_metrics = {\n", "                'n_estimators': model.n_estimators,\n", "                'max_depth': model.max_depth,\n", "                'learning_rate': model.learning_rate,\n", "                # estimators_ has shape (n_estimators, K) with K = n_classes for multiclass\n", "                # problems, so flatten it to count every tree, not only the first per stage\n", "                'total_nodes': sum(tree.tree_.node_count for tree in model.estimators_.ravel())\n", "            }\n", "        elif model_type == 'svm':\n", "            complexity_metrics = {\n", "                'n_support_vectors': int(model.n_support_.sum()),\n", "                'kernel': model.kernel,\n", "                'C': model.C,\n", "                'gamma': model.gamma\n", "            }\n", "        elif model_type == 'mlp':\n", "            complexity_metrics = {\n", "                'hidden_layers': len(model.hidden_layer_sizes),\n", "                'total_parameters': sum(layer.size for layer in model.coefs_) +\n", "                                    sum(layer.size for layer in model.intercepts_),\n", "                'n_iterations': model.n_iter_,\n", "                'loss': model.loss_\n", "            }\n", "\n", "        return complexity_metrics\n", "\n", "    complexity_metrics = analyze_model_complexity(best_model, model_type)\n", "\n", "    # Compile results\n", "    training_results = {\n", "        'model_info': {\n", "            'model_type': model_type,\n", "            'dataset_size': dataset_size,\n", "            'n_features': X.shape[1],\n", "            'n_classes': len(np.unique(y)),\n", "            'training_samples': X_train.shape[0],\n", "            'test_samples': X_test.shape[0]\n", "        },\n", "        'training_metrics': {\n", "            'training_time_seconds': training_time,\n", "            'hyperparameter_optimization': True,\n", "            'cross_validation_folds': 5\n", "        },\n", "        'base_model_performance': {\n", "            
'accuracy': base_accuracy,\n", "            'precision': base_precision,\n", "            'recall': base_recall,\n", "            'f1_score': base_f1\n", "        },\n", "        'optimized_model_performance': {\n", "            'accuracy': best_accuracy,\n", "            'precision': best_precision,\n", "            'recall': best_recall,\n", "            'f1_score': best_f1,\n", "            'improvement_over_base': best_accuracy - base_accuracy\n", "        },\n", "        'cross_validation': {\n", "            'cv_scores': cv_scores.tolist(),\n", "            'cv_mean': np.mean(cv_scores),\n", "            'cv_std': np.std(cv_scores)\n", "        },\n", "        'best_hyperparameters': grid_search.best_params_,\n", "        'model_complexity': complexity_metrics,\n", "        'feature_importance': feature_importance,\n", "        'kubernetes_info': {\n", "            'pod_name': os.environ.get('HOSTNAME', 'unknown'),  # A pod's hostname defaults to its name\n", "            # Kubernetes does not set KUBERNETES_NAMESPACE automatically; expose it through\n", "            # the Downward API (fieldRef: metadata.namespace) or this falls back to 'default'\n", "            'namespace': os.environ.get('KUBERNETES_NAMESPACE', 'default'),\n", "            'completion_time': datetime.now().isoformat()\n", "        }\n", "    }\n", "\n", "    return training_results\n", "\n", "# Run ML training in Kubernetes\n", "ml_results = distributed_ml_training(\n", "    model_type=\"random_forest\",\n", "    n_estimators=150,\n", "    dataset_size=30000\n", ")\n", "\n", "print(f\"\\nMACHINE LEARNING TRAINING COMPLETE\")\n", "model_info = ml_results['model_info']\n", "print(f\"Model: {model_info['model_type']}\")\n", "print(f\"Dataset: {model_info['dataset_size']:,} samples, {model_info['n_features']} features\")\n", "\n", "base_perf = ml_results['base_model_performance']\n", "opt_perf = ml_results['optimized_model_performance']\n", "print(f\"\\nPerformance Comparison:\")\n", "print(f\" Base model accuracy: {base_perf['accuracy']:.4f}\")\n", "print(f\" Optimized accuracy: {opt_perf['accuracy']:.4f}\")\n", "print(f\" Improvement: {opt_perf['improvement_over_base']:+.4f}\")  # Explicit sign: tuning can also lower test accuracy\n", "\n", "cv = ml_results['cross_validation']\n", "print(f\"\\nCross-validation: {cv['cv_mean']:.4f} ± {cv['cv_std']:.4f}\")\n", "\n", "k8s_info = ml_results['kubernetes_info']\n", "print(f\"\\nKubernetes Info:\")\n", "print(f\" Pod: {k8s_info['pod_name']}\")\n", "print(f\" 
Namespace: {k8s_info['namespace']}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example 2: Distributed Data Processing\n", "\n", "Process large datasets using Kubernetes job parallelization:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@cluster(\n", "    cores=6,\n", "    memory=\"12Gi\",\n", "    cpu_limit=8,\n", "    memory_limit=\"16Gi\",\n", "    parallel=True,  # Enable automatic parallelization\n", "    job_name=\"data-processing\",\n", "    parallelism=3,  # Run up to 3 pods simultaneously\n", "    completions=10  # Total number of completions needed\n", ")\n", "def distributed_data_analysis(data_chunks=100, chunk_size=10000):\n", "    \"\"\"\n", "    Distributed data analysis across multiple Kubernetes pods.\n", "    \"\"\"\n", "    import os\n", "    import json\n", "    import subprocess\n", "    import sys\n", "    from datetime import datetime, timedelta\n", "    import random\n", "    import math\n", "\n", "    # Install required packages inside the container before importing them\n", "    subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"--quiet\",\n", "                           \"pandas\", \"scipy\", \"numpy\"])\n", "\n", "    import numpy as np\n", "    import pandas as pd\n", "    from scipy import stats\n", "\n", "    print(f\"Starting distributed data analysis: {data_chunks} chunks of {chunk_size:,} records each\")\n", "    print(f\"Total data points: {data_chunks * chunk_size:,}\")\n", "\n", "    def generate_synthetic_timeseries_data(chunk_id, chunk_size):\n", "        \"\"\"Generate synthetic time-series data for analysis\"\"\"\n", "        np.random.seed(chunk_id * 123)  # Reproducible but different per chunk\n", "\n", "        # Generate chunk_size consecutive hourly timestamps; each chunk starts 10 days after the previous one\n", "        start_date = datetime(2023, 1, 1) + timedelta(days=chunk_id * 10)\n", "        timestamps = [start_date + timedelta(hours=i) for i in range(chunk_size)]\n", "\n", "        # Generate multiple correlated time series\n", "        base_trend = np.linspace(100, 200, chunk_size)  # Long-term trend\n", "        seasonal = 20 * np.sin(2 * np.pi * np.arange(chunk_size) / (24 * 7))  # Weekly seasonality\n", "        daily = 10 * np.sin(2 * np.pi * np.arange(chunk_size) / 24)  # Daily pattern\n", 
"\n", "        # Add different noise patterns\n", "        noise = np.random.normal(0, 5, chunk_size)\n", "\n", "        # Primary metric (e.g., web traffic or sales)\n", "        primary_metric = base_trend + seasonal + daily + noise\n", "        primary_metric = np.maximum(0, primary_metric)  # Ensure non-negative\n", "\n", "        # Secondary metrics correlated with primary\n", "        secondary_metric = primary_metric * 0.7 + np.random.normal(0, 3, chunk_size)\n", "        tertiary_metric = primary_metric * 1.2 + np.random.normal(10, 8, chunk_size)\n", "\n", "        # Categorical data\n", "        categories = ['A', 'B', 'C', 'D', 'E']\n", "        category_weights = [0.3, 0.25, 0.2, 0.15, 0.1]\n", "        categories_data = np.random.choice(categories, chunk_size, p=category_weights)\n", "\n", "        # Geographic regions\n", "        regions = ['North', 'South', 'East', 'West', 'Central']\n", "        region_weights = [0.2, 0.2, 0.25, 0.2, 0.15]\n", "        regions_data = np.random.choice(regions, chunk_size, p=region_weights)\n", "\n", "        # Create DataFrame\n", "        data = pd.DataFrame({\n", "            'timestamp': timestamps,\n", "            'primary_metric': primary_metric,\n", "            'secondary_metric': secondary_metric,\n", "            'tertiary_metric': tertiary_metric,\n", "            'category': categories_data,\n", "            'region': regions_data,\n", "            'chunk_id': chunk_id\n", "        })\n", "\n", "        return data\n", "\n", "    def analyze_chunk_statistics(chunk_data):\n", "        \"\"\"Comprehensive statistical analysis of a data chunk\"\"\"\n", "        numeric_cols = ['primary_metric', 'secondary_metric', 'tertiary_metric']\n", "\n", "        statistics = {}\n", "\n", "        # Basic descriptive statistics\n", "        for col in numeric_cols:\n", "            series = chunk_data[col]\n", "            statistics[col] = {\n", "                'count': len(series),\n", "                'mean': float(np.mean(series)),\n", "                'median': float(np.median(series)),\n", "                'std': float(np.std(series)),\n", "                'min': float(np.min(series)),\n", "                'max': float(np.max(series)),\n", "                'q25': float(np.percentile(series, 25)),\n", "                'q75': float(np.percentile(series, 75)),\n", "                'skewness': 
float(stats.skew(series)),\n", "                'kurtosis': float(stats.kurtosis(series))\n", "            }\n", "\n", "        # Correlation analysis\n", "        correlation_matrix = chunk_data[numeric_cols].corr()\n", "        statistics['correlations'] = {\n", "            'primary_secondary': float(correlation_matrix.loc['primary_metric', 'secondary_metric']),\n", "            'primary_tertiary': float(correlation_matrix.loc['primary_metric', 'tertiary_metric']),\n", "            'secondary_tertiary': float(correlation_matrix.loc['secondary_metric', 'tertiary_metric'])\n", "        }\n", "\n", "        # Categorical analysis\n", "        category_stats = chunk_data['category'].value_counts()\n", "        region_stats = chunk_data['region'].value_counts()\n", "\n", "        statistics['categorical'] = {\n", "            'category_distribution': category_stats.to_dict(),\n", "            'region_distribution': region_stats.to_dict(),\n", "            'category_entropy': float(-sum(p * np.log2(p) for p in category_stats / len(chunk_data) if p > 0)),\n", "            'region_entropy': float(-sum(p * np.log2(p) for p in region_stats / len(chunk_data) if p > 0))\n", "        }\n", "\n", "        # Time-based analysis\n", "        chunk_data['hour'] = chunk_data['timestamp'].dt.hour\n", "        chunk_data['day_of_week'] = chunk_data['timestamp'].dt.dayofweek\n", "\n", "        hourly_pattern = chunk_data.groupby('hour')['primary_metric'].mean()\n", "        daily_pattern = chunk_data.groupby('day_of_week')['primary_metric'].mean()\n", "\n", "        statistics['temporal'] = {\n", "            'hourly_peak': int(hourly_pattern.idxmax()),\n", "            'hourly_trough': int(hourly_pattern.idxmin()),\n", "            'hourly_variation': float(hourly_pattern.std()),\n", "            'daily_peak': int(daily_pattern.idxmax()),  # 0=Monday, 6=Sunday\n", "            'daily_variation': float(daily_pattern.std())\n", "        }\n", "\n", "        # Anomaly detection with the IQR rule: flag values more than 1.5 * IQR\n", "        # below the first quartile or above the third quartile\n", "        for col in numeric_cols:\n", "            series = chunk_data[col]\n", "            q1, q3 = np.percentile(series, [25, 75])\n", "            iqr = q3 - q1\n", "            lower_bound = q1 - 1.5 * iqr\n", "            upper_bound = q3 + 1.5 * iqr\n", "\n", "            outliers = series[(series < lower_bound) 
| (series > upper_bound)]\n", "            statistics[col]['outliers'] = {\n", "                'count': len(outliers),\n", "                'percentage': float(len(outliers) / len(series) * 100),\n", "                'lower_bound': float(lower_bound),\n", "                'upper_bound': float(upper_bound)\n", "            }\n", "\n", "        return statistics\n", "\n", "    def detect_patterns_and_trends(chunk_data):\n", "        \"\"\"Advanced pattern detection and trend analysis\"\"\"\n", "        patterns = {}\n", "\n", "        # Trend analysis using linear regression\n", "        time_index = np.arange(len(chunk_data))\n", "\n", "        for col in ['primary_metric', 'secondary_metric', 'tertiary_metric']:\n", "            slope, intercept, r_value, p_value, std_err = stats.linregress(time_index, chunk_data[col])\n", "\n", "            patterns[f'{col}_trend'] = {\n", "                'slope': float(slope),\n", "                'r_squared': float(r_value ** 2),\n", "                'p_value': float(p_value),\n", "                'trend_direction': 'increasing' if slope > 0 else 'decreasing',\n", "                'trend_strength': 'strong' if abs(r_value) > 0.7 else 'moderate' if abs(r_value) > 0.3 else 'weak'\n", "            }\n", "\n", "        # Seasonality detection (simplified): coefficient of variation of the hour-of-day means\n", "        primary_hourly = chunk_data.groupby(chunk_data['timestamp'].dt.hour)['primary_metric'].mean()\n", "        hourly_variation = primary_hourly.std() / primary_hourly.mean()\n", "\n", "        patterns['seasonality'] = {\n", "            'hourly_coefficient_of_variation': float(hourly_variation),\n", "            'has_daily_pattern': bool(hourly_variation > 0.15),  # Threshold for daily seasonality\n", "            'peak_hours': [int(hour) for hour in primary_hourly.nlargest(3).index.tolist()],\n", "            'trough_hours': [int(hour) for hour in primary_hourly.nsmallest(3).index.tolist()]\n", "        }\n", "\n", "        # Change point detection (simplified): a two-sample t-test between the windows\n", "        # before and after each index. Neighboring indices around a single shift are all\n", "        # reported, so the count is not the number of distinct changes\n", "        def detect_change_points(series, window=100):\n", "            if len(series) < 2 * window:\n", "                return []\n", "\n", "            change_points = []\n", "            for i in range(window, len(series) - window):\n", "                before = series[i-window:i]\n", "                after = series[i:i+window]\n", "\n", "                # Statistical test for difference in means\n", "                t_stat, 
p_val = stats.ttest_ind(before, after)\n", "                if p_val < 0.01:  # Significant change\n", "                    change_points.append(i)\n", "\n", "            return change_points\n", "\n", "        change_points = detect_change_points(chunk_data['primary_metric'].values)\n", "        patterns['change_points'] = {\n", "            'detected_points': len(change_points),\n", "            'positions': change_points[:5] if change_points else [],  # First 5\n", "            'has_significant_changes': len(change_points) > 0\n", "        }\n", "\n", "        return patterns\n", "\n", "    # Process chunks (this loop will be automatically parallelized)\n", "    chunk_results = []\n", "\n", "    for chunk_id in range(data_chunks):\n", "        if chunk_id % 10 == 0:\n", "            print(f\"Processing chunk {chunk_id + 1}/{data_chunks}...\")\n", "\n", "        # Generate data for this chunk\n", "        chunk_data = generate_synthetic_timeseries_data(chunk_id, chunk_size)\n", "\n", "        # Analyze the chunk\n", "        chunk_stats = analyze_chunk_statistics(chunk_data)\n", "        chunk_patterns = detect_patterns_and_trends(chunk_data)\n", "\n", "        chunk_result = {\n", "            'chunk_id': chunk_id,\n", "            'chunk_size': len(chunk_data),\n", "            'statistics': chunk_stats,\n", "            'patterns': chunk_patterns,\n", "            'processing_timestamp': datetime.now().isoformat()\n", "        }\n", "\n", "        chunk_results.append(chunk_result)\n", "\n", "    # Aggregate results across all chunks\n", "    def aggregate_chunk_results(chunk_results):\n", "        \"\"\"Aggregate statistics across all processed chunks\"\"\"\n", "\n", "        total_records = sum(chunk['chunk_size'] for chunk in chunk_results)\n", "\n", "        # Aggregate basic statistics\n", "        metrics = ['primary_metric', 'secondary_metric', 'tertiary_metric']\n", "        aggregated_stats = {}\n", "\n", "        for metric in metrics:\n", "            means = [chunk['statistics'][metric]['mean'] for chunk in chunk_results]\n", "            stds = [chunk['statistics'][metric]['std'] for chunk in chunk_results]\n", "\n", "            aggregated_stats[metric] = {\n", "                'global_mean': float(np.mean(means)),\n", "                'mean_std': float(np.std(means)),\n", "                
'avg_within_chunk_std': float(np.mean(stds)),\n", "                # Overall std of all records via the law of total variance\n", "                # (exact here because every chunk has the same size)\n", "                'total_variation': float(np.sqrt(np.var(means) + np.mean(np.square(stds))))\n", "            }\n", "\n", "        # Aggregate patterns\n", "        trend_directions = {}\n", "        for metric in metrics:\n", "            directions = [chunk['patterns'][f'{metric}_trend']['trend_direction']\n", "                          for chunk in chunk_results]\n", "            trend_directions[metric] = {\n", "                'increasing_chunks': directions.count('increasing'),\n", "                'decreasing_chunks': directions.count('decreasing'),\n", "                'dominant_trend': 'increasing' if directions.count('increasing') > directions.count('decreasing') else 'decreasing'\n", "            }\n", "\n", "        # Aggregate seasonality\n", "        seasonal_chunks = sum(1 for chunk in chunk_results\n", "                              if chunk['patterns']['seasonality']['has_daily_pattern'])\n", "\n", "        # Aggregate change points\n", "        total_change_points = sum(chunk['patterns']['change_points']['detected_points']\n", "                                  for chunk in chunk_results)\n", "\n", "        aggregated_results = {\n", "            'processing_summary': {\n", "                'total_chunks': len(chunk_results),\n", "                'total_records': total_records,\n", "                'avg_records_per_chunk': total_records / len(chunk_results),\n", "                'processing_completed': datetime.now().isoformat()\n", "            },\n", "            'aggregated_statistics': aggregated_stats,\n", "            'global_patterns': {\n", "                'trend_analysis': trend_directions,\n", "                'seasonality': {\n", "                    'chunks_with_daily_patterns': seasonal_chunks,\n", "                    'percentage_seasonal': float(seasonal_chunks / len(chunk_results) * 100)\n", "                },\n", "                'change_points': {\n", "                    'total_detected': total_change_points,\n", "                    'avg_per_chunk': float(total_change_points / len(chunk_results))\n", "                }\n", "            },\n", "            'data_quality': {\n", "                'chunks_processed': len(chunk_results),\n", "                'processing_success_rate': 100.0,  # All chunks processed successfully\n", "                'data_consistency_score': float(np.mean([chunk['statistics']['primary_metric']['std']\n", "                                                         for chunk in chunk_results]) /\n", "                                                np.std([chunk['statistics']['primary_metric']['mean']\n", "                                                        for chunk in 
chunk_results])) if len(chunk_results) > 1 else 1.0\n", "            },\n", "            'kubernetes_execution': {\n", "                'pod_hostname': os.environ.get('HOSTNAME', 'unknown'),\n", "                'parallel_execution': True,\n", "                'chunk_distribution': 'automatic_parallelization'\n", "            }\n", "        }\n", "\n", "        return aggregated_results\n", "\n", "    final_results = aggregate_chunk_results(chunk_results)\n", "    final_results['individual_chunks'] = chunk_results[:5]  # Include first 5 for inspection\n", "\n", "    return final_results\n", "\n", "# Run distributed data analysis\n", "data_results = distributed_data_analysis(data_chunks=50, chunk_size=5000)\n", "\n", "print(f\"\\nDISTRIBUTED DATA ANALYSIS COMPLETE\")\n", "summary = data_results['processing_summary']\n", "print(f\"Chunks processed: {summary['total_chunks']}\")\n", "print(f\"Total records: {summary['total_records']:,}\")\n", "print(f\"Avg records per chunk: {summary['avg_records_per_chunk']:,.0f}\")\n", "\n", "patterns = data_results['global_patterns']\n", "print(f\"\\nGlobal Patterns:\")\n", "print(f\" Chunks with daily seasonality: {patterns['seasonality']['chunks_with_daily_patterns']} ({patterns['seasonality']['percentage_seasonal']:.1f}%)\")\n", "print(f\" Total change points detected: {patterns['change_points']['total_detected']}\")\n", "print(f\" Average change points per chunk: {patterns['change_points']['avg_per_chunk']:.2f}\")\n", "\n", "quality = data_results['data_quality']\n", "print(f\"\\nData Quality:\")\n", "print(f\" Processing success rate: {quality['processing_success_rate']:.1f}%\")\n", "print(f\" Data consistency score: {quality['data_consistency_score']:.3f}\")\n", "\n", "k8s_exec = data_results['kubernetes_execution']\n", "print(f\"\\nKubernetes Execution:\")\n", "print(f\" Pod hostname: {k8s_exec['pod_hostname']}\")\n", "print(f\" Parallel execution: {k8s_exec['parallel_execution']}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example 3: Fault-Tolerant Scientific Computing\n", "\n", "Demonstrate 
Kubernetes' fault tolerance and job retry capabilities:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@cluster(\n", "    cores=4,\n", "    memory=\"8Gi\",\n", "    cpu_limit=6,\n", "    memory_limit=\"12Gi\",\n", "    backoff_limit=5,  # Retry up to 5 times on failure\n", "    restart_policy=\"OnFailure\",\n", "    job_name=\"fault-tolerant-computation\"\n", ")\n", "def fault_tolerant_monte_carlo(n_simulations=1000000, failure_probability=0.1, checkpoint_interval=100000):\n", "    \"\"\"\n", "    Fault-tolerant Monte Carlo simulation with checkpointing.\n", "    \"\"\"\n", "    import numpy as np\n", "    import os\n", "    import json\n", "    import pickle\n", "    import random\n", "    import time\n", "    from datetime import datetime\n", "\n", "    print(f\"Starting fault-tolerant Monte Carlo: {n_simulations:,} simulations\")\n", "    print(f\"Failure probability: {failure_probability}, Checkpoint interval: {checkpoint_interval:,}\")\n", "\n", "    # Simulate random failures for demonstration. These are handled in-process\n", "    # (the iteration is skipped) and do not crash the container, so they do not\n", "    # trigger the Kubernetes-level retries configured in the decorator\n", "    def simulate_random_failure():\n", "        if random.random() < failure_probability:\n", "            failure_types = [\n", "                \"Simulated network timeout\",\n", "                \"Simulated memory pressure\",\n", "                \"Simulated compute node failure\",\n", "                \"Simulated resource exhaustion\"\n", "            ]\n", "            failure_type = random.choice(failure_types)\n", "            print(f\"WARNING: {failure_type} - continuing with fault tolerance...\")\n", "            # Brief pause to mimic recovery. Keep it short: it runs once per simulated\n", "            # failure (about 25,000 times in the run below), so 2 s would add ~14 hours\n", "            time.sleep(0.001)\n", "            return True\n", "        return False\n", "\n", "    # Checkpoint management. NOTE: a container's local filesystem (including /tmp)\n", "    # is lost when the container restarts or a Job retry creates a new pod. Mount a\n", "    # volume at this path if the checkpoint must outlive a failure (an emptyDir\n", "    # survives container restarts; a PersistentVolumeClaim also survives pod replacement)\n", "    checkpoint_file = \"/tmp/monte_carlo_checkpoint.pkl\"\n", "\n", "    def save_checkpoint(iteration, results, random_state):\n", "        \"\"\"Save current progress to checkpoint\"\"\"\n", "        checkpoint_data = {\n", "            'iteration': iteration,\n", "            'results': results,\n", "            'random_state': random_state,\n", "            'timestamp': datetime.now().isoformat()\n", "        }\n", "\n", "        try:\n", "            with open(checkpoint_file, 'wb') as f:\n", "                pickle.dump(checkpoint_data, f)\n", "            
print(f\"Checkpoint saved at iteration {iteration:,}\")\n", "        except Exception as e:\n", "            print(f\"Failed to save checkpoint: {e}\")\n", "\n", "    def load_checkpoint():\n", "        \"\"\"Load progress from checkpoint if available\"\"\"\n", "        if os.path.exists(checkpoint_file):\n", "            try:\n", "                with open(checkpoint_file, 'rb') as f:\n", "                    checkpoint_data = pickle.load(f)\n", "                print(f\"Checkpoint loaded from iteration {checkpoint_data['iteration']:,}\")\n", "                return checkpoint_data\n", "            except Exception as e:\n", "                print(f\"Failed to load checkpoint: {e}\")\n", "        return None\n", "\n", "    # Monte Carlo simulation functions\n", "    def estimate_pi_sample():\n", "        \"\"\"Single sample for pi estimation (1 if a random point in the unit square lies inside the quarter circle)\"\"\"\n", "        x, y = np.random.random(2)\n", "        return 1 if x*x + y*y <= 1 else 0\n", "\n", "    def option_pricing_sample(S0=100, K=105, T=1, r=0.05, sigma=0.2):\n", "        \"\"\"Single Monte Carlo sample for European call option pricing\"\"\"\n", "        # Terminal price under geometric Brownian motion, sampled exactly in one step\n", "        dt = T\n", "        z = np.random.standard_normal()\n", "        ST = S0 * np.exp((r - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * z)\n", "        payoff = max(ST - K, 0)  # Call option payoff\n", "        return payoff * np.exp(-r * T)  # Discounted payoff\n", "\n", "    def portfolio_var_sample(returns_mean=0.08, returns_std=0.2, portfolio_value=1000000):\n", "        \"\"\"Single sample for portfolio Value at Risk calculation\"\"\"\n", "        # Normal daily return: annual mean and volatility scaled to 252 trading days\n", "        daily_return = np.random.normal(returns_mean/252, returns_std/np.sqrt(252))\n", "        portfolio_change = portfolio_value * daily_return\n", "        return portfolio_change\n", "\n", "    def percolation_sample(grid_size=50, p=0.593):\n", "        \"\"\"Single sample of a crude percolation proxy on a random grid\"\"\"\n", "        # Each site is open with probability p\n", "        grid = np.random.random((grid_size, grid_size)) < p\n", "        # NOTE: this only checks that the top and bottom rows each contain an open\n", "        # site. It is NOT a spanning-cluster (top-to-bottom path) test, so the\n", "        # resulting rate is close to 1 and does not reflect the percolation threshold\n", "        top_row = grid[0, :]\n", "        bottom_row = grid[-1, :]\n", "        return 1 if np.any(top_row) and np.any(bottom_row) else 0\n", "\n", "    # Load 
checkpoint if available\n", "    checkpoint = load_checkpoint()\n", "    if checkpoint:\n", "        start_iteration = checkpoint['iteration']\n", "        pi_samples = checkpoint['results']['pi_samples']\n", "        option_prices = checkpoint['results']['option_prices']\n", "        portfolio_changes = checkpoint['results']['portfolio_changes']\n", "        percolation_samples = checkpoint['results']['percolation_samples']\n", "        failure_count = checkpoint['results'].get('failure_count', 0)\n", "        # Restore random state\n", "        np.random.set_state(checkpoint['random_state'])\n", "        print(f\"Resuming from iteration {start_iteration:,}\")\n", "    else:\n", "        start_iteration = 0\n", "        pi_samples = []\n", "        option_prices = []\n", "        portfolio_changes = []\n", "        percolation_samples = []\n", "        failure_count = 0\n", "\n", "    # Main simulation loop with fault tolerance. Count successes from the stored\n", "    # samples: failed iterations produce none, so the iteration index would\n", "    # overstate them after a resume\n", "    successful_simulations = len(pi_samples)\n", "    progress_step = max(1, n_simulations // 20)  # Avoid modulo by zero when n_simulations < 20\n", "\n", "    for i in range(start_iteration, n_simulations):\n", "        if i % progress_step == 0:\n", "            print(f\"Progress: {i:,}/{n_simulations:,} ({100*i/n_simulations:.1f}%)\")\n", "\n", "        # Simulate potential failures. A failed iteration produces no samples but\n", "        # still reaches the checkpoint check below\n", "        if simulate_random_failure():\n", "            failure_count += 1\n", "        else:\n", "            # Perform Monte Carlo samples\n", "            try:\n", "                pi_sample = estimate_pi_sample()\n", "                option_price = option_pricing_sample()\n", "                portfolio_change = portfolio_var_sample()\n", "                percolation = percolation_sample()\n", "\n", "                pi_samples.append(pi_sample)\n", "                option_prices.append(option_price)\n", "                portfolio_changes.append(portfolio_change)\n", "                percolation_samples.append(percolation)\n", "\n", "                successful_simulations += 1\n", "            except Exception as e:\n", "                print(f\"Simulation error at iteration {i}: {e}\")\n", "                failure_count += 1\n", "\n", "        # Checkpoint periodically (including after a failed iteration)\n", "        if (i + 1) % checkpoint_interval == 0:\n", "            results = {\n", "                'pi_samples': pi_samples,\n", "                'option_prices': option_prices,\n", "                'portfolio_changes': portfolio_changes,\n", "                'percolation_samples': percolation_samples,\n", "                'failure_count': failure_count\n", "            
}\n", "            save_checkpoint(i + 1, results, np.random.get_state())\n", "\n", "    # Final calculations\n", "    print(f\"Simulation completed. Successful: {successful_simulations:,}, Failures: {failure_count}\")\n", "\n", "    # Pi estimation: the hit rate estimates pi/4\n", "    pi_estimate = 4 * np.mean(pi_samples) if pi_samples else 0\n", "    pi_error = abs(pi_estimate - np.pi) if pi_samples else 0\n", "    pi_confidence_interval = 1.96 * np.sqrt(np.var(pi_samples) / len(pi_samples)) if len(pi_samples) > 1 else 0\n", "\n", "    # Option pricing\n", "    option_price_mean = np.mean(option_prices) if option_prices else 0\n", "    option_price_std = np.std(option_prices) if len(option_prices) > 1 else 0\n", "    option_confidence_interval = 1.96 * option_price_std / np.sqrt(len(option_prices)) if len(option_prices) > 1 else 0\n", "\n", "    # Portfolio VaR at 95% confidence: the 5th percentile of the simulated daily\n", "    # change (negative = loss); expected shortfall is the mean of the worst 5%\n", "    if portfolio_changes:\n", "        portfolio_changes_sorted = sorted(portfolio_changes)\n", "        tail_size = max(1, int(0.05 * len(portfolio_changes)))  # Keep at least one sample in the tail\n", "        var_95 = portfolio_changes_sorted[int(0.05 * len(portfolio_changes))]\n", "        expected_shortfall = np.mean(portfolio_changes_sorted[:tail_size])\n", "    else:\n", "        var_95 = 0\n", "        expected_shortfall = 0\n", "\n", "    # Fraction of samples in which the top and bottom rows both contain an open site\n", "    percolation_probability = np.mean(percolation_samples) if percolation_samples else 0\n", "\n", "    # Cleanup checkpoint file\n", "    try:\n", "        os.remove(checkpoint_file)\n", "        print(\"Checkpoint file cleaned up\")\n", "    except OSError:\n", "        pass  # Nothing to clean up\n", "\n", "    fault_tolerant_results = {\n", "        'simulation_parameters': {\n", "            'total_simulations_requested': n_simulations,\n", "            'successful_simulations': successful_simulations,\n", "            'simulated_failures': failure_count,\n", "            'success_rate': successful_simulations / n_simulations if n_simulations > 0 else 0,\n", "            'checkpoint_interval': checkpoint_interval\n", "        },\n", "        'pi_estimation': {\n", "            'estimate': pi_estimate,\n", "            'true_value': float(np.pi),\n", "            'absolute_error': pi_error,\n", "            'relative_error_percent': (pi_error / np.pi) * 100,\n", "            
'confidence_interval_95': pi_confidence_interval * 4, # pi = 4 * hit rate, so the interval scales by 4\n", " 'samples_used': len(pi_samples)\n", " },\n", " 'option_pricing': {\n", " 'estimated_price': option_price_mean,\n", " 'price_std_dev': option_price_std,\n", " 'confidence_interval_95': option_confidence_interval,\n", " 'samples_used': len(option_prices)\n", " },\n", " 'portfolio_risk': {\n", " 'value_at_risk_95': var_95,\n", " 'expected_shortfall': expected_shortfall,\n", " 'daily_volatility': np.std(portfolio_changes) if len(portfolio_changes) > 1 else 0,\n", " 'samples_used': len(portfolio_changes)\n", " },\n", " 'percolation_analysis': {\n", " 'percolation_probability': percolation_probability,\n", " 'theoretical_threshold': 0.593, # 2D percolation threshold\n", " 'samples_used': len(percolation_samples)\n", " },\n", " 'fault_tolerance': {\n", " 'checkpoint_saves': successful_simulations // checkpoint_interval,\n", " 'recovery_successful': checkpoint is not None,\n", " 'resilience_score': (successful_simulations / (successful_simulations + failure_count)) if (successful_simulations + failure_count) > 0 else 0\n", " },\n", " 'kubernetes_info': {\n", " 'pod_name': os.environ.get('HOSTNAME', 'unknown'), # Kubernetes sets HOSTNAME to the pod name\n", " # RESTART_COUNT is not a standard Kubernetes variable; this stays 0 unless something in the job sets it\n", " 'restart_count': int(os.environ.get('RESTART_COUNT', '0')),\n", " 'completion_time': datetime.now().isoformat()\n", " }\n", " }\n", " \n", " return fault_tolerant_results\n", "\n", "# Run fault-tolerant Monte Carlo simulation\n", "mc_results = fault_tolerant_monte_carlo(\n", " n_simulations=500000, \n", " failure_probability=0.05, # 5% chance of simulated failure\n", " checkpoint_interval=50000\n", ")\n", "\n", "print(f\"\\nFAULT-TOLERANT MONTE CARLO COMPLETE\")\n", "sim_params = mc_results['simulation_parameters']\n", "print(f\"Requested simulations: {sim_params['total_simulations_requested']:,}\")\n", "print(f\"Successful simulations: {sim_params['successful_simulations']:,}\")\n", "print(f\"Simulated failures: {sim_params['simulated_failures']}\")\n", "print(f\"Success rate:
{sim_params['success_rate']*100:.1f}%\")\n", "\n", "pi_est = mc_results['pi_estimation']\n", "print(f\"\\nPi Estimation:\")\n", "print(f\" Estimate: {pi_est['estimate']:.6f}\")\n", "print(f\" True value: {pi_est['true_value']:.6f}\")\n", "print(f\" Error: {pi_est['relative_error_percent']:.4f}%\")\n", "\n", "option = mc_results['option_pricing']\n", "print(f\"\\nOption Pricing:\")\n", "print(f\" Estimated price: ${option['estimated_price']:.2f}\")\n", "print(f\" Standard deviation: ${option['price_std_dev']:.2f}\")\n", "\n", "risk = mc_results['portfolio_risk']\n", "print(f\"\\nPortfolio Risk:\")\n", "print(f\" VaR (95%): ${risk['value_at_risk_95']:,.0f}\")\n", "print(f\" Expected shortfall: ${risk['expected_shortfall']:,.0f}\")\n", "\n", "fault_tol = mc_results['fault_tolerance']\n", "print(f\"\\nFault Tolerance:\")\n", "print(f\" Checkpoints saved: {fault_tol['checkpoint_saves']}\")\n", "print(f\" Resilience score: {fault_tol['resilience_score']:.3f}\")\n", "\n", "k8s_info = mc_results['kubernetes_info']\n", "print(f\"\\nKubernetes Info:\")\n", "print(f\" Pod: {k8s_info['pod_name']}\")\n", "print(f\" Restart count: {k8s_info['restart_count']}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Kubernetes Resource Management and Best Practices" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def kubernetes_resource_guidelines():\n", " \"\"\"\n", " Guidelines for Kubernetes resource management with Clustrix.\n", " \"\"\"\n", " \n", " resource_patterns = {\n", " 'cpu_intensive': {\n", " 'description': 'Mathematical computations, simulations, optimization',\n", " 'resource_ratio': 'cores ≈ cpu_limit, memory moderate',\n", " 'example': {\n", " 'cores': 8,\n", " 'memory': '16Gi',\n", " 'cpu_limit': 8,\n", " 'memory_limit': '20Gi'\n", " },\n", " 'use_cases': ['Monte Carlo simulations', 'Genetic algorithms', 'Scientific computing']\n", " },\n", " 'memory_intensive': {\n", " 'description': 'Large dataset 
processing, in-memory analytics',\n", " 'resource_ratio': 'memory >> cores, higher memory limits',\n", " 'example': {\n", " 'cores': 4,\n", " 'memory': '32Gi',\n", " 'cpu_limit': 6,\n", " 'memory_limit': '40Gi'\n", " },\n", " 'use_cases': ['Big data processing', 'Large ML models', 'Genomics analysis']\n", " },\n", " 'io_intensive': {\n", " 'description': 'File processing, database operations, network I/O',\n", " 'resource_ratio': 'moderate cores and memory, focus on concurrency',\n", " 'example': {\n", " 'cores': 2,\n", " 'memory': '8Gi',\n", " 'cpu_limit': 4,\n", " 'memory_limit': '12Gi'\n", " },\n", " 'use_cases': ['Data ingestion', 'ETL pipelines', 'Web scraping']\n", " },\n", " 'ml_training': {\n", " 'description': 'Machine learning model training',\n", " 'resource_ratio': 'balanced cores and memory, burst capacity',\n", " 'example': {\n", " 'cores': 6,\n", " 'memory': '24Gi',\n", " 'cpu_limit': 8,\n", " 'memory_limit': '32Gi'\n", " },\n", " 'use_cases': ['Deep learning', 'Model hyperparameter tuning', 'Feature engineering']\n", " }\n", " }\n", " \n", " print(\"Kubernetes Resource Management Guidelines:\")\n", " print(\"=\" * 60)\n", " \n", " for pattern_name, pattern in resource_patterns.items():\n", " print(f\"\\n{pattern_name.upper().replace('_', ' ')}:\")\n", " print(f\" Description: {pattern['description']}\")\n", " print(f\" Resource ratio: {pattern['resource_ratio']}\")\n", " print(f\" Example configuration:\")\n", " for key, value in pattern['example'].items():\n", " print(f\" {key}: {value}\")\n", " print(f\" Use cases: {', '.join(pattern['use_cases'])}\")\n", " \n", " return resource_patterns\n", "\n", "def kubernetes_job_patterns():\n", " \"\"\"\n", " Common Kubernetes job patterns for different workloads.\n", " \"\"\"\n", " \n", " job_patterns = {\n", " 'single_job': {\n", " 'description': 'Single pod, run-to-completion',\n", " 'parameters': {\n", " 'completions': 1,\n", " 'parallelism': 1,\n", " 'backoff_limit': 3\n", " },\n", " 'best_for': 
'One-off computations, small datasets'\n", " },\n", " 'parallel_job': {\n", " 'description': 'Multiple pods running simultaneously',\n", " 'parameters': {\n", " 'completions': 10,\n", " 'parallelism': 5,\n", " 'backoff_limit': 2\n", " },\n", " 'best_for': 'Independent parallel tasks, embarrassingly parallel problems'\n", " },\n", " 'queue_job': {\n", " 'description': 'Work queue pattern with multiple workers',\n", " 'parameters': {\n", " 'completions': None, # No fixed completion count\n", " 'parallelism': 3,\n", " 'backoff_limit': 5\n", " },\n", " 'best_for': 'Dynamic workloads, task queues, streaming data'\n", " },\n", " 'indexed_job': {\n", " 'description': 'Jobs with completion index for task assignment',\n", " 'parameters': {\n", " 'completion_mode': 'Indexed',\n", " 'completions': 20,\n", " 'parallelism': 4\n", " },\n", " 'best_for': 'Parameter sweeps, data partitioning, batch processing'\n", " }\n", " }\n", " \n", " print(\"\\nKubernetes Job Patterns:\")\n", " print(\"=\" * 40)\n", " \n", " for pattern_name, pattern in job_patterns.items():\n", " print(f\"\\n{pattern_name.upper().replace('_', ' ')}:\")\n", " print(f\" Description: {pattern['description']}\")\n", " print(f\" Best for: {pattern['best_for']}\")\n", " print(f\" Parameters:\")\n", " for key, value in pattern['parameters'].items():\n", " print(f\" {key}: {value}\")\n", " \n", " return job_patterns\n", "\n", "def clustrix_kubernetes_examples():\n", " \"\"\"\n", " Specific Clustrix configuration examples for Kubernetes.\n", " \"\"\"\n", " \n", " examples = {\n", " 'basic_computation': {\n", " 'clustrix_decorator': '''\n", "@cluster(\n", " cores=2,\n", " memory=\"4Gi\",\n", " cpu_limit=3,\n", " memory_limit=\"6Gi\",\n", " container_image=\"python:3.11-slim\"\n", ")''',\n", " 'use_case': 'Simple mathematical computations'\n", " },\n", " 'ml_training': {\n", " 'clustrix_decorator': '''\n", "@cluster(\n", " cores=8,\n", " memory=\"32Gi\",\n", " cpu_limit=12,\n", " memory_limit=\"40Gi\",\n", " 
container_image=\"python:3.11\",\n", " job_name=\"ml-training\",\n", " backoff_limit=3\n", ")''',\n", " 'use_case': 'Machine learning model training with fault tolerance'\n", " },\n", " 'parallel_processing': {\n", " 'clustrix_decorator': '''\n", "@cluster(\n", " cores=4,\n", " memory=\"16Gi\",\n", " parallel=True,\n", " parallelism=5,\n", " completions=20,\n", " job_name=\"parallel-processing\"\n", ")''',\n", " 'use_case': 'Embarrassingly parallel data processing'\n", " },\n", " 'fault_tolerant': {\n", " 'clustrix_decorator': '''\n", "@cluster(\n", " cores=6,\n", " memory=\"24Gi\",\n", " backoff_limit=5,\n", " restart_policy=\"OnFailure\",\n", " job_ttl_seconds=7200,\n", " active_deadline_seconds=3600\n", ")''',\n", " 'use_case': 'Long-running computations with automatic retry'\n", " }\n", " }\n", " \n", " print(\"\\nClustrix Kubernetes Configuration Examples:\")\n", " print(\"=\" * 50)\n", " \n", " for example_name, example in examples.items():\n", " print(f\"\\n{example_name.upper().replace('_', ' ')}:\")\n", " print(f\"Use case: {example['use_case']}\")\n", " print(f\"Configuration:\")\n", " print(example['clustrix_decorator'])\n", "\n", "# Display all guidelines\n", "resource_patterns = kubernetes_resource_guidelines()\n", "job_patterns = kubernetes_job_patterns()\n", "clustrix_kubernetes_examples()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Kubernetes Cluster Monitoring" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def check_kubernetes_cluster_status():\n", " \"\"\"\n", " Check Kubernetes cluster status and resources.\n", " Note: This requires kubectl to be configured properly.\n", " \"\"\"\n", " import subprocess\n", " import json\n", " \n", " def run_kubectl_command(cmd):\n", " \"\"\"Run kubectl command and return output\"\"\"\n", " try:\n", " result = subprocess.run(\n", " f\"kubectl {cmd}\", \n", " shell=True, \n", " capture_output=True, \n", " text=True,\n", " timeout=30\n", " 
)\n", " if result.returncode == 0:\n", " # kubectl prints 'No resources found' to stderr, so fall back to it\n", " return result.stdout.strip() or result.stderr.strip()\n", " else:\n", " return f\"Error: {result.stderr.strip()}\"\n", " except subprocess.TimeoutExpired:\n", " return \"Error: Command timed out\"\n", " except Exception as e:\n", " return f\"Error: {str(e)}\"\n", " \n", " print(\"Kubernetes Cluster Status Check:\")\n", " print(\"=\" * 40)\n", " \n", " # Check cluster info\n", " print(\"\\n1. Cluster Info:\")\n", " cluster_info = run_kubectl_command(\"cluster-info\")\n", " if \"Error\" not in cluster_info:\n", " lines = cluster_info.split('\\n')[:3] # First 3 lines\n", " for line in lines:\n", " print(f\" {line}\")\n", " else:\n", " print(f\" {cluster_info}\")\n", " \n", " # Check nodes\n", " print(\"\\n2. Node Status:\")\n", " nodes = run_kubectl_command(\"get nodes -o wide\")\n", " if \"Error\" not in nodes:\n", " lines = nodes.split('\\n')[:6] # Header + first 5 nodes\n", " for line in lines:\n", " print(f\" {line}\")\n", " else:\n", " print(f\" {nodes}\")\n", " \n", " # Check namespaces\n", " print(\"\\n3. Namespaces:\")\n", " namespaces = run_kubectl_command(\"get namespaces\")\n", " if \"Error\" not in namespaces:\n", " lines = namespaces.split('\\n')[:8] # Header + first 7 namespaces\n", " for line in lines:\n", " print(f\" {line}\")\n", " else:\n", " print(f\" {namespaces}\")\n", " \n", " # Check current context\n", " print(\"\\n4. Current Context:\")\n", " context = run_kubectl_command(\"config current-context\")\n", " print(f\" {context}\")\n", " \n", " # Check resource quotas\n", " print(\"\\n5. Resource Quotas (default namespace):\")\n", " quotas = run_kubectl_command(\"get resourcequota -n default\")\n", " if not quotas or \"No resources found\" in quotas:\n", " print(\" No resource quotas configured\")\n", " else:\n", " print(f\" {quotas}\")\n", " \n", " # Check jobs\n", " print(\"\\n6.
Jobs (default namespace):\")\n", " jobs = run_kubectl_command(\"get jobs -n default\")\n", " if not jobs or \"No resources found\" in jobs:\n", " print(\" No jobs found\")\n", " else:\n", " lines = jobs.split('\\n')[:6] # Header + first 5 jobs\n", " for line in lines:\n", " print(f\" {line}\")\n", " \n", " # Check pods (all phases, not only running ones)\n", " print(\"\\n7. Pods (default namespace):\")\n", " pods = run_kubectl_command(\"get pods -n default\")\n", " if not pods or \"No resources found\" in pods:\n", " print(\" No pods found\")\n", " else:\n", " lines = pods.split('\\n')[:6] # Header + first 5 pods\n", " for line in lines:\n", " print(f\" {line}\")\n", " \n", " # Check node resource usage\n", " print(\"\\n8. Node Resource Usage:\")\n", " top_nodes = run_kubectl_command(\"top nodes\")\n", " if \"Error\" not in top_nodes and \"not available\" not in top_nodes:\n", " lines = top_nodes.split('\\n')[:6] # Header + first 5 nodes\n", " for line in lines:\n", " print(f\" {line}\")\n", " else:\n", " print(\" Resource metrics not available (metrics-server may not be installed)\")\n", "\n", "# Check cluster status\n", "try:\n", " check_kubernetes_cluster_status()\n", "except Exception as e:\n", " print(f\"Failed to check Kubernetes cluster status: {e}\")\n", " print(\"Make sure kubectl is installed and configured for your cluster\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Summary\n", "\n", "This tutorial covered Kubernetes usage with Clustrix:\n", "\n", "1. **Kubernetes Configuration** - Setting up Clustrix for container-based computing\n", "2. **Machine Learning Training** - Distributed ML workflows in pods\n", "3. **Data Processing** - Large-scale data analysis with automatic parallelization\n", "4. **Fault Tolerance** - Robust computing with checkpointing and retry mechanisms\n", "5. **Resource Management** - Choosing CPU and memory requests and limits for each workload type\n", "6. **Job Patterns** - Different Kubernetes job execution patterns\n", "7.
**Cluster Monitoring** - Status checking and resource monitoring\n", "\n", "### Key Kubernetes Advantages:\n", "\n", "- **Containerization**: Consistent execution environments across clusters\n", "- **Scalability**: Many pods can run in parallel, and clusters with a node autoscaler can add capacity as demand grows\n", "- **Fault Tolerance**: Built-in restart and retry mechanisms\n", "- **Resource Management**: Fine-grained CPU and memory control\n", "- **Isolation**: Each job runs in its own container, separated from other workloads\n", "- **Portability**: Run on any Kubernetes cluster (cloud or on-premises)\n", "\n", "### Best Practices:\n", "\n", "- **Resource Limits**: Set both requests (used by the scheduler to place pods) and limits (enforced while the pod runs)\n", "- **Container Images**: Use pinned, lightweight base images (e.g. `python:3.11-slim`) for reproducibility and faster startup\n", "- **Job Patterns**: Choose the job pattern that matches your workload type\n", "- **Fault Tolerance**: Implement checkpointing for long-running computations\n", "- **Monitoring**: Check cluster health and resource usage regularly\n", "- **Cleanup**: Set a TTL (`job_ttl_seconds`) so finished jobs are deleted automatically instead of accumulating\n", "\n", "### Kubernetes-Specific Features:\n", "\n", "- **`cpu_limit` and `memory_limit`**: Ceilings above the requested cores and memory that allow bursts; CPU use above the limit is throttled, while exceeding the memory limit gets the container OOM-killed\n", "- **`backoff_limit`**: Number of retries before the job is marked as failed\n", "- **`parallelism` and `completions`**: How many pods run at once and how many must finish successfully\n", "- **`job_ttl_seconds`**: Automatic cleanup of completed jobs\n", "- **`restart_policy`**: Pod restart behavior on failure (`OnFailure` or `Never` for jobs)\n", "- **`active_deadline_seconds`**: Maximum wall-clock runtime before the job is terminated\n", "\n", "### Next Steps:\n", "\n", "- Compare with [SLURM Tutorial](slurm_tutorial.ipynb) for HPC-style clusters\n", "- Explore [PBS Tutorial](pbs_tutorial.ipynb) for traditional batch systems\n", "- Try [SSH Tutorial](ssh_tutorial.ipynb) for simple remote execution\n", "- Check the [Configuration Guide](../api/config.rst) for advanced settings\n", "\n", "For more information, visit the [Clustrix Documentation](https://clustrix.readthedocs.io)."
] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.0" } }, "nbformat": 4, "nbformat_minor": 4 }