Kubernetes Cluster TutorialΒΆ
This tutorial demonstrates how to use Clustrix with Kubernetes clusters for cloud-native distributed computing. Kubernetes provides excellent scalability and resource management for containerized workloads.
PrerequisitesΒΆ
Access to a Kubernetes cluster (local, cloud, or on-premises)
kubectl configured with cluster access
Clustrix installed with Kubernetes support:
pip install clustrix[kubernetes]
Configuration OptionsΒΆ
Option 1: Interactive Widget (Recommended for Jupyter)
For Jupyter notebook users, use the interactive configuration widget:
import clustrix # Auto-loads the magic command
# Use the magic command to open the configuration widget
%%clusterfy
# Interactive widget appears with Kubernetes templates and GUI configuration
Option 2: Programmatic Configuration
Configure Clustrix programmatically for your Kubernetes cluster:
from clustrix import configure
configure(
cluster_type="kubernetes",
# Note: Kubernetes uses kubectl config, no host/SSH needed
namespace="default", # Optional: specify namespace
docker_image="python:3.11-slim" # Optional: custom image
)
Kubernetes-specific FeaturesΒΆ
Resource SpecificationΒΆ
Kubernetes uses different resource syntax:
from clustrix import cluster
@cluster(
cores=2, # CPU cores (can be fractional: 0.5, 1.5)
memory="4Gi", # Memory in Kubernetes format
time="01:00:00", # Job timeout
namespace="compute", # Kubernetes namespace
image="python:3.11" # Custom Docker image
)
def k8s_computation():
"""Example computation on Kubernetes."""
import numpy as np
import time
print("Starting Kubernetes job...")
# CPU-intensive computation
size = 3000
matrix_a = np.random.rand(size, size)
matrix_b = np.random.rand(size, size)
start_time = time.time()
result = np.dot(matrix_a, matrix_b)
end_time = time.time()
return {
'computation_time': end_time - start_time,
'matrix_size': size,
'result_trace': float(np.trace(result)),
'result_frobenius_norm': float(np.linalg.norm(result, 'fro'))
}
# Execute on Kubernetes
result = k8s_computation()
print(f"Computation completed in {result['computation_time']:.2f} seconds")
Advanced ConfigurationΒΆ
Custom Docker ImagesΒΆ
For complex dependencies, use custom images:
# First, create a Dockerfile for your requirements
"""
# Dockerfile
FROM python:3.11-slim
RUN pip install numpy pandas scikit-learn matplotlib
RUN pip install torch torchvision # For ML workloads
WORKDIR /app
CMD ["python"]
"""
# Then configure Clustrix to use your image
configure(
cluster_type="kubernetes",
namespace="ml-compute",
docker_image="your-registry/clustrix-ml:latest"
)
@cluster(cores=4, memory="8Gi", image="your-registry/clustrix-ml:latest")
def ml_computation():
"""Machine learning computation with custom image."""
import torch
import numpy as np
# Check GPU availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# Create neural network
model = torch.nn.Sequential(
torch.nn.Linear(100, 50),
torch.nn.ReLU(),
torch.nn.Linear(50, 1)
).to(device)
# Generate synthetic data
X = torch.randn(1000, 100).to(device)
y = torch.randn(1000, 1).to(device)
# Simple training loop
optimizer = torch.optim.Adam(model.parameters())
loss_fn = torch.nn.MSELoss()
losses = []
for epoch in range(100):
optimizer.zero_grad()
predictions = model(X)
loss = loss_fn(predictions, y)
loss.backward()
optimizer.step()
losses.append(loss.item())
return {
'device': str(device),
'final_loss': losses[-1],
'training_losses': losses[::10], # Every 10th loss
'model_parameters': sum(p.numel() for p in model.parameters())
}
Resource Limits and RequestsΒΆ
Configure resource limits for better cluster utilization:
@cluster(
# Resource requests (guaranteed)
cores=1, # Guaranteed 1 CPU core
memory="2Gi", # Guaranteed 2GB RAM
# Resource limits (maximum)
cpu_limit=2, # Can burst up to 2 cores
memory_limit="4Gi", # Maximum 4GB RAM
# Additional Kubernetes settings
restart_policy="Never",
backoff_limit=3, # Retry failed jobs 3 times
active_deadline_seconds=3600 # Kill job after 1 hour
)
def resource_managed_task():
"""Task with detailed resource management."""
import psutil
import time
# Monitor resource usage
process = psutil.Process()
results = {
'cpu_count': psutil.cpu_count(),
'memory_total_gb': psutil.virtual_memory().total / (1024**3),
'measurements': []
}
# Simulate varying workload
for i in range(10):
# CPU-intensive phase
start_time = time.time()
sum(x**2 for x in range(100000))
end_time = time.time()
# Measure current usage
cpu_percent = process.cpu_percent()
memory_mb = process.memory_info().rss / (1024**2)
results['measurements'].append({
'step': i,
'cpu_percent': cpu_percent,
'memory_mb': memory_mb,
'duration_ms': (end_time - start_time) * 1000
})
time.sleep(1)
return results
Kubernetes-Native ExamplesΒΆ
Distributed Data ProcessingΒΆ
@cluster(cores=2, memory="4Gi", namespace="data-processing")
def process_data_partition(partition_id, total_partitions, data_size=10000):
"""Process a partition of a large dataset."""
import numpy as np
import json
print(f"Processing partition {partition_id}/{total_partitions}")
# Simulate loading partition data
np.random.seed(partition_id) # Ensure reproducible partitions
partition_size = data_size // total_partitions
# Generate partition data
data = np.random.rand(partition_size, 50)
labels = np.random.randint(0, 5, partition_size)
# Process partition
results = {
'partition_id': partition_id,
'partition_size': partition_size,
'feature_means': np.mean(data, axis=0).tolist(),
'feature_stds': np.std(data, axis=0).tolist(),
'label_distribution': {
str(label): int(count)
for label, count in zip(*np.unique(labels, return_counts=True))
}
}
return results
# Process data in parallel across multiple Kubernetes jobs
total_partitions = 8
partition_results = []
for partition_id in range(total_partitions):
result = process_data_partition(partition_id, total_partitions)
partition_results.append(result)
# Aggregate results
total_samples = sum(r['partition_size'] for r in partition_results)
print(f"Processed {total_samples} samples across {total_partitions} partitions")
# Compute global statistics
all_feature_means = np.array([r['feature_means'] for r in partition_results])
global_feature_means = np.mean(all_feature_means, axis=0)
print(f"Global feature means: {global_feature_means[:5]}") # Show first 5
Microservices-Style ComputingΒΆ
@cluster(cores=1, memory="2Gi", namespace="microservices")
def image_processing_service(image_id, operations):
"""Microservice for image processing."""
import numpy as np
import json
print(f"Processing image {image_id} with operations: {operations}")
# Simulate image (random pixels)
height, width = 512, 512
image = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
results = {
'image_id': image_id,
'original_shape': image.shape,
'operations_performed': []
}
# Apply operations
for operation in operations:
if operation == 'grayscale':
# Convert to grayscale
gray = np.dot(image[...,:3], [0.2989, 0.5870, 0.1140])
image = np.stack([gray, gray, gray], axis=-1).astype(np.uint8)
results['operations_performed'].append('grayscale')
elif operation == 'blur':
# Simple blur (average with neighbors)
from scipy import ndimage
for channel in range(3):
image[:,:,channel] = ndimage.uniform_filter(
image[:,:,channel].astype(float), size=3
).astype(np.uint8)
results['operations_performed'].append('blur')
elif operation == 'edge_detect':
# Simple edge detection
edges = np.abs(np.diff(image.astype(float), axis=0)).sum(axis=-1)
edges = np.pad(edges, ((0,1), (0,0)), mode='constant')
results['edge_strength'] = float(np.mean(edges))
results['operations_performed'].append('edge_detect')
# Compute final statistics
results['final_mean_intensity'] = float(np.mean(image))
results['final_std_intensity'] = float(np.std(image))
return results
# Process multiple images with different operations
image_tasks = [
{'id': 'img_001', 'ops': ['grayscale', 'blur']},
{'id': 'img_002', 'ops': ['edge_detect']},
{'id': 'img_003', 'ops': ['grayscale', 'edge_detect']},
{'id': 'img_004', 'ops': ['blur', 'edge_detect']}
]
results = []
for task in image_tasks:
result = image_processing_service(task['id'], task['ops'])
results.append(result)
# Summary
for r in results:
print(f"Image {r['image_id']}: {', '.join(r['operations_performed'])}")
Cloud-Native Best PracticesΒΆ
Auto-scaling ConfigurationΒΆ
# Configure for auto-scaling environments
configure(
cluster_type="kubernetes",
namespace="auto-scale",
# Resource settings that work well with auto-scaling
default_cores=1, # Start small
default_memory="2Gi", # Conservative memory
# Job settings
active_deadline_seconds=1800, # 30 minute timeout
backoff_limit=2, # Limited retries
restart_policy="Never" # Don't restart failed jobs
)
@cluster(cores=0.5, memory="1Gi") # Fractional cores for efficiency
def lightweight_task(task_id):
"""Lightweight task suitable for auto-scaling."""
import time
import random
# Variable processing time
processing_time = random.uniform(10, 60) # 10-60 seconds
print(f"Task {task_id} starting (estimated {processing_time:.1f}s)")
# Simulate work
start_time = time.time()
time.sleep(processing_time)
end_time = time.time()
return {
'task_id': task_id,
'estimated_time': processing_time,
'actual_time': end_time - start_time,
'efficiency': processing_time / (end_time - start_time)
}
Fault ToleranceΒΆ
@cluster(cores=2, memory="4Gi", backoff_limit=3)
def fault_tolerant_computation(data_chunk_id, retry_count=0):
"""Computation with built-in fault tolerance."""
import random
import time
import numpy as np
print(f"Processing chunk {data_chunk_id} (attempt {retry_count + 1})")
# Simulate random failures (20% chance)
if random.random() < 0.2 and retry_count < 2:
raise RuntimeError(f"Simulated failure in chunk {data_chunk_id}")
# Simulate computation
chunk_size = 1000
data = np.random.rand(chunk_size, 100)
# Add checkpointing for long computations
checkpoint_interval = 200
results = []
for i in range(0, chunk_size, checkpoint_interval):
end_idx = min(i + checkpoint_interval, chunk_size)
batch = data[i:end_idx]
# Process batch
batch_result = np.mean(batch, axis=0)
results.append(batch_result)
print(f"Checkpoint: processed {end_idx}/{chunk_size} samples")
time.sleep(0.1) # Small delay
# Combine results
final_result = np.mean(results, axis=0)
return {
'chunk_id': data_chunk_id,
'chunk_size': chunk_size,
'checkpoints': len(results),
'result_mean': float(np.mean(final_result)),
'result_std': float(np.std(final_result)),
'retry_count': retry_count
}
# Process multiple chunks with fault tolerance
chunk_ids = range(10)
successful_results = []
for chunk_id in chunk_ids:
try:
result = fault_tolerant_computation(chunk_id)
successful_results.append(result)
print(f"β Chunk {chunk_id} completed successfully")
except Exception as e:
print(f"β Chunk {chunk_id} failed after retries: {e}")
print(f"Successfully processed {len(successful_results)}/{len(chunk_ids)} chunks")
Monitoring and LoggingΒΆ
Kubernetes Job MonitoringΒΆ
@cluster(cores=2, memory="4Gi")
def monitored_computation():
"""Computation with comprehensive monitoring."""
import time
import psutil
import logging
import json
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Monitoring data
monitor_data = {
'start_time': time.time(),
'resource_snapshots': [],
'milestones': []
}
def log_resources(milestone):
"""Log current resource usage."""
snapshot = {
'timestamp': time.time(),
'milestone': milestone,
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_mb': psutil.virtual_memory().used / (1024**2),
'memory_percent': psutil.virtual_memory().percent
}
monitor_data['resource_snapshots'].append(snapshot)
logger.info(f"Milestone '{milestone}': CPU {snapshot['cpu_percent']:.1f}%, "
f"Memory {snapshot['memory_mb']:.1f}MB")
try:
log_resources("computation_start")
# Phase 1: Data preparation
import numpy as np
data = np.random.rand(5000, 1000)
monitor_data['milestones'].append("data_prepared")
log_resources("data_preparation_complete")
# Phase 2: Computation
result = np.linalg.svd(data, compute_uv=False)
monitor_data['milestones'].append("computation_complete")
log_resources("computation_complete")
# Phase 3: Analysis
analysis = {
'singular_values_count': len(result),
'max_singular_value': float(np.max(result)),
'min_singular_value': float(np.min(result)),
'condition_number': float(np.max(result) / np.min(result))
}
monitor_data['milestones'].append("analysis_complete")
log_resources("analysis_complete")
monitor_data['end_time'] = time.time()
monitor_data['total_duration'] = monitor_data['end_time'] - monitor_data['start_time']
return {
'analysis_results': analysis,
'monitoring_data': monitor_data,
'success': True
}
except Exception as e:
logger.error(f"Computation failed: {e}")
monitor_data['error'] = str(e)
monitor_data['end_time'] = time.time()
raise
Complete Kubernetes ExampleΒΆ
Distributed Machine LearningΒΆ
from clustrix import configure, cluster
import numpy as np
# Configure for ML workloads
configure(
cluster_type="kubernetes",
namespace="ml-compute",
docker_image="python:3.11-slim",
# Default resources for ML tasks
default_cores=2,
default_memory="4Gi",
active_deadline_seconds=3600, # 1 hour limit
backoff_limit=1 # Single retry
)
@cluster(cores=4, memory="8Gi", cpu_limit=6, memory_limit="12Gi")
def distributed_training_worker(worker_id, total_workers, epochs=100):
"""Distributed training worker for machine learning."""
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
import time
import json
print(f"Worker {worker_id}/{total_workers} starting training...")
# Generate worker-specific dataset
np.random.seed(worker_id) # Ensure different data per worker
X, y = make_classification(
n_samples=10000,
n_features=50,
n_informative=30,
n_redundant=10,
n_classes=5,
random_state=worker_id
)
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=worker_id
)
print(f"Worker {worker_id}: Dataset prepared ({len(X_train)} training samples)")
# Train model
start_time = time.time()
model = RandomForestClassifier(
n_estimators=epochs,
max_depth=10,
random_state=worker_id,
n_jobs=-1 # Use all available cores
)
model.fit(X_train, y_train)
training_time = time.time() - start_time
# Evaluate model
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
# Feature importance
feature_importance = model.feature_importances_
top_features = np.argsort(feature_importance)[-10:] # Top 10 features
results = {
'worker_id': worker_id,
'total_workers': total_workers,
'training_samples': len(X_train),
'test_samples': len(X_test),
'training_time_seconds': training_time,
'accuracy': float(accuracy),
'top_feature_indices': top_features.tolist(),
'top_feature_importance': feature_importance[top_features].tolist(),
'model_parameters': {
'n_estimators': epochs,
'max_depth': 10
}
}
print(f"Worker {worker_id} completed: accuracy = {accuracy:.4f}")
return results
# Run distributed training
total_workers = 6
print(f"Starting distributed training with {total_workers} workers...")
worker_results = []
for worker_id in range(total_workers):
result = distributed_training_worker(worker_id, total_workers, epochs=150)
worker_results.append(result)
# Aggregate results
accuracies = [r['accuracy'] for r in worker_results]
training_times = [r['training_time_seconds'] for r in worker_results]
print("\nDistributed Training Results:")
print(f"Average accuracy: {np.mean(accuracies):.4f} Β± {np.std(accuracies):.4f}")
print(f"Average training time: {np.mean(training_times):.2f}s Β± {np.std(training_times):.2f}s")
print(f"Total training samples: {sum(r['training_samples'] for r in worker_results)}")
# Find best performing worker
best_worker = max(worker_results, key=lambda x: x['accuracy'])
print(f"Best worker: {best_worker['worker_id']} (accuracy: {best_worker['accuracy']:.4f})")
This tutorial demonstrates the cloud-native capabilities of Clustrix with Kubernetes, showcasing containerized distributed computing, auto-scaling, fault tolerance, and comprehensive monitoring for modern cloud environments.