Complete Clustrix API Demonstration
This notebook provides a comprehensive demonstration of all Clustrix user-facing functions and features. It serves as both a tutorial and a reference for the complete API.
Installation and Setup
First, let's install and import Clustrix:
[ ]:
# Install Clustrix (uncomment if needed)
# !pip install clustrix
# !pip install clustrix[kubernetes] # With Kubernetes support
# Import all Clustrix components
import clustrix
from clustrix import cluster, configure, get_config
from clustrix.config import ClusterConfig
from clustrix.executor import ClusterExecutor
from clustrix.local_executor import LocalExecutor
# Standard libraries for examples
import numpy as np
import time
import os
from datetime import datetime
print(f"Clustrix version: {getattr(clustrix, '__version__', 'development')}")
print(f"Import successful at {datetime.now()}")
Configuration Functions
1. Basic Configuration
The configure() function is the primary way to set up Clustrix:
[ ]:
# Basic local configuration
clustrix.configure(
    cluster_type="local",    # Use local execution
    default_cores=4,         # Default number of cores
    default_memory="8GB",    # Default memory allocation
    auto_parallel=True,      # Enable automatic parallelization
    max_parallel_jobs=10     # Maximum concurrent jobs
)
print("✓ Basic local configuration set")

# Get current configuration
config = clustrix.get_config()
print(f"Current cluster type: {config.cluster_type}")
print(f"Default cores: {config.default_cores}")
print(f"Default memory: {config.default_memory}")
print(f"Auto parallel: {config.auto_parallel}")
2. All Configuration Options
Comprehensive configuration with all available options:
[ ]:
def demonstrate_all_config_options():
"""
Demonstrate all available configuration options for different cluster types.
"""
configurations = {
'local': {
'cluster_type': 'local',
'default_cores': 4,
'default_memory': '8GB',
'auto_parallel': True,
'max_parallel_jobs': 8,
'cleanup_on_success': True
},
'slurm': {
'cluster_type': 'slurm',
'cluster_host': 'slurm-cluster.university.edu',
'username': 'researcher',
'key_file': '~/.ssh/id_rsa',
'port': 22,
'default_cores': 8,
'default_memory': '32GB',
'default_time': '02:00:00',
'default_partition': 'normal',
'default_account': 'research_group',
'default_qos': 'normal',
'remote_work_dir': '/scratch/researcher/clustrix',
'module_loads': ['python/3.9', 'gcc/9.3.0'],
'conda_env_name': 'myproject',
'cleanup_on_success': True,
'max_parallel_jobs': 20
},
'pbs': {
'cluster_type': 'pbs',
'cluster_host': 'pbs-cluster.org',
'username': 'scientist',
'key_file': '~/.ssh/pbs_key',
'default_cores': 6,
'default_memory': '24GB',
'default_time': '04:00:00',
'default_queue': 'bioqueue',
'remote_work_dir': '/home/scientist/clustrix',
'walltime': '04:00:00', # PBS-specific
'features': 'infiniband',
'cleanup_on_success': True
},
'sge': {
'cluster_type': 'sge',
'cluster_host': 'sge-cluster.example.com',
'username': 'engineer',
'key_file': '~/.ssh/sge_key',
'default_cores': 12,
'default_memory': '48GB',
'default_time': '06:00:00',
'default_queue': 'all.q',
'pe': 'smp', # SGE parallel environment
'remote_work_dir': '/home/engineer/clustrix'
},
'kubernetes': {
'cluster_type': 'kubernetes',
'k8s_namespace': 'default',
'k8s_config_file': '~/.kube/config',
'default_cores': 4,
'default_memory': '8Gi',
'default_cpu_limit': 6,
'default_memory_limit': '12Gi',
'container_image': 'python:3.11-slim',
'image_pull_policy': 'IfNotPresent',
'job_ttl_seconds': 3600,
'backoff_limit': 3,
'restart_policy': 'OnFailure'
},
'ssh': {
'cluster_type': 'ssh',
'cluster_host': 'remote-server.example.com',
'username': 'developer',
'key_file': '~/.ssh/dev_key',
'port': 22,
'remote_work_dir': '/home/developer/clustrix',
'python_executable': 'python3',
'virtualenv_path': '/home/developer/venv/myproject',
'cleanup_on_success': True,
'max_parallel_jobs': 5
}
}
print("Configuration Options for All Cluster Types:")
print("=" * 50)
for cluster_type, config_options in configurations.items():
print(f"\n{cluster_type.upper()} Configuration:")
for key, value in config_options.items():
print(f" {key}: {value}")
return configurations
# Display all configuration options
all_configs = demonstrate_all_config_options()
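The configurations above write memory in two styles: `"8GB"` or `"32GB"` for the scheduler-based clusters and `"8Gi"` or `"12Gi"` for Kubernetes. The sketch below shows one way to normalize such strings to a byte count for comparison. `parse_memory` is a hypothetical helper, not part of the Clustrix API, and it assumes that `GB` is decimal (10^9 bytes) while `Gi` is binary (2^30 bytes); some schedulers treat `GB` as binary, so check your site's convention.

```python
import re

# Hypothetical helper, not part of the Clustrix API.
# Assumption: "GB"-style units are decimal, "Gi"-style units are binary.
_UNITS = {
    "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12,
    "Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40,
}

def parse_memory(text):
    """Convert a size string such as '8GB' or '12Gi' to a byte count."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]+)\s*", text)
    if match is None or match.group(2) not in _UNITS:
        raise ValueError(f"Unrecognized memory size: {text!r}")
    return int(float(match.group(1)) * _UNITS[match.group(2)])

for size in ["8GB", "8Gi", "12Gi"]:
    print(f"{size:>5} -> {parse_memory(size):,} bytes")
```

Under these assumptions the same leading number gives different byte counts for `GB` and `Gi`, which matters when copying resource requests between cluster types.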
3. Configuration from File
Load configuration from YAML files:
[ ]:
import tempfile
import yaml
import os

# Create a sample configuration file
sample_config = {
    'cluster_type': 'local',
    'default_cores': 6,
    'default_memory': '16GB',
    'auto_parallel': True,
    'max_parallel_jobs': 12,
    'cleanup_on_success': True,
    'environment_variables': {
        'OMP_NUM_THREADS': '6',
        'PYTHONPATH': '/custom/path'
    }
}

# Write to temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False) as f:
    yaml.dump(sample_config, f)
    config_file = f.name
print(f"Created configuration file: {config_file}")

# Load configuration from file
config = ClusterConfig.from_file(config_file)
print("\nLoaded configuration:")
print(f" Cluster type: {config.cluster_type}")
print(f" Cores: {config.default_cores}")
print(f" Memory: {config.default_memory}")
print(f" Max parallel jobs: {config.max_parallel_jobs}")

# Apply the configuration
clustrix.configure(**config.__dict__)

# Cleanup
os.unlink(config_file)
print("\n✓ Configuration loaded from file and applied")
Cluster Decorator
1. Basic Decorator Usage
The @cluster decorator is the main interface for distributed execution:
[ ]:
# Basic decorator usage
@cluster
def simple_function(x, y):
    """A simple function with default cluster settings."""
    import time
    time.sleep(0.1)  # Simulate some work
    return x + y

result = simple_function(5, 10)
print(f"Simple function result: {result}")

# Decorator with resource specification
@cluster(cores=4, memory="8GB")
def resource_specific_function(data_size):
    """Function with specific resource requirements."""
    import numpy as np
    data = np.random.random(data_size)
    return {
        'mean': np.mean(data),
        'std': np.std(data),
        'size': len(data)
    }

stats = resource_specific_function(100000)
print(f"Resource-specific function result: mean={stats['mean']:.4f}, std={stats['std']:.4f}")
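The cell above uses the decorator both bare (`@cluster`) and with arguments (`@cluster(cores=4, memory="8GB")`). The sketch below shows a common way to write a decorator that supports both forms. `demo_cluster` is a hypothetical stand-in that simply runs the function locally; it is not how Clustrix implements `cluster`.

```python
import functools

def demo_cluster(func=None, *, cores=1, memory="1GB"):
    """Hypothetical stand-in: a decorator usable with or without arguments."""
    def decorate(target):
        @functools.wraps(target)
        def wrapper(*args, **kwargs):
            # A real implementation would submit the call to a cluster here;
            # this sketch just runs it in the current process.
            return target(*args, **kwargs)
        # Record the requested resources on the wrapper for inspection.
        wrapper.resources = {"cores": cores, "memory": memory}
        return wrapper

    if func is not None:
        # Bare form: @demo_cluster
        return decorate(func)
    # Parameterized form: @demo_cluster(cores=4)
    return decorate

@demo_cluster
def add(x, y):
    return x + y

@demo_cluster(cores=4, memory="8GB")
def multiply(x, y):
    return x * y

print(add(5, 10), add.resources)
print(multiply(5, 10), multiply.resources)
```

Making the options keyword-only (the `*` in the signature) keeps the bare form unambiguous: the only positional argument the decorator can ever receive is the function being decorated.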
2. All Decorator Parameters
Comprehensive demonstration of all decorator parameters:
[ ]:
def demonstrate_decorator_parameters():
"""
Show all available parameters for the @cluster decorator.
"""
# Basic resource parameters
@cluster(
cores=8, # Number of CPU cores
memory="32GB", # Memory allocation
time="02:00:00", # Time limit (HH:MM:SS)
parallel=True # Enable automatic parallelization
)
def basic_resources_demo(n):
"""Basic resource specification."""
return sum(i**2 for i in range(n))
# Scheduler-specific parameters
@cluster(
cores=16,
memory="64GB",
time="04:00:00",
# SLURM-specific
partition="gpu", # SLURM partition
account="research_group", # SLURM account
qos="high", # Quality of Service
gres="gpu:2", # Generic resources (GPUs)
constraint="haswell", # Node constraints
array="1-10", # Job array specification
# PBS-specific
queue="bioqueue", # PBS queue
walltime="04:00:00", # PBS walltime
features="infiniband", # PBS features
# SGE-specific
pe="smp 16", # SGE parallel environment
sge_array="1-20" # SGE task array
)
def scheduler_specific_demo(data):
"""Scheduler-specific parameter demonstration."""
import numpy as np
return np.mean(data)
# Kubernetes-specific parameters
@cluster(
cores=4,
memory="16Gi", # Kubernetes memory format
cpu_limit=6, # CPU limit (can exceed cores)
memory_limit="24Gi", # Memory limit
container_image="python:3.11", # Container image
job_name="custom-job", # Kubernetes job name
parallelism=3, # Parallel pod execution
completions=10, # Total completions needed
backoff_limit=3, # Retry limit on failure
restart_policy="OnFailure", # Pod restart policy
job_ttl_seconds=7200, # Job cleanup time
active_deadline_seconds=3600 # Maximum job runtime
)
def kubernetes_demo(task_id):
"""Kubernetes-specific parameter demonstration."""
import os
import time
time.sleep(1)
return {
'task_id': task_id,
'pod_name': os.environ.get('HOSTNAME', 'unknown'),
'completion_time': time.time()
}
# Environment and execution parameters
@cluster(
cores=4,
memory="16GB",
environment={'OMP_NUM_THREADS': '4', 'CUDA_VISIBLE_DEVICES': '0'},
conda_env="myproject", # Conda environment
virtualenv_path="/path/to/venv", # Virtual environment
python_executable="python3", # Python command
working_directory="/tmp", # Working directory
cleanup_files=True, # Cleanup temporary files
timeout=3600 # Execution timeout
)
def environment_demo(message):
"""Environment configuration demonstration."""
import os
return {
'message': message,
'omp_threads': os.environ.get('OMP_NUM_THREADS', 'not_set'),
'cuda_devices': os.environ.get('CUDA_VISIBLE_DEVICES', 'not_set'),
'working_dir': os.getcwd()
}
print("Decorator Parameter Demonstrations:")
print("=" * 40)
# Run basic resources demo
print("\n1. Basic Resources Demo:")
result1 = basic_resources_demo(1000)
print(f" Sum of squares: {result1:,}")
# Run environment demo
print("\n2. Environment Demo:")
result2 = environment_demo("Hello from cluster!")
print(f" Message: {result2['message']}")
print(f" OMP threads: {result2['omp_threads']}")
print(f" Working dir: {result2['working_dir']}")
return {
'basic_resources': basic_resources_demo,
'scheduler_specific': scheduler_specific_demo,
'kubernetes': kubernetes_demo,
'environment': environment_demo
}
# Demonstrate all decorator parameters
decorator_functions = demonstrate_decorator_parameters()
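The decorator parameters above express time limits in two ways: scheduler-style strings such as `time="02:00:00"` and plain seconds such as `active_deadline_seconds=3600`. The sketch below converts between the two. Both function names are hypothetical helpers introduced for illustration, and the sketch assumes the plain `HH:MM:SS` form only (no day prefix).

```python
def walltime_to_seconds(walltime):
    """Convert an 'HH:MM:SS' string to a number of seconds."""
    hours, minutes, seconds = (int(part) for part in walltime.split(":"))
    if hours < 0 or not (0 <= minutes < 60 and 0 <= seconds < 60):
        raise ValueError(f"Invalid walltime: {walltime!r}")
    return hours * 3600 + minutes * 60 + seconds

def seconds_to_walltime(total_seconds):
    """Convert a number of seconds to an 'HH:MM:SS' string."""
    hours, remainder = divmod(int(total_seconds), 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

print(walltime_to_seconds("02:00:00"))
print(seconds_to_walltime(3600))
```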
3. Automatic Parallelization
Clustrix can automatically parallelize loops:
[ ]:
# Sequential execution (default)
@cluster(cores=4, parallel=False)
def sequential_processing(items):
    """Process items sequentially."""
    import time
    results = []
    for item in items:
        time.sleep(0.01)  # Simulate work
        results.append(item ** 2)
    return results

# Parallel execution
@cluster(cores=4, parallel=True)
def parallel_processing(items):
    """Process items in parallel."""
    import time
    results = []
    for item in items:  # This loop will be parallelized
        time.sleep(0.01)  # Simulate work
        results.append(item ** 2)
    return results

# Test data
test_items = list(range(20))

# Time sequential execution
start = time.time()
seq_result = sequential_processing(test_items)
seq_time = time.time() - start

# Time parallel execution
start = time.time()
par_result = parallel_processing(test_items)
par_time = time.time() - start

print(f"Sequential execution: {seq_time:.3f} seconds")
print(f"Parallel execution: {par_time:.3f} seconds")
print(f"Speedup: {seq_time/par_time:.2f}x")
print(f"Results match: {seq_result == par_result}")
print(f"Sample results: {seq_result[:5]}")
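To make the idea of loop parallelization concrete, the sketch below splits the input into chunks, processes the chunks concurrently, and reassembles the results in order, using only the standard library. It illustrates the general technique rather than Clustrix's internals; `chunked`, `square_chunk`, and `parallel_squares` are names invented for this example. Threads are used here for simplicity; for CPU-bound work a process pool would normally be the better choice.

```python
from concurrent.futures import ThreadPoolExecutor

def square_chunk(chunk):
    """Process one chunk of items; stands in for the loop body."""
    return [item ** 2 for item in chunk]

def chunked(items, n_chunks):
    """Split items into at most n_chunks contiguous slices."""
    size = max(1, -(-len(items) // n_chunks))  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]

def parallel_squares(items, workers=4):
    """Square every item by mapping chunks across a pool of workers."""
    chunks = chunked(items, workers)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() yields results in submission order, so item order is preserved.
        partial_results = list(pool.map(square_chunk, chunks))
    return [value for part in partial_results for value in part]

items = list(range(20))
assert parallel_squares(items) == [i ** 2 for i in items]
print(parallel_squares(items)[:5])  # [0, 1, 4, 9, 16]
```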
Local Execution
1. Local Executor Direct Usage
Use the LocalExecutor directly for fine-grained control:
[ ]:
from clustrix.local_executor import LocalExecutor
# Create local executor
local_config = ClusterConfig(
cluster_type="local",
default_cores=4,
auto_parallel=True
)
executor = LocalExecutor(local_config)
# Define a function to execute
def compute_statistics(data):
"""Compute basic statistics on data."""
import numpy as np
return {
'mean': np.mean(data),
'std': np.std(data),
'median': np.median(data),
'min': np.min(data),
'max': np.max(data)
}
# Execute function with local executor
test_data = np.random.normal(100, 15, 10000)
result = executor.execute_function(compute_statistics, (test_data,), {})
print("Local Executor Results:")
for key, value in result.items():
print(f" {key}: {value:.4f}")
# Test parallel loop execution
def parallel_computation(n_iterations):
"""Function with parallelizable loop."""
import numpy as np
results = []
for i in range(n_iterations):
# Simulate CPU-intensive work
data = np.random.random(1000)
result = np.sum(data ** 2)
results.append(result)
return np.mean(results)
# Execute with automatic parallelization
start_time = time.time()
parallel_result = executor.execute_loop_parallel(
parallel_computation,
'i',
range(100), # Will be chunked across cores
cores=4
)
execution_time = time.time() - start_time
print(f"\nParallel loop execution:")
print(f" Result: {parallel_result:.6f}")
print(f" Execution time: {execution_time:.3f} seconds")
2. CPU vs I/O Detection
Clustrix automatically chooses between multiprocessing (for functions it detects as CPU-bound) and threading (for functions it detects as I/O-bound):
[ ]:
from clustrix.local_executor import choose_executor_type
# CPU-intensive function
def cpu_intensive_task(n):
"""CPU-bound computation."""
total = 0
for i in range(n):
total += i ** 0.5
return total
# I/O-intensive function
def io_intensive_task(filename):
"""I/O-bound operation."""
import time
time.sleep(0.1) # Simulate I/O wait
with open(filename, 'w') as f:
f.write("test data")
return f"File {filename} written"
# Function with network I/O patterns
def network_task(url):
"""Network request simulation."""
import urllib.request
import time
time.sleep(0.05) # Simulate network latency
return f"Fetched {url}"
# Test executor type selection
test_cases = [
(cpu_intensive_task, (10000,), {}),
(io_intensive_task, ("/tmp/test.txt",), {}),
(network_task, ("http://example.com",), {})
]
print("Executor Type Selection:")
print("=" * 30)
for func, args, kwargs in test_cases:
use_threads = choose_executor_type(func, args, kwargs)
executor_type = "ThreadPoolExecutor" if use_threads else "ProcessPoolExecutor"
task_type = "I/O-bound" if use_threads else "CPU-bound"
print(f"Function: {func.__name__}")
print(f" Detected as: {task_type}")
print(f" Will use: {executor_type}")
print()
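One simple way such a detection could work is to look at the names a function's bytecode references and check for typical I/O calls. The sketch below does that with `func.__code__.co_names`. It is a hypothetical heuristic written for illustration; `looks_io_bound` and `IO_HINTS` are invented names, and this is not a description of how `choose_executor_type` decides.

```python
# Hypothetical heuristic; not the logic used by clustrix.local_executor.
IO_HINTS = {"open", "read", "write", "sleep", "urlopen", "connect", "recv", "send"}

def looks_io_bound(func):
    """Guess whether func is I/O-bound from the names its bytecode references."""
    return bool(IO_HINTS & set(func.__code__.co_names))

def cpu_task(n):
    total = 0
    for i in range(n):
        total += i ** 0.5
    return total

def io_task(filename):
    with open(filename, "w") as f:
        f.write("test data")
    return filename

for func in (cpu_task, io_task):
    kind = "I/O-bound" if looks_io_bound(func) else "CPU-bound"
    print(f"{func.__name__}: {kind}")
```

A name-based check like this is cheap but easy to fool (for example, by a helper function that hides the I/O call), so it is best treated as a default that the caller can override.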
Remote Cluster Execution
1. Cluster Executor Direct Usage
Use ClusterExecutor for direct cluster operations:
[ ]:
# Note: This section demonstrates the API but won't actually connect to remote clusters
# in this demo notebook
def demonstrate_cluster_executor_api():
"""
Demonstrate the ClusterExecutor API without actually connecting.
"""
# Example configurations for different cluster types
cluster_configs = {
'slurm': ClusterConfig(
cluster_type="slurm",
cluster_host="slurm-cluster.edu",
username="researcher",
key_file="~/.ssh/id_rsa",
default_partition="normal"
),
'pbs': ClusterConfig(
cluster_type="pbs",
cluster_host="pbs-cluster.org",
username="scientist",
default_queue="bioqueue"
),
'kubernetes': ClusterConfig(
cluster_type="kubernetes",
k8s_namespace="default",
container_image="python:3.11-slim"
)
}
print("Cluster Executor API Demonstration:")
print("=" * 40)
for cluster_type, config in cluster_configs.items():
print(f"\n{cluster_type.upper()} Executor:")
# Create executor (but don't connect)
executor = ClusterExecutor(config)
print(f" Cluster type: {executor.config.cluster_type}")
print(f" Config object: {type(executor.config).__name__}")
# Show available methods
methods = [method for method in dir(executor)
if not method.startswith('_') and callable(getattr(executor, method))]
print(f" Available methods: {', '.join(methods[:5])}...")
# Example of what cluster execution would look like
print("\nExample cluster execution pattern:")
print("""
# 1. Create and configure executor
executor = ClusterExecutor(config)
# 2. Connect to cluster
executor.connect()
# 3. Submit job
job_id = executor.submit_job(function, args, kwargs, job_config)
# 4. Monitor job status
status = executor.get_job_status(job_id)
# 5. Retrieve results
result = executor.get_result(job_id)
# 6. Cleanup
executor.cleanup_job(job_id)
executor.disconnect()
""")
return cluster_configs
# Demonstrate the API
cluster_configs = demonstrate_cluster_executor_api()
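The six-step pattern printed above (connect, submit, check status, fetch result, clean up, disconnect) benefits from a `try`/`finally` so that cleanup and disconnection happen even when a step fails. The sketch below shows that structure against an in-memory stand-in. `FakeExecutor` and `run_on_cluster` are hypothetical; the method names mirror the pattern above, while the `"completed"` status string is an assumption made for this example.

```python
class FakeExecutor:
    """In-memory stand-in; method names mirror the pattern printed above."""

    def __init__(self):
        self.connected = False
        self._jobs = {}

    def connect(self):
        self.connected = True

    def submit_job(self, func, args, kwargs, job_config):
        job_id = f"job-{len(self._jobs) + 1}"
        # Run immediately; a real scheduler would queue the job instead.
        self._jobs[job_id] = func(*args, **kwargs)
        return job_id

    def get_job_status(self, job_id):
        return "completed" if job_id in self._jobs else "unknown"

    def get_result(self, job_id):
        return self._jobs[job_id]

    def cleanup_job(self, job_id):
        self._jobs.pop(job_id, None)

    def disconnect(self):
        self.connected = False

def run_on_cluster(executor, func, args=(), kwargs=None, job_config=None):
    """Submit one job and always release the job and the connection."""
    executor.connect()
    job_id = None
    try:
        job_id = executor.submit_job(func, args, kwargs or {}, job_config or {})
        if executor.get_job_status(job_id) != "completed":
            raise RuntimeError(f"Job {job_id} did not complete")
        return executor.get_result(job_id)
    finally:
        if job_id is not None:
            executor.cleanup_job(job_id)
        executor.disconnect()

executor = FakeExecutor()
print(run_on_cluster(executor, pow, (2, 10)))  # 1024
```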
2. Job Management Functions
Functions for managing cluster jobs:
[ ]:
def demonstrate_job_management():
"""
Demonstrate job management functions and patterns.
"""
print("Job Management Functions:")
print("=" * 30)
# Job submission patterns
job_patterns = {
'single_job': {
'description': 'Submit single job with specific resources',
'example': '''
@cluster(cores=8, memory="32GB", time="02:00:00")
def my_computation(data):
return process_data(data)
'''
},
'job_array': {
'description': 'Submit job array for parameter sweeps',
'example': '''
@cluster(cores=4, memory="16GB", array="1-100")
def parameter_sweep(base_params):
task_id = int(os.environ.get('SLURM_ARRAY_TASK_ID', '1'))
params = modify_params(base_params, task_id)
return run_simulation(params)
'''
},
'parallel_jobs': {
'description': 'Submit multiple independent jobs',
'example': '''
@cluster(cores=4, memory="16GB", parallel=True)
def parallel_analysis(datasets):
results = []
for dataset in datasets: # Each iteration becomes separate job
results.append(analyze_dataset(dataset))
return results
'''
},
'dependent_jobs': {
'description': 'Chain jobs with dependencies',
'example': '''
# Job 1: Data preprocessing
@cluster(cores=4, memory="16GB")
def preprocess_data(raw_data):
return clean_and_transform(raw_data)
# Job 2: Analysis (depends on Job 1)
@cluster(cores=8, memory="32GB", dependency="afterok:$JOB1_ID")
def analyze_processed_data(processed_data):
return run_analysis(processed_data)
'''
}
}
for pattern_name, pattern_info in job_patterns.items():
print(f"\n{pattern_name.upper().replace('_', ' ')}:")
print(f" Description: {pattern_info['description']}")
print(f" Example:{pattern_info['example']}")
# Job monitoring functions
print("\n" + "=" * 30)
print("Job Monitoring Functions:")
monitoring_functions = {
'get_job_status()': 'Check current status of submitted job',
'list_active_jobs()': 'List all active jobs for user',
'get_job_info()': 'Get detailed information about specific job',
'cancel_job()': 'Cancel running or queued job',
'get_job_output()': 'Retrieve stdout/stderr from completed job',
'get_job_resources()': 'Get resource usage statistics',
'estimate_queue_time()': 'Estimate queue wait time for job'
}
for func_name, description in monitoring_functions.items():
print(f" {func_name:20} - {description}")
return job_patterns
# Demonstrate job management
job_patterns = demonstrate_job_management()
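The job-array pattern above reads `SLURM_ARRAY_TASK_ID` and turns it into a set of parameters. The sketch below shows one way to do that mapping for a parameter grid. `params_for_task` is a hypothetical helper standing in for the `modify_params` call in the example, and the grid values are made up.

```python
import itertools

def params_for_task(task_id, grid):
    """Return the parameter combination for a 1-based array task ID."""
    names = sorted(grid)  # fixed order so every task sees the same mapping
    combos = list(itertools.product(*(grid[name] for name in names)))
    if not 1 <= task_id <= len(combos):
        raise IndexError(f"task_id {task_id} outside 1-{len(combos)}")
    return dict(zip(names, combos[task_id - 1]))

grid = {"learning_rate": [0.01, 0.1], "depth": [2, 4, 8]}

# On SLURM, task_id would come from int(os.environ["SLURM_ARRAY_TASK_ID"]).
for task_id in (1, 6):
    print(task_id, params_for_task(task_id, grid))
```

With this grid there are six combinations, so the matching array specification would be `array="1-6"`.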
Advanced Features
1. Custom Serialization
Handle complex objects and custom serialization:
[ ]:
import pickle
import cloudpickle
import dill
class CustomClass:
"""A custom class to test serialization."""
def __init__(self, name, data):
self.name = name
self.data = data
def process(self):
return f"Processed {self.name} with {len(self.data)} items"
def __repr__(self):
return f"CustomClass(name='{self.name}', data_length={len(self.data)})"
# Test serialization with different libraries
@cluster(cores=2)
def test_serialization(custom_obj, serializer_name):
"""Test custom object serialization."""
result = custom_obj.process()
return {
'serializer': serializer_name,
'object_name': custom_obj.name,
'result': result,
'data_length': len(custom_obj.data)
}
# Create test object
test_obj = CustomClass("test_object", list(range(1000)))
# Test with different serializers
serializers = ['cloudpickle', 'dill', 'pickle']
print("Serialization Testing:")
print("=" * 25)
for serializer in serializers:
try:
result = test_serialization(test_obj, serializer)
print(f"\n{serializer.upper()}:")
print(f" ✓ Serialization successful")
print(f" Object: {result['object_name']}")
print(f" Result: {result['result']}")
except Exception as e:
print(f"\n{serializer.upper()}:")
print(f" ✗ Serialization failed: {e}")
# Test lambda function serialization
@cluster(cores=2)
def test_lambda_serialization(data, transform_func):
"""Test lambda function serialization."""
transformed = [transform_func(x) for x in data]
return {
'original_data': data,
'transformed_data': transformed,
'function_type': str(type(transform_func))
}
# Test with lambda
test_data = [1, 2, 3, 4, 5]
lambda_func = lambda x: x ** 2
try:
lambda_result = test_lambda_serialization(test_data, lambda_func)
print(f"\nLAMBDA FUNCTION SERIALIZATION:")
print(f" ✓ Success")
print(f" Original: {lambda_result['original_data']}")
print(f" Transformed: {lambda_result['transformed_data']}")
except Exception as e:
print(f"\nLAMBDA FUNCTION SERIALIZATION:")
print(f" ✗ Failed: {e}")
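The lambda test above matters because the standard `pickle` module serializes functions by reference (module plus qualified name), which a lambda does not have; libraries such as `cloudpickle` and `dill` exist to cover that gap. The standard-library sketch below demonstrates the limitation and one workaround. `is_picklable` is a helper written for this example.

```python
import functools
import operator
import pickle

def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except Exception:
        return False
    return True

square_lambda = lambda x: x ** 2
# A partial built from an importable function pickles by reference.
double = functools.partial(operator.mul, 2)

print("lambda picklable: ", is_picklable(square_lambda))
print("partial picklable:", is_picklable(double))
print("round trip:", pickle.loads(pickle.dumps(double))(21))
```

When a function must cross a process or machine boundary with plain `pickle`, building it from importable pieces in this way avoids the need for a by-value serializer.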
2. Environment Management
Manage remote environments and dependencies:
[ ]:
def demonstrate_environment_management():
"""
Demonstrate environment management features.
"""
print("Environment Management Features:")
print("=" * 35)
# Environment configuration options
env_configs = {
'conda_environment': {
'description': 'Use conda environment on remote cluster',
'config': {
'conda_env_name': 'myproject',
'conda_path': '/opt/conda/bin/conda'
},
'usage': '''
@cluster(cores=4, conda_env="myproject")
def ml_computation(data):
import tensorflow as tf # Available in conda env
return train_model(data)
'''
},
'virtual_environment': {
'description': 'Use Python virtual environment',
'config': {
'virtualenv_path': '/home/user/venv/myproject',
'python_executable': 'python3'
},
'usage': '''
configure(
cluster_type="ssh",
virtualenv_path="/home/user/venv/myproject"
)
'''
},
'module_loading': {
'description': 'Load environment modules (HPC clusters)',
'config': {
'module_loads': ['python/3.9', 'gcc/9.3.0', 'openmpi/4.1']
},
'usage': '''
configure(
cluster_type="slurm",
module_loads=["python/3.9", "gcc/9.3.0"]
)
'''
},
'environment_variables': {
'description': 'Set custom environment variables',
'config': {
'environment_variables': {
'OMP_NUM_THREADS': '8',
'CUDA_VISIBLE_DEVICES': '0,1',
'PYTHONPATH': '/custom/path'
}
},
'usage': '''
@cluster(
cores=8,
environment={
'OMP_NUM_THREADS': '8',
'CUDA_VISIBLE_DEVICES': '0,1'
}
)
def gpu_computation(data):
return process_on_gpu(data)
'''
},
'dependency_management': {
'description': 'Automatic dependency installation',
'config': {
'pip_requirements': ['numpy>=1.20', 'scipy>=1.7', 'scikit-learn'],
'conda_packages': ['tensorflow', 'pytorch']
},
'usage': '''
# Clustrix automatically captures local environment
# and recreates it on remote cluster using pip freeze
@cluster(cores=4)
def analysis_with_deps(data):
import pandas as pd # Will be installed if missing
import sklearn # Will be installed if missing
return analyze_data(data)
'''
}
}
for env_type, env_info in env_configs.items():
print(f"\n{env_type.upper().replace('_', ' ')}:")
print(f" Description: {env_info['description']}")
print(f" Configuration:")
for key, value in env_info['config'].items():
print(f" {key}: {value}")
print(f" Usage example:{env_info['usage']}")
return env_configs
# Demonstrate environment management
env_configs = demonstrate_environment_management()
3. Error Handling and Recovery
Robust error handling and recovery mechanisms:
[ ]:
import random
# Function that may fail randomly
@cluster(cores=2)
def unreliable_computation(data, failure_rate=0.3):
"""A computation that may fail randomly."""
import random
import time
# Simulate random failures
if random.random() < failure_rate:
raise RuntimeError(f"Simulated failure during computation")
# Simulate work
time.sleep(0.1)
result = sum(x**2 for x in data)
return result
# Function with retry logic
@cluster(cores=2)
def computation_with_retry(data, max_retries=3):
"""Computation with built-in retry logic."""
import random
import time
for attempt in range(max_retries + 1):
try:
# Simulate potential failure
if random.random() < 0.4 and attempt < max_retries:
raise RuntimeError(f"Attempt {attempt + 1} failed")
# Actual computation
time.sleep(0.05)
result = sum(x**3 for x in data)
return {
'result': result,
'attempts': attempt + 1,
'success': True
}
except Exception as e:
if attempt == max_retries:
return {
'result': None,
'attempts': attempt + 1,
'success': False,
'error': str(e)
}
time.sleep(0.1 * (attempt + 1)) # Linear backoff: the wait grows by 0.1 s per attempt
# Function with graceful degradation
@cluster(cores=2)
def robust_computation(data, fallback_method=True):
"""Computation with fallback method."""
import numpy as np
try:
# Primary method (may fail)
if len(data) > 1000: # Simulate failure condition
raise MemoryError("Not enough memory for primary method")
# Primary computation
result = np.fft.fft(data).real
return {
'result': np.mean(result),
'method': 'primary_fft',
'success': True
}
except Exception as e:
if fallback_method:
# Fallback method
result = np.mean(data) # Simple fallback
return {
'result': result,
'method': 'fallback_mean',
'success': True,
'warning': f"Used fallback due to: {str(e)}"
}
else:
raise
print("Error Handling and Recovery:")
print("=" * 30)
# Test unreliable computation
test_data = list(range(50))
successes = 0
failures = 0
print("\n1. Testing Unreliable Computation:")
for i in range(10):
try:
result = unreliable_computation(test_data, failure_rate=0.3)
successes += 1
except Exception as e:
failures += 1
print(f" Successes: {successes}/10")
print(f" Failures: {failures}/10")
# Test computation with retry
print("\n2. Testing Computation with Retry:")
retry_results = []
for i in range(5):
result = computation_with_retry(test_data, max_retries=3)
retry_results.append(result)
status = "✓" if result['success'] else "✗"
print(f" {status} Attempt {i+1}: {result['attempts']} tries, Success: {result['success']}")
# Test robust computation with fallback
print("\n3. Testing Robust Computation:")
# Small data (should use primary method)
small_data = list(range(100))
small_result = robust_computation(small_data)
print(f" Small data: {small_result['method']}, Result: {small_result['result']:.4f}")
# Large data (should use fallback)
large_data = list(range(2000))
large_result = robust_computation(large_data)
print(f" Large data: {large_result['method']}, Result: {large_result['result']:.4f}")
if 'warning' in large_result:
print(f" Warning: {large_result['warning']}")
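The retry function above waits a little longer after each failed attempt. A common variant is exponential backoff with jitter, where the delay doubles on each attempt and a random component spreads out simultaneous retries. The sketch below is a generic standard-library version; `retry_with_backoff` and `flaky` are names made up for this example, and the `sleep` parameter exists only so the delay can be replaced in demonstrations.

```python
import random
import time

def retry_with_backoff(func, max_retries=3, base_delay=0.1, sleep=time.sleep):
    """Call func(); on failure wait base_delay * 2**attempt plus jitter, then retry."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception:
            if attempt == max_retries:
                raise  # Out of retries: surface the last error.
            delay = base_delay * (2 ** attempt)
            sleep(delay + random.uniform(0, delay / 2))

calls = {"count": 0}

def flaky():
    """Fail on the first two calls, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError(f"Attempt {calls['count']} failed")
    return "ok"

# Skip the real delays for the demonstration.
print(retry_with_backoff(flaky, sleep=lambda seconds: None), "after", calls["count"], "calls")
```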
Monitoring and Debugging
1. Performance Monitoring
Monitor execution performance and resource usage:
[ ]:
import psutil
import threading
import time
from datetime import datetime
class PerformanceMonitor:
"""Monitor performance during function execution."""
def __init__(self, interval=0.1):
self.interval = interval
self.monitoring = False
self.metrics = []
def start_monitoring(self):
"""Start performance monitoring."""
self.monitoring = True
self.metrics = []
def monitor():
while self.monitoring:
try:
cpu_percent = psutil.cpu_percent()
memory = psutil.virtual_memory()
self.metrics.append({
'timestamp': time.time(),
'cpu_percent': cpu_percent,
'memory_percent': memory.percent,
'memory_used_gb': memory.used / (1024**3)
})
except Exception:
pass # Skip if monitoring fails
time.sleep(self.interval)
self.monitor_thread = threading.Thread(target=monitor, daemon=True)
self.monitor_thread.start()
def stop_monitoring(self):
"""Stop performance monitoring."""
self.monitoring = False
if hasattr(self, 'monitor_thread'):
self.monitor_thread.join(timeout=1.0)
def get_summary(self):
"""Get performance summary."""
if not self.metrics:
return {'error': 'No metrics collected'}
cpu_values = [m['cpu_percent'] for m in self.metrics]
memory_values = [m['memory_percent'] for m in self.metrics]
return {
'duration_seconds': self.metrics[-1]['timestamp'] - self.metrics[0]['timestamp'],
'samples_collected': len(self.metrics),
'cpu_usage': {
'mean': np.mean(cpu_values),
'max': np.max(cpu_values),
'min': np.min(cpu_values),
'std': np.std(cpu_values)
},
'memory_usage': {
'mean': np.mean(memory_values),
'max': np.max(memory_values),
'min': np.min(memory_values),
'peak_gb': np.max([m['memory_used_gb'] for m in self.metrics])
}
}
# Monitored computation function
@cluster(cores=4)
def monitored_computation(size, complexity="medium"):
"""A computation that can be monitored for performance."""
import numpy as np
import time
# Different complexity levels
if complexity == "low":
data = np.random.random(size)
result = np.sum(data)
elif complexity == "medium":
data = np.random.random((size, 10))
result = np.sum(np.dot(data, data.T))
else: # high
data = np.random.random((size, size//10))
for _ in range(3):
data = np.dot(data, data.T[:data.shape[1], :])
result = np.sum(data)
return {
'result': float(result),
'size': size,
'complexity': complexity
}
print("Performance Monitoring:")
print("=" * 25)
# Test different complexity levels
test_cases = [
(1000, "low"),
(500, "medium"),
(100, "high")
]
for size, complexity in test_cases:
print(f"\nTesting {complexity} complexity (size={size}):")
# Start monitoring
monitor = PerformanceMonitor(interval=0.05)
monitor.start_monitoring()
# Run computation
start_time = time.time()
result = monitored_computation(size, complexity)
end_time = time.time()
# Stop monitoring
monitor.stop_monitoring()
# Get results
perf_summary = monitor.get_summary()
execution_time = end_time - start_time
print(f" Execution time: {execution_time:.3f} seconds")
print(f" Result: {result['result']:.2e}")
if 'error' not in perf_summary:
print(f" CPU usage: {perf_summary['cpu_usage']['mean']:.1f}% avg, {perf_summary['cpu_usage']['max']:.1f}% max")
print(f" Memory usage: {perf_summary['memory_usage']['mean']:.1f}% avg, {perf_summary['memory_usage']['peak_gb']:.2f} GB peak")
print(f" Samples collected: {perf_summary['samples_collected']}")
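The monitor above samples system-wide CPU and memory through `psutil`. Where that package is unavailable, the standard library can still measure a single call: `time.perf_counter` for wall-clock time and `tracemalloc` for peak memory allocated by Python code in the current process. The sketch below shows that lighter-weight approach. `measure` and `build_squares` are names invented for this example, and `tracemalloc` does not see system-wide usage or every allocation made by native extensions.

```python
import time
import tracemalloc

def measure(func, *args, **kwargs):
    """Run func and report wall-clock time and peak Python-level allocation."""
    tracemalloc.start()
    start = time.perf_counter()
    try:
        result = func(*args, **kwargs)
    finally:
        elapsed = time.perf_counter() - start
        _, peak_bytes = tracemalloc.get_traced_memory()
        tracemalloc.stop()
    return result, {"seconds": elapsed, "peak_mb": peak_bytes / 1024**2}

def build_squares(n):
    return [i * i for i in range(n)]

result, stats = measure(build_squares, 100_000)
print(f"{len(result)} items in {stats['seconds']:.4f}s, peak {stats['peak_mb']:.2f} MB")
```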
2. Debugging Utilities
Utilities for debugging distributed computations:
[ ]:
import sys
import traceback
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Function with debug information
@cluster(cores=2)
def debug_computation(data, debug_level="info"):
"""Computation with extensive debugging information."""
import sys
import os
import platform
import time
import traceback
from datetime import datetime
debug_info = {
'execution_start': datetime.now().isoformat(),
'python_version': sys.version,
'platform': platform.platform(),
'working_directory': os.getcwd(),
'process_id': os.getpid(),
'environment_vars': dict(os.environ),
'input_data_type': str(type(data)),
'input_data_length': len(data) if hasattr(data, '__len__') else 'unknown'
}
try:
# Simulate computation with progress tracking
if debug_level == "verbose":
print(f"Starting computation at {debug_info['execution_start']}")
print(f"Input data: {debug_info['input_data_type']} with {debug_info['input_data_length']} items")
result = 0
for i, value in enumerate(data):
if debug_level == "verbose" and i % max(1, len(data) // 5) == 0:
print(f"Progress: {i}/{len(data)} ({100*i/len(data):.1f}%)")
result += value ** 2
# Simulate occasional issues
if i == len(data) // 2 and debug_level == "test_error":
raise ValueError(f"Test error at position {i}")
debug_info.update({
'execution_end': datetime.now().isoformat(),
'success': True,
'result': result,
'items_processed': len(data)
})
if debug_level in ["info", "verbose"]:
print(f"Computation completed successfully")
return debug_info
except Exception as e:
debug_info.update({
'execution_end': datetime.now().isoformat(),
'success': False,
'error_type': str(type(e).__name__),
'error_message': str(e),
'traceback': traceback.format_exc()
})
if debug_level in ["info", "verbose"]:
print(f"Computation failed: {e}")
return debug_info
# Function to test serialization issues
@cluster(cores=2)
def test_serialization_debug(problematic_object):
"""Test function that may have serialization issues."""
try:
# Try to use the problematic object
result = problematic_object.some_method() if hasattr(problematic_object, 'some_method') else str(problematic_object)
return {'success': True, 'result': result}
except Exception as e:
return {
'success': False,
'error': str(e),
'object_type': str(type(problematic_object))
}
print("Debugging Utilities:")
print("=" * 20)
# Test normal execution with debug info
print("\n1. Normal Execution with Debug Info:")
test_data = list(range(100))
debug_result = debug_computation(test_data, debug_level="info")
print(f" Success: {debug_result['success']}")
print(f" Platform: {debug_result['platform'][:50]}...")
print(f" Process ID: {debug_result['process_id']}")
print(f" Items processed: {debug_result.get('items_processed', 'N/A')}")
if 'result' in debug_result:
print(f" Result: {debug_result['result']}")
# Test error handling
print("\n2. Error Handling Test:")
error_result = debug_computation(test_data, debug_level="test_error")
print(f" Success: {error_result['success']}")
if not error_result['success']:
print(f" Error type: {error_result['error_type']}")
print(f" Error message: {error_result['error_message']}")
print(f" Traceback available: {'traceback' in error_result}")
# Test serialization debugging
print("\n3. Serialization Testing:")
# Test with simple object (should work)
simple_obj = [1, 2, 3, 4, 5]
simple_result = test_serialization_debug(simple_obj)
print(f" Simple object: {simple_result['success']}")
# Test with complex object (may have issues)
class ComplexObject:
    def __init__(self):
        self.data = "test"

    def some_method(self):
        return f"Method called on {self.data}"

complex_obj = ComplexObject()
complex_result = test_serialization_debug(complex_obj)
print(f" Complex object: {complex_result['success']}")
if complex_result['success']:
    print(f" Result: {complex_result['result']}")
else:
    print(f" Error: {complex_result['error'][:50]}...")
# Show debugging best practices
print("\n4. Debugging Best Practices:")
best_practices = [
    "Use debug_level parameters to control output verbosity",
    "Include execution environment information in results",
    "Test serialization with simple objects first",
    "Use try/except blocks to capture and return error information",
    "Include timestamps for performance analysis",
    "Monitor resource usage during execution",
    "Test with small datasets before scaling up"
]
for i, practice in enumerate(best_practices, 1):
    print(f" {i}. {practice}")
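The practice "Test serialization with simple objects first" can be checked locally before anything is submitted. The sketch below uses the standard-library `pickle` module as a conservative stand-in; the serializer Clustrix uses internally may accept more objects than `pickle` does, and `check_picklable` is a hypothetical helper, not part of the Clustrix API.

```python
import pickle


def check_picklable(obj):
    """Return (ok, detail): whether obj survives a pickle round trip."""
    try:
        payload = pickle.dumps(obj)
        pickle.loads(payload)
    except Exception as exc:  # pickle can raise several exception types
        return False, f"{type(exc).__name__}: {exc}"
    return True, f"{len(payload)} bytes"


# Plain containers serialize cleanly; lambdas are rejected by stdlib pickle.
for name, candidate in [("list", [1, 2, 3]), ("lambda", lambda x: x + 1)]:
    ok, detail = check_picklable(candidate)
    print(f"{name}: picklable={ok} ({detail})")
```

Running a check like this on each argument narrows a remote failure down to the specific object that cannot be serialized.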
Best Practices
1. Performance Optimization
Best practices for optimal performance:
[ ]:
def demonstrate_performance_best_practices():
    """
    Demonstrate best practices for performance optimization.
    """
    print("Performance Optimization Best Practices:")
    print("=" * 45)
    best_practices = {
        'resource_allocation': {
            'title': 'Resource Allocation',
            'practices': [
                "Profile your code locally before scaling to clusters",
                "Use appropriate core counts (typically 1-2x physical cores)",
                "Allocate memory with 20-30% buffer for overhead",
                "Set realistic time limits with buffer for completion",
                "Use parallel=True for CPU-bound loops",
                "Consider I/O vs CPU workload for executor selection"
            ],
            'example': '''
# Good resource allocation
@cluster(
    cores=8,          # Based on profiling
    memory="32GB",    # 25% buffer included
    time="02:30:00",  # 30min buffer for 2hr job
    parallel=True     # Enable for CPU-bound work
)
def optimized_computation(data):
    return process_data_efficiently(data)
'''
        },
        'data_management': {
            'title': 'Data Management',
            'practices': [
                "Minimize data transfer between local and remote",
                "Use efficient data formats (NumPy arrays, not lists)",
                "Chunk large datasets for parallel processing",
                "Avoid loading unnecessary data into memory",
                "Use generators for large data streams",
                "Consider data locality for cluster placement"
            ],
            'example': '''
# Efficient data handling
@cluster(cores=8, parallel=True)
def process_large_dataset(chunk_size=10000):
    """Process data in chunks to optimize memory usage."""
    import numpy as np
    results = []
    for chunk_id in range(100):  # Parallelized
        # Generate chunk on remote (not transfer)
        chunk = np.random.random(chunk_size)
        result = np.mean(chunk ** 2)  # Efficient NumPy
        results.append(result)
    return np.mean(results)  # Return summary, not raw data
'''
        },
        'parallelization': {
            'title': 'Parallelization Strategy',
            'practices': [
                "Identify embarrassingly parallel components",
                "Minimize shared state between parallel tasks",
                "Use appropriate chunk sizes for load balancing",
                "Avoid fine-grained parallelism with high overhead",
                "Consider communication costs in distributed algorithms",
                "Test parallel efficiency with different core counts"
            ],
            'example': '''
# Good parallelization pattern
@cluster(cores=16, parallel=True)
def parallel_monte_carlo(n_samples=1000000):
    """Monte Carlo with optimal chunk size."""
    import numpy as np
    results = []
    n_chunks = 100  # 100 chunks for load balancing
    chunk_size = n_samples // n_chunks
    for chunk in range(n_chunks):  # Parallelized across cores
        # Independent computation per chunk
        x = np.random.random(chunk_size)
        y = np.random.random(chunk_size)
        inside = (x**2 + y**2) <= 1
        results.append(np.sum(inside))
    # Divide by the samples actually drawn; the integer division above drops any remainder
    return 4 * sum(results) / (chunk_size * n_chunks)
'''
        },
        'cluster_optimization': {
            'title': 'Cluster-Specific Optimization',
            'practices': [
                "Choose appropriate partitions/queues for workload",
                "Use job arrays for parameter sweeps",
                "Leverage cluster-specific features (GPUs, fast storage)",
                "Monitor queue times and adjust submission strategy",
                "Use checkpointing for long-running jobs",
                "Clean up temporary files to avoid storage issues"
            ],
            'example': '''
# Cluster-optimized job submission
@cluster(
    cores=32,
    memory="128GB",
    time="12:00:00",
    partition="bigmem",      # Appropriate partition
    array="1-100",           # Parameter sweep
    gres="gpu:2",            # Request GPUs if needed
    cleanup_on_success=True  # Clean temporary files
)
def cluster_optimized_job(params):
    return run_with_checkpointing(params)
'''
        }
    }
    for category, info in best_practices.items():
        print(f"\n{info['title'].upper()}:")
        for i, practice in enumerate(info['practices'], 1):
            print(f" {i}. {practice}")
        print(f"\nExample:{info['example']}")
    return best_practices

# Demonstrate performance best practices
perf_practices = demonstrate_performance_best_practices()
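The advice to "use appropriate chunk sizes for load balancing" hides one detail: plain integer division (for example `n_samples // 100`) silently drops the remainder when the item count is not a multiple of the chunk count. The sketch below shows one way to split work so that every item is covered exactly once and chunk sizes differ by at most one. `chunk_bounds` is a hypothetical helper written for this illustration, not a Clustrix function.

```python
def chunk_bounds(n_items, n_chunks):
    """Yield (start, stop) index pairs that cover range(n_items) exactly once."""
    if n_items < 0 or n_chunks < 1:
        raise ValueError("n_items must be >= 0 and n_chunks must be >= 1")
    base, extra = divmod(n_items, n_chunks)
    start = 0
    for i in range(n_chunks):
        # The first `extra` chunks take one additional item each
        size = base + (1 if i < extra else 0)
        yield start, start + size
        start += size


bounds = list(chunk_bounds(10, 3))
print(bounds)  # [(0, 4), (4, 7), (7, 10)]
```

Each `(start, stop)` pair can then be passed to an independent task, for instance as `data[start:stop]`, so no task depends on shared state.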
2. Security and Reliability
Best practices for secure and reliable distributed computing:
[ ]:
def demonstrate_security_best_practices():
    """
    Demonstrate security and reliability best practices.
    """
    print("Security and Reliability Best Practices:")
    print("=" * 45)
    security_practices = {
        'authentication': {
            'title': 'Authentication and Access',
            'practices': [
                "Use SSH key authentication, never passwords",
                "Protect private keys with strong passphrases",
                "Use separate keys for different environments",
                "Regularly rotate SSH keys (6-12 months)",
                "Set proper file permissions (600 for private keys)",
                "Use SSH config for consistent settings"
            ],
            'example': '''
# Secure SSH configuration
configure(
    cluster_type="slurm",
    cluster_host="secure-cluster.edu",
    username="researcher",
    key_file="~/.ssh/clustrix_production_key",  # Dedicated key
    port=2222,  # Non-standard port
    # Never use password in production
)
'''
        },
        'data_security': {
            'title': 'Data Security',
            'practices': [
                "Never include secrets or credentials in code",
                "Use environment variables for sensitive data",
                "Encrypt sensitive data before transfer",
                "Clean up temporary files containing sensitive data",
                "Use secure remote directories with proper permissions",
                "Audit data access and transfers"
            ],
            'example': '''
# Secure data handling
@cluster(cores=4, cleanup_on_success=True)
def secure_data_processing(encrypted_data):
    """Process data securely with cleanup."""
    import os
    import tempfile
    # Use environment variable for decryption key
    decryption_key = os.environ.get('DECRYPTION_KEY')
    if not decryption_key:
        raise ValueError("Decryption key not found")
    # Process in temporary location
    with tempfile.TemporaryDirectory() as temp_dir:
        # Decrypt and process
        data = decrypt_data(encrypted_data, decryption_key)
        result = analyze_data(data)
        # Clear sensitive data
        del data, decryption_key
    return result  # Only return non-sensitive results
'''
        },
        'reliability': {
            'title': 'Reliability and Fault Tolerance',
            'practices': [
                "Implement retry logic for transient failures",
                "Use checkpointing for long-running computations",
                "Validate inputs before expensive computations",
                "Monitor resource usage to avoid exhaustion",
                "Set appropriate timeouts for all operations",
                "Log important events for debugging"
            ],
            'example': '''
# Reliable computation with fault tolerance
@cluster(cores=8, time="04:00:00", backoff_limit=3)
def reliable_computation(data, checkpoint_interval=1000):
    """Computation with checkpointing and validation."""
    import os
    import pickle
    import logging
    # Validate inputs (len() check also works for NumPy arrays)
    if data is None or len(data) == 0:
        raise ValueError("Input data is empty")
    # Setup logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    # Check for existing checkpoint
    checkpoint_file = "computation_checkpoint.pkl"
    start_index = 0
    results = []
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file, 'rb') as f:
            checkpoint = pickle.load(f)
        start_index = checkpoint['index']
        results = checkpoint['results']
        logger.info(f"Resuming from checkpoint at index {start_index}")
    # Process with checkpointing
    for i in range(start_index, len(data)):
        try:
            result = expensive_operation(data[i])
            results.append(result)
            # Save checkpoint periodically
            if (i + 1) % checkpoint_interval == 0:
                checkpoint = {'index': i + 1, 'results': results}
                with open(checkpoint_file, 'wb') as f:
                    pickle.dump(checkpoint, f)
                logger.info(f"Checkpoint saved at index {i + 1}")
        except Exception as e:
            logger.error(f"Error at index {i}: {e}")
            # Continue with next item
            results.append(None)
    # Cleanup checkpoint file
    if os.path.exists(checkpoint_file):
        os.unlink(checkpoint_file)
    return {'results': results, 'success_rate': sum(1 for r in results if r is not None) / len(results)}
'''
        },
        'monitoring': {
            'title': 'Monitoring and Maintenance',
            'practices': [
                "Monitor cluster resource usage regularly",
                "Set up alerts for job failures",
                "Track job completion times and success rates",
                "Monitor disk usage in work directories",
                "Keep logs of cluster operations",
                "Regularly update and patch cluster software"
            ],
            'example': '''
# Computation with monitoring
@cluster(cores=4, time="02:00:00")
def monitored_computation(data):
    """Computation with built-in monitoring."""
    import psutil
    import time
    import logging
    logger = logging.getLogger(__name__)
    start_time = time.time()
    # Log start
    logger.info(f"Starting computation with {len(data)} items")
    # Monitor resources
    initial_memory = psutil.virtual_memory().percent
    try:
        result = process_data(data)
        # Log success
        execution_time = time.time() - start_time
        final_memory = psutil.virtual_memory().percent
        logger.info(f"Computation completed in {execution_time:.2f}s")
        logger.info(f"Memory usage: {initial_memory:.1f}% -> {final_memory:.1f}%")
        return {
            'result': result,
            'execution_time': execution_time,
            'memory_delta': final_memory - initial_memory
        }
    except Exception as e:
        logger.error(f"Computation failed after {time.time() - start_time:.2f}s: {e}")
        raise
'''
        }
    }
    for category, info in security_practices.items():
        print(f"\n{info['title'].upper()}:")
        for i, practice in enumerate(info['practices'], 1):
            print(f" {i}. {practice}")
        print(f"\nExample:{info['example']}")
    return security_practices

# Demonstrate security best practices
security_practices = demonstrate_security_best_practices()
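The reliability list recommends retry logic for transient failures. A minimal sketch of that idea with exponential backoff follows. It is plain Python that wraps any callable, it makes no claim about how Clustrix retries jobs internally, and `retry_with_backoff` and `flaky_submit` are hypothetical names introduced only for this illustration.

```python
import time


def retry_with_backoff(func, max_attempts=3, base_delay=0.5,
                       retry_on=(ConnectionError, TimeoutError),
                       sleep=time.sleep):
    """Call func(); on a listed exception, wait base_delay * 2**n and retry."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error
            sleep(base_delay * 2 ** (attempt - 1))


# Usage: a stand-in for a flaky submission that succeeds on the third call.
calls = {"count": 0}


def flaky_submit():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient network failure")
    return "submitted"


print(retry_with_backoff(flaky_submit, base_delay=0.01))  # submitted
```

Only the exception types in `retry_on` are retried; anything else, such as a `ValueError` from bad input, propagates immediately, which keeps genuine bugs from being masked by retries.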
Summary
This notebook has demonstrated the complete Clustrix API including:
Core Functions:
clustrix.configure() - Configure cluster connections and defaults
@cluster decorator - Distributed function execution
clustrix.get_config() - Retrieve current configuration
ClusterConfig.from_file() - Load configuration from files
Advanced Features:
Automatic Parallelization - parallel=True for loop distribution
Resource Specification - cores, memory, time limits
Environment Management - conda, virtualenv, modules
Error Handling - robust error recovery and debugging
Performance Monitoring - resource usage tracking
Custom Serialization - handling complex objects
Cluster Types Supported:
Local - multiprocessing and threading
SLURM - HPC workload manager
PBS/Torque - batch systems
SGE - Sun Grid Engine
Kubernetes - containerized execution
SSH - direct remote execution
Best Practices Covered:
Performance optimization strategies
Security and authentication
Reliability and fault tolerance
Monitoring and debugging
Resource management
For more information, see: