Complete Clustrix API DemonstrationΒΆ

Open In Colab

This notebook provides a comprehensive demonstration of all Clustrix user-facing functions and features. It serves as both a tutorial and a reference for the complete API.

Table of ContentsΒΆ

  1. Installation and Setup

  2. Configuration Functions

  3. Cluster Decorator

  4. Local Execution

  5. Remote Cluster Execution

  6. Advanced Features

  7. Monitoring and Debugging

  8. Error Handling

  9. Best Practices

Installation and SetupΒΆ

First, let’s install and import Clustrix:

[ ]:
# Install Clustrix (uncomment if needed)
# !pip install clustrix
# !pip install clustrix[kubernetes]  # With Kubernetes support

# Import all Clustrix components
import clustrix
from clustrix import cluster, configure, get_config
from clustrix.config import ClusterConfig
from clustrix.executor import ClusterExecutor
from clustrix.local_executor import LocalExecutor

# Standard libraries for examples
import numpy as np
import time
import os
from datetime import datetime

print(f"Clustrix version: {clustrix.__version__ if hasattr(clustrix, '__version__') else 'development'}")
print(f"Import successful at {datetime.now()}")

Configuration FunctionsΒΆ

1. Basic ConfigurationΒΆ

The configure() function is the primary way to set up Clustrix:

[ ]:
# Basic local configuration
clustrix.configure(
    cluster_type="local",              # Use local execution
    default_cores=4,                   # Default number of cores
    default_memory="8GB",              # Default memory allocation
    auto_parallel=True,                # Enable automatic parallelization
    max_parallel_jobs=10               # Maximum concurrent jobs
)

print("βœ“ Basic local configuration set")

# Get current configuration
config = clustrix.get_config()
print(f"Current cluster type: {config.cluster_type}")
print(f"Default cores: {config.default_cores}")
print(f"Default memory: {config.default_memory}")
print(f"Auto parallel: {config.auto_parallel}")

2. All Configuration OptionsΒΆ

Comprehensive configuration with all available options:

[ ]:
def demonstrate_all_config_options():
    """
    Demonstrate all available configuration options for different cluster types.
    """

    configurations = {
        'local': {
            'cluster_type': 'local',
            'default_cores': 4,
            'default_memory': '8GB',
            'auto_parallel': True,
            'max_parallel_jobs': 8,
            'cleanup_on_success': True
        },
        'slurm': {
            'cluster_type': 'slurm',
            'cluster_host': 'slurm-cluster.university.edu',
            'username': 'researcher',
            'key_file': '~/.ssh/id_rsa',
            'port': 22,
            'default_cores': 8,
            'default_memory': '32GB',
            'default_time': '02:00:00',
            'default_partition': 'normal',
            'default_account': 'research_group',
            'default_qos': 'normal',
            'remote_work_dir': '/scratch/researcher/clustrix',
            'module_loads': ['python/3.9', 'gcc/9.3.0'],
            'conda_env_name': 'myproject',
            'cleanup_on_success': True,
            'max_parallel_jobs': 20
        },
        'pbs': {
            'cluster_type': 'pbs',
            'cluster_host': 'pbs-cluster.org',
            'username': 'scientist',
            'key_file': '~/.ssh/pbs_key',
            'default_cores': 6,
            'default_memory': '24GB',
            'default_time': '04:00:00',
            'default_queue': 'bioqueue',
            'remote_work_dir': '/home/scientist/clustrix',
            'walltime': '04:00:00',  # PBS-specific
            'features': 'infiniband',
            'cleanup_on_success': True
        },
        'sge': {
            'cluster_type': 'sge',
            'cluster_host': 'sge-cluster.example.com',
            'username': 'engineer',
            'key_file': '~/.ssh/sge_key',
            'default_cores': 12,
            'default_memory': '48GB',
            'default_time': '06:00:00',
            'default_queue': 'all.q',
            'pe': 'smp',  # SGE parallel environment
            'remote_work_dir': '/home/engineer/clustrix'
        },
        'kubernetes': {
            'cluster_type': 'kubernetes',
            'k8s_namespace': 'default',
            'k8s_config_file': '~/.kube/config',
            'default_cores': 4,
            'default_memory': '8Gi',
            'default_cpu_limit': 6,
            'default_memory_limit': '12Gi',
            'container_image': 'python:3.11-slim',
            'image_pull_policy': 'IfNotPresent',
            'job_ttl_seconds': 3600,
            'backoff_limit': 3,
            'restart_policy': 'OnFailure'
        },
        'ssh': {
            'cluster_type': 'ssh',
            'cluster_host': 'remote-server.example.com',
            'username': 'developer',
            'key_file': '~/.ssh/dev_key',
            'port': 22,
            'remote_work_dir': '/home/developer/clustrix',
            'python_executable': 'python3',
            'virtualenv_path': '/home/developer/venv/myproject',
            'cleanup_on_success': True,
            'max_parallel_jobs': 5
        }
    }

    print("Configuration Options for All Cluster Types:")
    print("=" * 50)

    for cluster_type, config_options in configurations.items():
        print(f"\n{cluster_type.upper()} Configuration:")
        for key, value in config_options.items():
            print(f"  {key}: {value}")

    return configurations

# Display all configuration options
all_configs = demonstrate_all_config_options()

3. Configuration from FileΒΆ

Load configuration from YAML files:

[ ]:
import tempfile
import yaml
import os

# Create a sample configuration file
sample_config = {
    'cluster_type': 'local',
    'default_cores': 6,
    'default_memory': '16GB',
    'auto_parallel': True,
    'max_parallel_jobs': 12,
    'cleanup_on_success': True,
    'environment_variables': {
        'OMP_NUM_THREADS': '6',
        'PYTHONPATH': '/custom/path'
    }
}

# Write to temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False) as f:
    yaml.dump(sample_config, f)
    config_file = f.name

print(f"Created configuration file: {config_file}")

# Load configuration from file
config = ClusterConfig.from_file(config_file)
print(f"\nLoaded configuration:")
print(f"  Cluster type: {config.cluster_type}")
print(f"  Cores: {config.default_cores}")
print(f"  Memory: {config.default_memory}")
print(f"  Max parallel jobs: {config.max_parallel_jobs}")

# Apply the configuration
clustrix.configure(**config.__dict__)

# Cleanup
os.unlink(config_file)
print("\nβœ“ Configuration loaded from file and applied")

Cluster DecoratorΒΆ

1. Basic Decorator UsageΒΆ

The @cluster decorator is the main interface for distributed execution:

[ ]:
# Basic decorator usage
@cluster
def simple_function(x, y):
    """A simple function with default cluster settings."""
    import time
    time.sleep(0.1)  # Simulate some work
    return x + y

result = simple_function(5, 10)
print(f"Simple function result: {result}")

# Decorator with resource specification
@cluster(cores=4, memory="8GB")
def resource_specific_function(data_size):
    """Function with specific resource requirements."""
    import numpy as np
    data = np.random.random(data_size)
    return {
        'mean': np.mean(data),
        'std': np.std(data),
        'size': len(data)
    }

stats = resource_specific_function(100000)
print(f"Resource-specific function result: mean={stats['mean']:.4f}, std={stats['std']:.4f}")

2. All Decorator ParametersΒΆ

Comprehensive demonstration of all decorator parameters:

[ ]:
def demonstrate_decorator_parameters():
    """
    Show all available parameters for the @cluster decorator.
    """

    # Basic resource parameters
    @cluster(
        cores=8,                    # Number of CPU cores
        memory="32GB",              # Memory allocation
        time="02:00:00",            # Time limit (HH:MM:SS)
        parallel=True               # Enable automatic parallelization
    )
    def basic_resources_demo(n):
        """Basic resource specification."""
        return sum(i**2 for i in range(n))

    # Scheduler-specific parameters
    @cluster(
        cores=16,
        memory="64GB",
        time="04:00:00",
        # SLURM-specific
        partition="gpu",            # SLURM partition
        account="research_group",   # SLURM account
        qos="high",                 # Quality of Service
        gres="gpu:2",               # Generic resources (GPUs)
        constraint="haswell",       # Node constraints
        array="1-10",               # Job array specification
        # PBS-specific
        queue="bioqueue",           # PBS queue
        walltime="04:00:00",        # PBS walltime
        features="infiniband",      # PBS features
        # SGE-specific
        pe="smp 16",                # SGE parallel environment
        sge_array="1-20"            # SGE task array
    )
    def scheduler_specific_demo(data):
        """Scheduler-specific parameter demonstration."""
        import numpy as np
        return np.mean(data)

    # Kubernetes-specific parameters
    @cluster(
        cores=4,
        memory="16Gi",              # Kubernetes memory format
        cpu_limit=6,                # CPU limit (can exceed cores)
        memory_limit="24Gi",        # Memory limit
        container_image="python:3.11",  # Container image
        job_name="custom-job",      # Kubernetes job name
        parallelism=3,              # Parallel pod execution
        completions=10,             # Total completions needed
        backoff_limit=3,            # Retry limit on failure
        restart_policy="OnFailure", # Pod restart policy
        job_ttl_seconds=7200,       # Job cleanup time
        active_deadline_seconds=3600  # Maximum job runtime
    )
    def kubernetes_demo(task_id):
        """Kubernetes-specific parameter demonstration."""
        import os
        import time
        time.sleep(1)
        return {
            'task_id': task_id,
            'pod_name': os.environ.get('HOSTNAME', 'unknown'),
            'completion_time': time.time()
        }

    # Environment and execution parameters
    @cluster(
        cores=4,
        memory="16GB",
        environment={'OMP_NUM_THREADS': '4', 'CUDA_VISIBLE_DEVICES': '0'},
        conda_env="myproject",       # Conda environment
        virtualenv_path="/path/to/venv",  # Virtual environment
        python_executable="python3", # Python command
        working_directory="/tmp",    # Working directory
        cleanup_files=True,          # Cleanup temporary files
        timeout=3600                 # Execution timeout
    )
    def environment_demo(message):
        """Environment configuration demonstration."""
        import os
        return {
            'message': message,
            'omp_threads': os.environ.get('OMP_NUM_THREADS', 'not_set'),
            'cuda_devices': os.environ.get('CUDA_VISIBLE_DEVICES', 'not_set'),
            'working_dir': os.getcwd()
        }

    print("Decorator Parameter Demonstrations:")
    print("=" * 40)

    # Run basic resources demo
    print("\n1. Basic Resources Demo:")
    result1 = basic_resources_demo(1000)
    print(f"   Sum of squares: {result1:,}")

    # Run environment demo
    print("\n2. Environment Demo:")
    result2 = environment_demo("Hello from cluster!")
    print(f"   Message: {result2['message']}")
    print(f"   OMP threads: {result2['omp_threads']}")
    print(f"   Working dir: {result2['working_dir']}")

    return {
        'basic_resources': basic_resources_demo,
        'scheduler_specific': scheduler_specific_demo,
        'kubernetes': kubernetes_demo,
        'environment': environment_demo
    }

# Demonstrate all decorator parameters
decorator_functions = demonstrate_decorator_parameters()

3. Automatic ParallelizationΒΆ

Clustrix can automatically parallelize loops:

[ ]:
# Sequential execution (default)
@cluster(cores=4, parallel=False)
def sequential_processing(items):
    """Process items sequentially."""
    import time
    results = []
    for item in items:
        time.sleep(0.01)  # Simulate work
        results.append(item ** 2)
    return results

# Parallel execution
@cluster(cores=4, parallel=True)
def parallel_processing(items):
    """Process items in parallel."""
    import time
    results = []
    for item in items:  # This loop will be parallelized
        time.sleep(0.01)  # Simulate work
        results.append(item ** 2)
    return results

# Test data
test_items = list(range(20))

# Time sequential execution
start = time.time()
seq_result = sequential_processing(test_items)
seq_time = time.time() - start

# Time parallel execution
start = time.time()
par_result = parallel_processing(test_items)
par_time = time.time() - start

print(f"Sequential execution: {seq_time:.3f} seconds")
print(f"Parallel execution: {par_time:.3f} seconds")
print(f"Speedup: {seq_time/par_time:.2f}x")
print(f"Results match: {seq_result == par_result}")
print(f"Sample results: {seq_result[:5]}")

Local ExecutionΒΆ

1. Local Executor Direct UsageΒΆ

Use the LocalExecutor directly for fine-grained control:

[ ]:
from clustrix.local_executor import LocalExecutor

# Create local executor
local_config = ClusterConfig(
    cluster_type="local",
    default_cores=4,
    auto_parallel=True
)

executor = LocalExecutor(local_config)

# Define a function to execute
def compute_statistics(data):
    """Compute basic statistics on data."""
    import numpy as np
    return {
        'mean': np.mean(data),
        'std': np.std(data),
        'median': np.median(data),
        'min': np.min(data),
        'max': np.max(data)
    }

# Execute function with local executor
test_data = np.random.normal(100, 15, 10000)
result = executor.execute_function(compute_statistics, (test_data,), {})

print("Local Executor Results:")
for key, value in result.items():
    print(f"  {key}: {value:.4f}")

# Test parallel loop execution
def parallel_computation(n_iterations):
    """Function with parallelizable loop."""
    import numpy as np
    results = []
    for i in range(n_iterations):
        # Simulate CPU-intensive work
        data = np.random.random(1000)
        result = np.sum(data ** 2)
        results.append(result)
    return np.mean(results)

# Execute with automatic parallelization
start_time = time.time()
parallel_result = executor.execute_loop_parallel(
    parallel_computation,
    'i',
    range(100),  # Will be chunked across cores
    cores=4
)
execution_time = time.time() - start_time

print(f"\nParallel loop execution:")
print(f"  Result: {parallel_result:.6f}")
print(f"  Execution time: {execution_time:.3f} seconds")

2. CPU vs I/O DetectionΒΆ

Clustrix automatically chooses between multiprocessing and threading:

[ ]:
from clustrix.local_executor import choose_executor_type

# CPU-intensive function
def cpu_intensive_task(n):
    """CPU-bound computation."""
    total = 0
    for i in range(n):
        total += i ** 0.5
    return total

# I/O-intensive function
def io_intensive_task(filename):
    """I/O-bound operation."""
    import time
    time.sleep(0.1)  # Simulate I/O wait
    with open(filename, 'w') as f:
        f.write("test data")
    return f"File {filename} written"

# Function with network I/O patterns
def network_task(url):
    """Network request simulation."""
    import urllib.request
    import time
    time.sleep(0.05)  # Simulate network latency
    return f"Fetched {url}"

# Test executor type selection
test_cases = [
    (cpu_intensive_task, (10000,), {}),
    (io_intensive_task, ("/tmp/test.txt",), {}),
    (network_task, ("http://example.com",), {})
]

print("Executor Type Selection:")
print("=" * 30)

for func, args, kwargs in test_cases:
    use_threads = choose_executor_type(func, args, kwargs)
    executor_type = "ThreadPoolExecutor" if use_threads else "ProcessPoolExecutor"
    task_type = "I/O-bound" if use_threads else "CPU-bound"

    print(f"Function: {func.__name__}")
    print(f"  Detected as: {task_type}")
    print(f"  Will use: {executor_type}")
    print()

Remote Cluster ExecutionΒΆ

1. Cluster Executor Direct UsageΒΆ

Use ClusterExecutor for direct cluster operations:

[ ]:
# Note: This section demonstrates the API but won't actually connect to remote clusters
# in this demo notebook

def demonstrate_cluster_executor_api():
    """
    Demonstrate the ClusterExecutor API without actually connecting.
    """

    # Example configurations for different cluster types
    cluster_configs = {
        'slurm': ClusterConfig(
            cluster_type="slurm",
            cluster_host="slurm-cluster.edu",
            username="researcher",
            key_file="~/.ssh/id_rsa",
            default_partition="normal"
        ),
        'pbs': ClusterConfig(
            cluster_type="pbs",
            cluster_host="pbs-cluster.org",
            username="scientist",
            default_queue="bioqueue"
        ),
        'kubernetes': ClusterConfig(
            cluster_type="kubernetes",
            k8s_namespace="default",
            container_image="python:3.11-slim"
        )
    }

    print("Cluster Executor API Demonstration:")
    print("=" * 40)

    for cluster_type, config in cluster_configs.items():
        print(f"\n{cluster_type.upper()} Executor:")

        # Create executor (but don't connect)
        executor = ClusterExecutor(config)

        print(f"  Cluster type: {executor.config.cluster_type}")
        print(f"  Config object: {type(executor.config).__name__}")

        # Show available methods
        methods = [method for method in dir(executor)
                  if not method.startswith('_') and callable(getattr(executor, method))]
        print(f"  Available methods: {', '.join(methods[:5])}...")

    # Example of what cluster execution would look like
    print("\nExample cluster execution pattern:")
    print("""
    # 1. Create and configure executor
    executor = ClusterExecutor(config)

    # 2. Connect to cluster
    executor.connect()

    # 3. Submit job
    job_id = executor.submit_job(function, args, kwargs, job_config)

    # 4. Monitor job status
    status = executor.get_job_status(job_id)

    # 5. Retrieve results
    result = executor.get_result(job_id)

    # 6. Cleanup
    executor.cleanup_job(job_id)
    executor.disconnect()
    """)

    return cluster_configs

# Demonstrate the API
cluster_configs = demonstrate_cluster_executor_api()

2. Job Management FunctionsΒΆ

Functions for managing cluster jobs:

[ ]:
def demonstrate_job_management():
    """
    Demonstrate job management functions and patterns.
    """

    print("Job Management Functions:")
    print("=" * 30)

    # Job submission patterns
    job_patterns = {
        'single_job': {
            'description': 'Submit single job with specific resources',
            'example': '''
@cluster(cores=8, memory="32GB", time="02:00:00")
def my_computation(data):
    return process_data(data)
            '''
        },
        'job_array': {
            'description': 'Submit job array for parameter sweeps',
            'example': '''
@cluster(cores=4, memory="16GB", array="1-100")
def parameter_sweep(base_params):
    task_id = int(os.environ.get('SLURM_ARRAY_TASK_ID', '1'))
    params = modify_params(base_params, task_id)
    return run_simulation(params)
            '''
        },
        'parallel_jobs': {
            'description': 'Submit multiple independent jobs',
            'example': '''
@cluster(cores=4, memory="16GB", parallel=True)
def parallel_analysis(datasets):
    results = []
    for dataset in datasets:  # Each iteration becomes separate job
        results.append(analyze_dataset(dataset))
    return results
            '''
        },
        'dependent_jobs': {
            'description': 'Chain jobs with dependencies',
            'example': '''
# Job 1: Data preprocessing
@cluster(cores=4, memory="16GB")
def preprocess_data(raw_data):
    return clean_and_transform(raw_data)

# Job 2: Analysis (depends on Job 1)
@cluster(cores=8, memory="32GB", dependency="afterok:$JOB1_ID")
def analyze_processed_data(processed_data):
    return run_analysis(processed_data)
            '''
        }
    }

    for pattern_name, pattern_info in job_patterns.items():
        print(f"\n{pattern_name.upper().replace('_', ' ')}:")
        print(f"  Description: {pattern_info['description']}")
        print(f"  Example:{pattern_info['example']}")

    # Job monitoring functions
    print("\n" + "=" * 30)
    print("Job Monitoring Functions:")

    monitoring_functions = {
        'get_job_status()': 'Check current status of submitted job',
        'list_active_jobs()': 'List all active jobs for user',
        'get_job_info()': 'Get detailed information about specific job',
        'cancel_job()': 'Cancel running or queued job',
        'get_job_output()': 'Retrieve stdout/stderr from completed job',
        'get_job_resources()': 'Get resource usage statistics',
        'estimate_queue_time()': 'Estimate queue wait time for job'
    }

    for func_name, description in monitoring_functions.items():
        print(f"  {func_name:20} - {description}")

    return job_patterns

# Demonstrate job management
job_patterns = demonstrate_job_management()

Advanced FeaturesΒΆ

1. Custom SerializationΒΆ

Handle complex objects and custom serialization:

[ ]:
import pickle
import cloudpickle
import dill

class CustomClass:
    """A custom class to test serialization."""

    def __init__(self, name, data):
        self.name = name
        self.data = data

    def process(self):
        return f"Processed {self.name} with {len(self.data)} items"

    def __repr__(self):
        return f"CustomClass(name='{self.name}', data_length={len(self.data)})"

# Test serialization with different libraries
@cluster(cores=2)
def test_serialization(custom_obj, serializer_name):
    """Test custom object serialization."""
    result = custom_obj.process()
    return {
        'serializer': serializer_name,
        'object_name': custom_obj.name,
        'result': result,
        'data_length': len(custom_obj.data)
    }

# Create test object
test_obj = CustomClass("test_object", list(range(1000)))

# Test with different serializers
serializers = ['cloudpickle', 'dill', 'pickle']

print("Serialization Testing:")
print("=" * 25)

for serializer in serializers:
    try:
        result = test_serialization(test_obj, serializer)
        print(f"\n{serializer.upper()}:")
        print(f"  βœ“ Serialization successful")
        print(f"  Object: {result['object_name']}")
        print(f"  Result: {result['result']}")
    except Exception as e:
        print(f"\n{serializer.upper()}:")
        print(f"  βœ— Serialization failed: {e}")

# Test lambda function serialization
@cluster(cores=2)
def test_lambda_serialization(data, transform_func):
    """Test lambda function serialization."""
    transformed = [transform_func(x) for x in data]
    return {
        'original_data': data,
        'transformed_data': transformed,
        'function_type': str(type(transform_func))
    }

# Test with lambda
test_data = [1, 2, 3, 4, 5]
lambda_func = lambda x: x ** 2

try:
    lambda_result = test_lambda_serialization(test_data, lambda_func)
    print(f"\nLAMBDA FUNCTION SERIALIZATION:")
    print(f"  βœ“ Success")
    print(f"  Original: {lambda_result['original_data']}")
    print(f"  Transformed: {lambda_result['transformed_data']}")
except Exception as e:
    print(f"\nLAMBDA FUNCTION SERIALIZATION:")
    print(f"  βœ— Failed: {e}")

2. Environment ManagementΒΆ

Manage remote environments and dependencies:

[ ]:
def demonstrate_environment_management():
    """
    Demonstrate environment management features.
    """

    print("Environment Management Features:")
    print("=" * 35)

    # Environment configuration options
    env_configs = {
        'conda_environment': {
            'description': 'Use conda environment on remote cluster',
            'config': {
                'conda_env_name': 'myproject',
                'conda_path': '/opt/conda/bin/conda'
            },
            'usage': '''
@cluster(cores=4, conda_env="myproject")
def ml_computation(data):
    import tensorflow as tf  # Available in conda env
    return train_model(data)
            '''
        },
        'virtual_environment': {
            'description': 'Use Python virtual environment',
            'config': {
                'virtualenv_path': '/home/user/venv/myproject',
                'python_executable': 'python3'
            },
            'usage': '''
configure(
    cluster_type="ssh",
    virtualenv_path="/home/user/venv/myproject"
)
            '''
        },
        'module_loading': {
            'description': 'Load environment modules (HPC clusters)',
            'config': {
                'module_loads': ['python/3.9', 'gcc/9.3.0', 'openmpi/4.1']
            },
            'usage': '''
configure(
    cluster_type="slurm",
    module_loads=["python/3.9", "gcc/9.3.0"]
)
            '''
        },
        'environment_variables': {
            'description': 'Set custom environment variables',
            'config': {
                'environment_variables': {
                    'OMP_NUM_THREADS': '8',
                    'CUDA_VISIBLE_DEVICES': '0,1',
                    'PYTHONPATH': '/custom/path'
                }
            },
            'usage': '''
@cluster(
    cores=8,
    environment={
        'OMP_NUM_THREADS': '8',
        'CUDA_VISIBLE_DEVICES': '0,1'
    }
)
def gpu_computation(data):
    return process_on_gpu(data)
            '''
        },
        'dependency_management': {
            'description': 'Automatic dependency installation',
            'config': {
                'pip_requirements': ['numpy>=1.20', 'scipy>=1.7', 'scikit-learn'],
                'conda_packages': ['tensorflow', 'pytorch']
            },
            'usage': '''
# Clustrix automatically captures local environment
# and recreates it on remote cluster using pip freeze
@cluster(cores=4)
def analysis_with_deps(data):
    import pandas as pd  # Will be installed if missing
    import sklearn       # Will be installed if missing
    return analyze_data(data)
            '''
        }
    }

    for env_type, env_info in env_configs.items():
        print(f"\n{env_type.upper().replace('_', ' ')}:")
        print(f"  Description: {env_info['description']}")
        print(f"  Configuration:")
        for key, value in env_info['config'].items():
            print(f"    {key}: {value}")
        print(f"  Usage example:{env_info['usage']}")

    return env_configs

# Demonstrate environment management
env_configs = demonstrate_environment_management()

3. Error Handling and RecoveryΒΆ

Robust error handling and recovery mechanisms:

[ ]:
import random

# Function that may fail randomly
@cluster(cores=2)
def unreliable_computation(data, failure_rate=0.3):
    """A computation that may fail randomly."""
    import random
    import time

    # Simulate random failures
    if random.random() < failure_rate:
        raise RuntimeError(f"Simulated failure during computation")

    # Simulate work
    time.sleep(0.1)
    result = sum(x**2 for x in data)
    return result

# Function with retry logic
@cluster(cores=2)
def computation_with_retry(data, max_retries=3):
    """Computation with built-in retry logic."""
    import random
    import time

    for attempt in range(max_retries + 1):
        try:
            # Simulate potential failure
            if random.random() < 0.4 and attempt < max_retries:
                raise RuntimeError(f"Attempt {attempt + 1} failed")

            # Actual computation
            time.sleep(0.05)
            result = sum(x**3 for x in data)

            return {
                'result': result,
                'attempts': attempt + 1,
                'success': True
            }

        except Exception as e:
            if attempt == max_retries:
                return {
                    'result': None,
                    'attempts': attempt + 1,
                    'success': False,
                    'error': str(e)
                }
            time.sleep(0.1 * (attempt + 1))  # Exponential backoff

# Function with graceful degradation
@cluster(cores=2)
def robust_computation(data, fallback_method=True):
    """Computation with fallback method."""
    import numpy as np

    try:
        # Primary method (may fail)
        if len(data) > 1000:  # Simulate failure condition
            raise MemoryError("Not enough memory for primary method")

        # Primary computation
        result = np.fft.fft(data).real
        return {
            'result': np.mean(result),
            'method': 'primary_fft',
            'success': True
        }

    except Exception as e:
        if fallback_method:
            # Fallback method
            result = np.mean(data)  # Simple fallback
            return {
                'result': result,
                'method': 'fallback_mean',
                'success': True,
                'warning': f"Used fallback due to: {str(e)}"
            }
        else:
            raise

print("Error Handling and Recovery:")
print("=" * 30)

# Test unreliable computation
test_data = list(range(50))
successes = 0
failures = 0

print("\n1. Testing Unreliable Computation:")
for i in range(10):
    try:
        result = unreliable_computation(test_data, failure_rate=0.3)
        successes += 1
    except Exception as e:
        failures += 1

print(f"   Successes: {successes}/10")
print(f"   Failures: {failures}/10")

# Test computation with retry
print("\n2. Testing Computation with Retry:")
retry_results = []
for i in range(5):
    result = computation_with_retry(test_data, max_retries=3)
    retry_results.append(result)
    status = "βœ“" if result['success'] else "βœ—"
    print(f"   {status} Attempt {i+1}: {result['attempts']} tries, Success: {result['success']}")

# Test robust computation with fallback
print("\n3. Testing Robust Computation:")

# Small data (should use primary method)
small_data = list(range(100))
small_result = robust_computation(small_data)
print(f"   Small data: {small_result['method']}, Result: {small_result['result']:.4f}")

# Large data (should use fallback)
large_data = list(range(2000))
large_result = robust_computation(large_data)
print(f"   Large data: {large_result['method']}, Result: {large_result['result']:.4f}")
if 'warning' in large_result:
    print(f"   Warning: {large_result['warning']}")

Monitoring and DebuggingΒΆ

1. Performance MonitoringΒΆ

Monitor execution performance and resource usage:

[ ]:
import psutil
import threading
import time
from datetime import datetime

class PerformanceMonitor:
    """Monitor performance during function execution."""

    def __init__(self, interval=0.1):
        self.interval = interval
        self.monitoring = False
        self.metrics = []

    def start_monitoring(self):
        """Start performance monitoring."""
        self.monitoring = True
        self.metrics = []

        def monitor():
            while self.monitoring:
                try:
                    cpu_percent = psutil.cpu_percent()
                    memory = psutil.virtual_memory()

                    self.metrics.append({
                        'timestamp': time.time(),
                        'cpu_percent': cpu_percent,
                        'memory_percent': memory.percent,
                        'memory_used_gb': memory.used / (1024**3)
                    })
                except:
                    pass  # Skip if monitoring fails

                time.sleep(self.interval)

        self.monitor_thread = threading.Thread(target=monitor, daemon=True)
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Stop performance monitoring."""
        self.monitoring = False
        if hasattr(self, 'monitor_thread'):
            self.monitor_thread.join(timeout=1.0)

    def get_summary(self):
        """Get performance summary."""
        if not self.metrics:
            return {'error': 'No metrics collected'}

        cpu_values = [m['cpu_percent'] for m in self.metrics]
        memory_values = [m['memory_percent'] for m in self.metrics]

        return {
            'duration_seconds': self.metrics[-1]['timestamp'] - self.metrics[0]['timestamp'],
            'samples_collected': len(self.metrics),
            'cpu_usage': {
                'mean': np.mean(cpu_values),
                'max': np.max(cpu_values),
                'min': np.min(cpu_values),
                'std': np.std(cpu_values)
            },
            'memory_usage': {
                'mean': np.mean(memory_values),
                'max': np.max(memory_values),
                'min': np.min(memory_values),
                'peak_gb': np.max([m['memory_used_gb'] for m in self.metrics])
            }
        }

# Monitored computation function
@cluster(cores=4)
def monitored_computation(size, complexity="medium"):
    """A computation that can be monitored for performance."""
    import numpy as np
    import time

    # Different complexity levels
    if complexity == "low":
        data = np.random.random(size)
        result = np.sum(data)
    elif complexity == "medium":
        data = np.random.random((size, 10))
        result = np.sum(np.dot(data, data.T))
    else:  # high
        data = np.random.random((size, size//10))
        for _ in range(3):
            data = np.dot(data, data.T[:data.shape[1], :])
        result = np.sum(data)

    return {
        'result': float(result),
        'size': size,
        'complexity': complexity
    }

print("Performance Monitoring:")
print("=" * 25)

# Test different complexity levels
test_cases = [
    (1000, "low"),
    (500, "medium"),
    (100, "high")
]

for size, complexity in test_cases:
    print(f"\nTesting {complexity} complexity (size={size}):")

    # Start monitoring
    monitor = PerformanceMonitor(interval=0.05)
    monitor.start_monitoring()

    # Run computation
    start_time = time.time()
    result = monitored_computation(size, complexity)
    end_time = time.time()

    # Stop monitoring
    monitor.stop_monitoring()

    # Get results
    perf_summary = monitor.get_summary()
    execution_time = end_time - start_time

    print(f"  Execution time: {execution_time:.3f} seconds")
    print(f"  Result: {result['result']:.2e}")

    if 'error' not in perf_summary:
        print(f"  CPU usage: {perf_summary['cpu_usage']['mean']:.1f}% avg, {perf_summary['cpu_usage']['max']:.1f}% max")
        print(f"  Memory usage: {perf_summary['memory_usage']['mean']:.1f}% avg, {perf_summary['memory_usage']['peak_gb']:.2f} GB peak")
        print(f"  Samples collected: {perf_summary['samples_collected']}")

2. Debugging UtilitiesΒΆ

Utilities for debugging distributed computations:

[ ]:
import sys
import traceback
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Function with debug information
@cluster(cores=2)
def debug_computation(data, debug_level="info"):
    """Computation with extensive debugging information."""
    import sys
    import os
    import platform
    import time
    from datetime import datetime

    debug_info = {
        'execution_start': datetime.now().isoformat(),
        'python_version': sys.version,
        'platform': platform.platform(),
        'working_directory': os.getcwd(),
        'process_id': os.getpid(),
        'environment_vars': dict(os.environ),
        'input_data_type': str(type(data)),
        'input_data_length': len(data) if hasattr(data, '__len__') else 'unknown'
    }

    try:
        # Simulate computation with progress tracking
        if debug_level == "verbose":
            print(f"Starting computation at {debug_info['execution_start']}")
            print(f"Input data: {debug_info['input_data_type']} with {debug_info['input_data_length']} items")

        result = 0
        for i, value in enumerate(data):
            if debug_level == "verbose" and i % (len(data) // 5) == 0:
                print(f"Progress: {i}/{len(data)} ({100*i/len(data):.1f}%)")

            result += value ** 2

            # Simulate occasional issues
            if i == len(data) // 2 and debug_level == "test_error":
                raise ValueError(f"Test error at position {i}")

        debug_info.update({
            'execution_end': datetime.now().isoformat(),
            'success': True,
            'result': result,
            'items_processed': len(data)
        })

        if debug_level in ["info", "verbose"]:
            print(f"Computation completed successfully")

        return debug_info

    except Exception as e:
        debug_info.update({
            'execution_end': datetime.now().isoformat(),
            'success': False,
            'error_type': str(type(e).__name__),
            'error_message': str(e),
            'traceback': traceback.format_exc()
        })

        if debug_level in ["info", "verbose"]:
            print(f"Computation failed: {e}")

        return debug_info

# Function to test serialization issues
@cluster(cores=2)
def test_serialization_debug(problematic_object):
    """Test function that may have serialization issues."""
    try:
        # Try to use the problematic object
        result = problematic_object.some_method() if hasattr(problematic_object, 'some_method') else str(problematic_object)
        return {'success': True, 'result': result}
    except Exception as e:
        return {
            'success': False,
            'error': str(e),
            'object_type': str(type(problematic_object))
        }

print("Debugging Utilities:")
print("=" * 20)

# Test normal execution with debug info
print("\n1. Normal Execution with Debug Info:")
test_data = list(range(100))
debug_result = debug_computation(test_data, debug_level="info")

print(f"   Success: {debug_result['success']}")
print(f"   Platform: {debug_result['platform'][:50]}...")
print(f"   Process ID: {debug_result['process_id']}")
print(f"   Items processed: {debug_result.get('items_processed', 'N/A')}")
if 'result' in debug_result:
    print(f"   Result: {debug_result['result']}")

# Test error handling
print("\n2. Error Handling Test:")
error_result = debug_computation(test_data, debug_level="test_error")

print(f"   Success: {error_result['success']}")
if not error_result['success']:
    print(f"   Error type: {error_result['error_type']}")
    print(f"   Error message: {error_result['error_message']}")
    print(f"   Traceback available: {'traceback' in error_result}")

# Test serialization debugging
print("\n3. Serialization Testing:")

# Test with simple object (should work)
simple_obj = [1, 2, 3, 4, 5]
simple_result = test_serialization_debug(simple_obj)
print(f"   Simple object: {simple_result['success']}")

# Test with complex object (may have issues)
class ComplexObject:
    def __init__(self):
        self.data = "test"

    def some_method(self):
        return f"Method called on {self.data}"

complex_obj = ComplexObject()
complex_result = test_serialization_debug(complex_obj)
print(f"   Complex object: {complex_result['success']}")
if complex_result['success']:
    print(f"   Result: {complex_result['result']}")
else:
    print(f"   Error: {complex_result['error'][:50]}...")

# Show debugging best practices
print("\n4. Debugging Best Practices:")
best_practices = [
    "Use debug_level parameters to control output verbosity",
    "Include execution environment information in results",
    "Test serialization with simple objects first",
    "Use try-catch blocks to capture and return error information",
    "Include timestamps for performance analysis",
    "Monitor resource usage during execution",
    "Test with small datasets before scaling up"
]

for i, practice in enumerate(best_practices, 1):
    print(f"   {i}. {practice}")

Best PracticesΒΆ

1. Performance OptimizationΒΆ

Best practices for optimal performance:

[ ]:
def demonstrate_performance_best_practices():
    """
    Demonstrate best practices for performance optimization.
    """

    print("Performance Optimization Best Practices:")
    print("=" * 45)

    best_practices = {
        'resource_allocation': {
            'title': 'Resource Allocation',
            'practices': [
                "Profile your code locally before scaling to clusters",
                "Use appropriate core counts (typically 1-2x physical cores)",
                "Allocate memory with 20-30% buffer for overhead",
                "Set realistic time limits with buffer for completion",
                "Use parallel=True for CPU-bound loops",
                "Consider I/O vs CPU workload for executor selection"
            ],
            'example': '''
# Good resource allocation
@cluster(
    cores=8,                    # Based on profiling
    memory="32GB",              # 25% buffer included
    time="02:30:00",            # 30min buffer for 2hr job
    parallel=True               # Enable for CPU-bound work
)
def optimized_computation(data):
    return process_data_efficiently(data)
            '''
        },
        'data_management': {
            'title': 'Data Management',
            'practices': [
                "Minimize data transfer between local and remote",
                "Use efficient data formats (NumPy arrays, not lists)",
                "Chunk large datasets for parallel processing",
                "Avoid loading unnecessary data into memory",
                "Use generators for large data streams",
                "Consider data locality for cluster placement"
            ],
            'example': '''
# Efficient data handling
@cluster(cores=8, parallel=True)
def process_large_dataset(chunk_size=10000):
    """Process data in chunks to optimize memory usage."""
    import numpy as np

    results = []
    for chunk_id in range(100):  # Parallelized
        # Generate chunk on remote (not transfer)
        chunk = np.random.random(chunk_size)
        result = np.mean(chunk ** 2)  # Efficient NumPy
        results.append(result)

    return np.mean(results)  # Return summary, not raw data
            '''
        },
        'parallelization': {
            'title': 'Parallelization Strategy',
            'practices': [
                "Identify embarrassingly parallel components",
                "Minimize shared state between parallel tasks",
                "Use appropriate chunk sizes for load balancing",
                "Avoid fine-grained parallelism with high overhead",
                "Consider communication costs in distributed algorithms",
                "Test parallel efficiency with different core counts"
            ],
            'example': '''
# Good parallelization pattern
@cluster(cores=16, parallel=True)
def parallel_monte_carlo(n_samples=1000000):
    """Monte Carlo with optimal chunk size."""
    import numpy as np

    results = []
    chunk_size = n_samples // 100  # 100 chunks for load balancing

    for chunk in range(100):  # Parallelized across cores
        # Independent computation per chunk
        x = np.random.random(chunk_size)
        y = np.random.random(chunk_size)
        inside = (x**2 + y**2) <= 1
        results.append(np.sum(inside))

    return 4 * sum(results) / n_samples
            '''
        },
        'cluster_optimization': {
            'title': 'Cluster-Specific Optimization',
            'practices': [
                "Choose appropriate partitions/queues for workload",
                "Use job arrays for parameter sweeps",
                "Leverage cluster-specific features (GPUs, fast storage)",
                "Monitor queue times and adjust submission strategy",
                "Use checkpointing for long-running jobs",
                "Clean up temporary files to avoid storage issues"
            ],
            'example': '''
# Cluster-optimized job submission
@cluster(
    cores=32,
    memory="128GB",
    time="12:00:00",
    partition="bigmem",          # Appropriate partition
    array="1-100",              # Parameter sweep
    gres="gpu:2",               # Request GPUs if needed
    cleanup_on_success=True     # Clean temporary files
)
def cluster_optimized_job(params):
    return run_with_checkpointing(params)
            '''
        }
    }

    for category, info in best_practices.items():
        print(f"\n{info['title'].upper()}:")
        for i, practice in enumerate(info['practices'], 1):
            print(f"  {i}. {practice}")
        print(f"\nExample:{info['example']}")

    return best_practices

# Demonstrate performance best practices
perf_practices = demonstrate_performance_best_practices()

2. Security and ReliabilityΒΆ

Best practices for secure and reliable distributed computing:

[ ]:
def demonstrate_security_best_practices():
    """
    Demonstrate security and reliability best practices.
    """

    print("Security and Reliability Best Practices:")
    print("=" * 45)

    security_practices = {
        'authentication': {
            'title': 'Authentication and Access',
            'practices': [
                "Use SSH key authentication, never passwords",
                "Protect private keys with strong passphrases",
                "Use separate keys for different environments",
                "Regularly rotate SSH keys (6-12 months)",
                "Set proper file permissions (600 for private keys)",
                "Use SSH config for consistent settings"
            ],
            'example': '''
# Secure SSH configuration
configure(
    cluster_type="slurm",
    cluster_host="secure-cluster.edu",
    username="researcher",
    key_file="~/.ssh/clustrix_production_key",  # Dedicated key
    port=2222,                                   # Non-standard port
    # Never use password in production
)
            '''
        },
        'data_security': {
            'title': 'Data Security',
            'practices': [
                "Never include secrets or credentials in code",
                "Use environment variables for sensitive data",
                "Encrypt sensitive data before transfer",
                "Clean up temporary files containing sensitive data",
                "Use secure remote directories with proper permissions",
                "Audit data access and transfers"
            ],
            'example': '''
# Secure data handling
@cluster(cores=4, cleanup_on_success=True)
def secure_data_processing(encrypted_data):
    """Process data securely with cleanup."""
    import os
    import tempfile

    # Use environment variable for decryption key
    decryption_key = os.environ.get('DECRYPTION_KEY')
    if not decryption_key:
        raise ValueError("Decryption key not found")

    # Process in temporary location
    with tempfile.TemporaryDirectory() as temp_dir:
        # Decrypt and process
        data = decrypt_data(encrypted_data, decryption_key)
        result = analyze_data(data)

        # Clear sensitive data
        del data, decryption_key

        return result  # Only return non-sensitive results
            '''
        },
        'reliability': {
            'title': 'Reliability and Fault Tolerance',
            'practices': [
                "Implement retry logic for transient failures",
                "Use checkpointing for long-running computations",
                "Validate inputs before expensive computations",
                "Monitor resource usage to avoid exhaustion",
                "Set appropriate timeouts for all operations",
                "Log important events for debugging"
            ],
            'example': '''
# Reliable computation with fault tolerance
@cluster(cores=8, time="04:00:00", backoff_limit=3)
def reliable_computation(data, checkpoint_interval=1000):
    """Computation with checkpointing and validation."""
    import os
    import pickle
    import logging

    # Validate inputs
    if not data or len(data) == 0:
        raise ValueError("Input data is empty")

    # Setup logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Check for existing checkpoint
    checkpoint_file = "computation_checkpoint.pkl"
    start_index = 0
    results = []

    if os.path.exists(checkpoint_file):
        with open(checkpoint_file, 'rb') as f:
            checkpoint = pickle.load(f)
            start_index = checkpoint['index']
            results = checkpoint['results']
        logger.info(f"Resuming from checkpoint at index {start_index}")

    # Process with checkpointing
    for i in range(start_index, len(data)):
        try:
            result = expensive_operation(data[i])
            results.append(result)

            # Save checkpoint periodically
            if (i + 1) % checkpoint_interval == 0:
                checkpoint = {'index': i + 1, 'results': results}
                with open(checkpoint_file, 'wb') as f:
                    pickle.dump(checkpoint, f)
                logger.info(f"Checkpoint saved at index {i + 1}")

        except Exception as e:
            logger.error(f"Error at index {i}: {e}")
            # Continue with next item
            results.append(None)

    # Cleanup checkpoint file
    if os.path.exists(checkpoint_file):
        os.unlink(checkpoint_file)

    return {'results': results, 'success_rate': sum(1 for r in results if r is not None) / len(results)}
            '''
        },
        'monitoring': {
            'title': 'Monitoring and Maintenance',
            'practices': [
                "Monitor cluster resource usage regularly",
                "Set up alerts for job failures",
                "Track job completion times and success rates",
                "Monitor disk usage in work directories",
                "Keep logs of cluster operations",
                "Regularly update and patch cluster software"
            ],
            'example': '''
# Computation with monitoring
@cluster(cores=4, time="02:00:00")
def monitored_computation(data):
    """Computation with built-in monitoring."""
    import psutil
    import time
    import logging

    logger = logging.getLogger(__name__)
    start_time = time.time()

    # Log start
    logger.info(f"Starting computation with {len(data)} items")

    # Monitor resources
    initial_memory = psutil.virtual_memory().percent

    try:
        result = process_data(data)

        # Log success
        execution_time = time.time() - start_time
        final_memory = psutil.virtual_memory().percent

        logger.info(f"Computation completed in {execution_time:.2f}s")
        logger.info(f"Memory usage: {initial_memory:.1f}% -> {final_memory:.1f}%")

        return {
            'result': result,
            'execution_time': execution_time,
            'memory_delta': final_memory - initial_memory
        }

    except Exception as e:
        logger.error(f"Computation failed after {time.time() - start_time:.2f}s: {e}")
        raise
            '''
        }
    }

    for category, info in security_practices.items():
        print(f"\n{info['title'].upper()}:")
        for i, practice in enumerate(info['practices'], 1):
            print(f"  {i}. {practice}")
        print(f"\nExample:{info['example']}")

    return security_practices

# Demonstrate security best practices
security_practices = demonstrate_security_best_practices()

SummaryΒΆ

This notebook has demonstrated the complete Clustrix API including:

Core Functions:ΒΆ

  • clustrix.configure() - Configure cluster connections and defaults

  • @cluster decorator - Distributed function execution

  • clustrix.get_config() - Retrieve current configuration

  • ClusterConfig.from_file() - Load configuration from files

Advanced Features:ΒΆ

  • Automatic Parallelization - parallel=True for loop distribution

  • Resource Specification - cores, memory, time limits

  • Environment Management - conda, virtualenv, modules

  • Error Handling - robust error recovery and debugging

  • Performance Monitoring - resource usage tracking

  • Custom Serialization - handling complex objects

Cluster Types Supported:ΒΆ

  • Local - multiprocessing and threading

  • SLURM - HPC workload manager

  • PBS/Torque - batch systems

  • SGE - Sun Grid Engine

  • Kubernetes - containerized execution

  • SSH - direct remote execution

Best Practices Covered:ΒΆ

  • Performance optimization strategies

  • Security and authentication

  • Reliability and fault tolerance

  • Monitoring and debugging

  • Resource management

For more information, see: