SLURM Cluster Tutorial

This tutorial demonstrates how to use Clustrix with SLURM (Simple Linux Utility for Resource Management) clusters. SLURM is one of the most popular workload managers for HPC clusters.

Prerequisites

  • Access to a SLURM cluster

  • SSH key configured for the cluster

  • Clustrix installed: pip install clustrix

Installation and Setup

First, install Clustrix if you haven’t already:

[ ]:
# Install Clustrix (uncomment if needed)
# !pip install clustrix

import clustrix
from clustrix import cluster, configure
import numpy as np
import time

Basic SLURM Configuration

Configure Clustrix to connect to your SLURM cluster:

[ ]:
# Configure for SLURM cluster
configure(
    cluster_type="slurm",
    cluster_host="your-slurm-cluster.edu",  # Replace with your cluster hostname
    username="your-username",               # Replace with your username
    key_file="~/.ssh/id_rsa",              # Path to your SSH key

    # Default resource requirements
    default_cores=4,
    default_memory="8GB",
    default_time="01:00:00",
    default_partition="normal",             # Replace with your default partition

    # Remote work directory
    remote_work_dir="/scratch/your-username/clustrix",  # Adjust for your cluster

    # Optional: Load modules on the cluster
    module_loads=["python/3.9", "gcc/9.3.0"],

    # Cleanup settings
    cleanup_on_success=True,
    max_parallel_jobs=20
)

print("SLURM cluster configured successfully!")
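Before submitting anything, it can help to sanity-check resource strings locally. The sketch below is a pair of hypothetical helpers, not part of the Clustrix API. It assumes memory is written as a whole number followed by `MB`, `GB`, or `TB`, and that wall time uses the `HH:MM:SS` form shown in the configuration above.

```python
import re

# Hypothetical helpers for illustration; they are not part of the Clustrix API.
_MEMORY_UNITS_MB = {"MB": 1, "GB": 1024, "TB": 1024 * 1024}


def memory_to_mb(memory: str) -> int:
    """Convert a string such as '8GB' to megabytes (assumes MB/GB/TB suffixes)."""
    match = re.fullmatch(r"(\d+)\s*(MB|GB|TB)", memory.strip().upper())
    if match is None:
        raise ValueError(f"Unrecognized memory string: {memory!r}")
    amount, unit = match.groups()
    return int(amount) * _MEMORY_UNITS_MB[unit]


def walltime_to_seconds(walltime: str) -> int:
    """Convert an 'HH:MM:SS' wall-time string to seconds."""
    match = re.fullmatch(r"(\d+):([0-5]\d):([0-5]\d)", walltime.strip())
    if match is None:
        raise ValueError(f"Expected HH:MM:SS, got: {walltime!r}")
    hours, minutes, seconds = (int(part) for part in match.groups())
    return hours * 3600 + minutes * 60 + seconds


print(memory_to_mb("8GB"))              # 8192
print(walltime_to_seconds("01:00:00"))  # 3600
```

Catching a malformed value here is cheaper than waiting for the scheduler to reject the job.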

Example 1: Simple Mathematical Computation

Let’s start with a basic example that performs a mathematical computation on the cluster:

[ ]:
@cluster(cores=2, memory="4GB", time="00:10:00")
def calculate_pi_monte_carlo(n_samples=1000000):
    """
    Calculate pi using Monte Carlo method.
    This will run on the SLURM cluster.
    """
    import numpy as np

    # Generate random points
    x = np.random.uniform(-1, 1, n_samples)
    y = np.random.uniform(-1, 1, n_samples)

    # Check if points are inside unit circle
    inside_circle = (x**2 + y**2) <= 1

    # Estimate pi
    pi_estimate = 4 * np.sum(inside_circle) / n_samples

    return {
        'pi_estimate': pi_estimate,
        'n_samples': n_samples,
        'error': abs(pi_estimate - np.pi)
    }

# Execute on cluster (this will submit a SLURM job)
result = calculate_pi_monte_carlo(5000000)
print(f"Pi estimate: {result['pi_estimate']:.6f}")
print(f"Error: {result['error']:.6f}")
print(f"Samples used: {result['n_samples']:,}")
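To judge whether the reported error is plausible, compare it with the estimator's standard error. Each sample lands inside the circle with probability π/4, so the estimate `4 * fraction_inside` has a standard error of `4 * sqrt(p * (1 - p) / n)`. The following is a standard-library sketch that runs locally without a cluster; the function names are illustrative.

```python
import math
import random


def pi_standard_error(n_samples: int) -> float:
    """Standard error of the estimator 4 * (fraction of points inside the circle)."""
    p = math.pi / 4  # probability that a uniform point in the square lands in the circle
    return 4 * math.sqrt(p * (1 - p) / n_samples)


def estimate_pi(n_samples: int, seed: int = 0) -> float:
    """Pure-Python version of the estimator, seeded so runs are repeatable."""
    rng = random.Random(seed)
    inside = sum(
        1 for _ in range(n_samples)
        if rng.uniform(-1, 1) ** 2 + rng.uniform(-1, 1) ** 2 <= 1
    )
    return 4 * inside / n_samples


for n in (10_000, 1_000_000, 5_000_000):
    print(f"n={n:>9,}: expected standard error ~ {pi_standard_error(n):.5f}")

print(f"Local estimate with 100,000 samples: {estimate_pi(100_000):.4f}")
```

Because the error shrinks only as 1/sqrt(n), each extra decimal digit of accuracy costs about 100 times more samples.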

Example 2: Machine Learning Model Training

Train a machine learning model with specific resource requirements:

[ ]:
@cluster(
    cores=8,
    memory="32GB",
    time="02:00:00",
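    partition="gpu",  # GPU partition shown for illustration; scikit-learn's Random Forest runs on CPU only
    gres="gpu:1"      # Request 1 GPU (SLURM-specific); drop this unless your code actually uses a GPU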
)
def train_random_forest(n_samples=100000, n_features=50, n_estimators=200):
    """
    Train a Random Forest model on synthetic data.
    """
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.datasets import make_classification
    from sklearn.model_selection import train_test_split, cross_val_score
    from sklearn.metrics import accuracy_score
    import numpy as np

    print(f"Generating dataset with {n_samples:,} samples and {n_features} features...")

    # Generate synthetic dataset
    X, y = make_classification(
        n_samples=n_samples,
        n_features=n_features,
        n_informative=int(n_features * 0.7),
        n_redundant=int(n_features * 0.2),
        n_clusters_per_class=2,
        random_state=42
    )

    # Split the data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    print(f"Training Random Forest with {n_estimators} estimators...")

    # Train model
    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=20,
        min_samples_split=5,
        n_jobs=-1,  # Use all available cores
        random_state=42
    )

    model.fit(X_train, y_train)

    # Evaluate model
    train_accuracy = accuracy_score(y_train, model.predict(X_train))
    test_accuracy = accuracy_score(y_test, model.predict(X_test))

    # Cross-validation
    cv_scores = cross_val_score(model, X, y, cv=5, n_jobs=-1)

    return {
        'train_accuracy': train_accuracy,
        'test_accuracy': test_accuracy,
        'cv_mean': np.mean(cv_scores),
        'cv_std': np.std(cv_scores),
        'feature_importance': model.feature_importances_.tolist(),
        'n_samples': n_samples,
        'n_features': n_features,
        'n_estimators': n_estimators
    }

# Train model on cluster
ml_result = train_random_forest(n_samples=50000, n_features=30, n_estimators=100)

print(f"Training Accuracy: {ml_result['train_accuracy']:.4f}")
print(f"Test Accuracy: {ml_result['test_accuracy']:.4f}")
print(f"Cross-validation: {ml_result['cv_mean']:.4f} ± {ml_result['cv_std']:.4f}")
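On shared nodes, `n_jobs=-1` may start more workers than the job was allocated, depending on how the cluster enforces CPU limits. The sketch below reads the allocation from the `SLURM_CPUS_PER_TASK` environment variable, which SLURM sets when a job requests `--cpus-per-task`. Whether Clustrix's `cores` argument maps to that option is an assumption here, and `allocated_cores` is a hypothetical helper.

```python
import os


def allocated_cores(default: int = 1) -> int:
    """Return the CPU count SLURM granted to this task, or `default` outside a job.

    SLURM sets SLURM_CPUS_PER_TASK when the job requests --cpus-per-task.
    """
    value = os.environ.get("SLURM_CPUS_PER_TASK")
    if value is None or not value.isdigit() or int(value) < 1:
        return default
    return int(value)


# Pass this instead of -1 so the worker count matches the allocation.
n_jobs = allocated_cores()
print(f"Using n_jobs={n_jobs}")
```

Inside the decorated function, the result could be passed as `n_jobs=allocated_cores()` to both `RandomForestClassifier` and `cross_val_score`.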

Example 3: Parallel Data Processing with Automatic Loop Distribution

Process multiple data chunks in parallel using Clustrix’s automatic loop parallelization:

[ ]:
@cluster(
    cores=16,
    memory="64GB",
    time="01:30:00",
    parallel=True  # Enable automatic loop parallelization
)
def process_data_chunks(chunk_size=10000, num_chunks=20):
    """
    Process multiple data chunks in parallel.
    The for loop will be automatically distributed across cores.
    """
    import numpy as np
    from scipy import stats

    results = []

    # This loop will be automatically parallelized by Clustrix
    for chunk_id in range(num_chunks):
        # Generate chunk data with different random seed
        np.random.seed(chunk_id * 42)
        data = np.random.exponential(scale=2.0, size=chunk_size)

        # Perform statistical analysis on chunk
        chunk_stats = {
            'chunk_id': chunk_id,
            'mean': np.mean(data),
            'std': np.std(data),
            'median': np.median(data),
            'skewness': stats.skew(data),
            'kurtosis': stats.kurtosis(data),
            'min': np.min(data),
            'max': np.max(data),
            'percentile_95': np.percentile(data, 95)
        }

        results.append(chunk_stats)

    # Aggregate results
    overall_stats = {
        'num_chunks': len(results),
        'total_samples': num_chunks * chunk_size,
        'mean_of_means': np.mean([r['mean'] for r in results]),
        'std_of_means': np.std([r['mean'] for r in results]),
        'chunk_results': results
    }

    return overall_stats

# Process data chunks in parallel
parallel_result = process_data_chunks(chunk_size=5000, num_chunks=10)

print(f"Processed {parallel_result['num_chunks']} chunks")
print(f"Total samples: {parallel_result['total_samples']:,}")
print(f"Mean of chunk means: {parallel_result['mean_of_means']:.4f}")
print(f"Std of chunk means: {parallel_result['std_of_means']:.4f}")

# Display first few chunk results
print("\nFirst 3 chunk results:")
for i, chunk in enumerate(parallel_result['chunk_results'][:3]):
    print(f"  Chunk {chunk['chunk_id']}: mean={chunk['mean']:.3f}, std={chunk['std']:.3f}")
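One way to picture loop distribution is as a split of the iteration range into contiguous blocks, one per worker. The sketch below illustrates that idea only; it is not a description of how Clustrix partitions loops internally, and `split_iterations` is a hypothetical name.

```python
def split_iterations(num_items: int, num_workers: int) -> list[range]:
    """Split range(num_items) into at most num_workers contiguous, near-equal ranges."""
    if num_items < 0 or num_workers < 1:
        raise ValueError("num_items must be >= 0 and num_workers >= 1")
    base, extra = divmod(num_items, num_workers)
    ranges, start = [], 0
    for worker in range(num_workers):
        size = base + (1 if worker < extra else 0)  # first `extra` workers take one more
        if size == 0:
            break
        ranges.append(range(start, start + size))
        start += size
    return ranges


for worker_id, chunk_ids in enumerate(split_iterations(num_items=10, num_workers=4)):
    print(f"worker {worker_id}: chunks {list(chunk_ids)}")
```

Seeding each chunk from its `chunk_id`, as the example above does, keeps the results identical no matter which worker handles which chunk.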

Example 4: Scientific Computing - Numerical Integration

Perform numerical integration using high-performance computing resources:

[ ]:
@cluster(
    cores=32,
    memory="128GB",
    time="03:00:00",
    partition="bigmem"  # Use high-memory partition
)
def numerical_integration_adaptive(function_type="gaussian", intervals=1000000, precision_target=1e-8):
    """
    Perform high-precision numerical integration using adaptive methods.
    """
    import numpy as np
    from scipy import integrate
    import math

    def gaussian_function(x):
        """Standard Gaussian function"""
        return np.exp(-x**2 / 2) / np.sqrt(2 * np.pi)

    def oscillatory_function(x):
        """Highly oscillatory function"""
        return np.sin(100 * x) * np.exp(-x**2)

    def polynomial_function(x):
        """Degree-10 monomial damped by exp(-x)"""
        return x**10 * np.exp(-x)

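    # Select function based on type: (function, lower bound, upper bound, exact value)
    from scipy import special  # regularized lower incomplete gamma function

    functions = {
        # Integral of the standard normal pdf over [-5, 5] is erf(5/sqrt(2))
        "gaussian": (gaussian_function, -5, 5, math.erf(5 / math.sqrt(2))),
        # Odd integrand on a symmetric interval, so the exact value is 0
        "oscillatory": (oscillatory_function, -2, 2, 0.0),
        # Integral over [0, 10] is the lower incomplete gamma function gamma(11, 10), not 10!
        "polynomial": (polynomial_function, 0, 10, math.gamma(11) * special.gammainc(11, 10))
    }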

    if function_type not in functions:
        raise ValueError(f"Unknown function type: {function_type}")

    func, a, b, analytical = functions[function_type]

    print(f"Integrating {function_type} function from {a} to {b}...")
    print(f"Target precision: {precision_target}")

    # High-precision adaptive integration
    result, error = integrate.quad(
        func, a, b,
        epsabs=precision_target,
        epsrel=precision_target,
        limit=intervals
    )

    # Monte Carlo integration for comparison
    n_mc = 10000000  # 10 million samples
    x_mc = np.random.uniform(a, b, n_mc)
    y_mc = func(x_mc)
    mc_result = (b - a) * np.mean(y_mc)
    mc_error = (b - a) * np.std(y_mc) / np.sqrt(n_mc)

    integration_result = {
        'function_type': function_type,
        'integration_bounds': [a, b],
        'adaptive_result': result,
        'adaptive_error': error,
        'monte_carlo_result': mc_result,
        'monte_carlo_error': mc_error,
        'precision_target': precision_target,
        'mc_samples': n_mc
    }

    if analytical is not None:
        integration_result['analytical_result'] = analytical
        integration_result['adaptive_vs_analytical'] = abs(result - analytical)
        integration_result['mc_vs_analytical'] = abs(mc_result - analytical)

    return integration_result

# Perform numerical integration
integration_results = []

for func_type in ["gaussian", "polynomial", "oscillatory"]:
    result = numerical_integration_adaptive(func_type, precision_target=1e-10)
    integration_results.append(result)

    print(f"\n{func_type.upper()} FUNCTION INTEGRATION:")
    print(f"Adaptive result: {result['adaptive_result']:.10f} ± {result['adaptive_error']:.2e}")
    print(f"Monte Carlo result: {result['monte_carlo_result']:.10f} ± {result['monte_carlo_error']:.2e}")

    if 'analytical_result' in result:
        print(f"Analytical result: {result['analytical_result']:.10f}")
        print(f"Adaptive error vs analytical: {result['adaptive_vs_analytical']:.2e}")
        print(f"MC error vs analytical: {result['mc_vs_analytical']:.2e}")
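Exact reference values for these integrals can be checked locally with the standard library. The sketch below assumes the same integrands and bounds as the example: the standard normal density on [-5, 5] and x^10 e^(-x) on [0, 10]. It uses the finite-sum form of the lower incomplete gamma function, which holds for integer powers, and a composite Simpson rule as an independent cross-check. All function names are illustrative.

```python
import math


def gaussian_reference(limit: float = 5.0) -> float:
    """Integral of the standard normal pdf over [-limit, limit]."""
    return math.erf(limit / math.sqrt(2))


def truncated_gamma_reference(power: int = 10, upper: float = 10.0) -> float:
    """Integral of x**power * exp(-x) over [0, upper] for a non-negative integer power.

    Uses the finite-sum form of the lower incomplete gamma function:
    power! * (1 - exp(-upper) * sum_{k=0}^{power} upper**k / k!).
    """
    tail = math.exp(-upper) * sum(upper**k / math.factorial(k) for k in range(power + 1))
    return math.factorial(power) * (1.0 - tail)


def simpson(func, a: float, b: float, n: int = 2000) -> float:
    """Composite Simpson rule with n (even) subintervals, used as a cross-check."""
    if n % 2:
        raise ValueError("n must be even")
    h = (b - a) / n
    total = func(a) + func(b)
    total += 4 * sum(func(a + i * h) for i in range(1, n, 2))
    total += 2 * sum(func(a + i * h) for i in range(2, n, 2))
    return total * h / 3


print(f"Gaussian on [-5, 5]:     {gaussian_reference():.10f}")
print(f"x^10 e^-x on [0, 10]:    {truncated_gamma_reference():.4f}")
print(f"Simpson cross-check:     {simpson(lambda x: x**10 * math.exp(-x), 0, 10):.4f}")
print(f"10! (integral to inf):   {math.factorial(10)}")
```

The last line shows why the bounds matter: 10! is the value of the integral over [0, ∞), and the integral truncated at 10 is well under half of it.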

Example 5: Bioinformatics - Sequence Analysis

Analyze biological sequences using cluster computing:

[ ]:
@cluster(
    cores=24,
    memory="96GB",
    time="04:00:00",
    partition="bioqueue"  # Specialized bioinformatics partition
)
def analyze_genome_sequences(num_sequences=1000, sequence_length=10000):
    """
    Analyze synthetic genome sequences for various biological properties.
    """
    import numpy as np
    from collections import Counter
    import re

    # DNA bases
    bases = ['A', 'T', 'G', 'C']

    # Common biological motifs
    motifs = {
        'CpG_sites': 'CG',
        'TATA_box': 'TATAAA',
        'start_codon': 'ATG',
        'stop_codons': ['TAA', 'TAG', 'TGA'],
        'poly_A': 'AAAAAAA',  # 7 consecutive A's
        'GC_rich': 'GCGCGC'
    }

    def generate_sequence(length, gc_content=0.5):
        """Generate a random DNA sequence with specified GC content"""
        # Adjust probabilities for GC content
        gc_prob = gc_content / 2  # Equal prob for G and C
        at_prob = (1 - gc_content) / 2  # Equal prob for A and T

        probs = [at_prob, at_prob, gc_prob, gc_prob]  # A, T, G, C
        return ''.join(np.random.choice(bases, size=length, p=probs))

    def analyze_sequence(sequence):
        """Analyze a single sequence for biological properties"""
        # Basic composition
        composition = Counter(sequence)
        total_bases = len(sequence)

        gc_content = (composition['G'] + composition['C']) / total_bases
        at_content = (composition['A'] + composition['T']) / total_bases

        # Motif analysis
        motif_counts = {}
        motif_counts['CpG_sites'] = len(re.findall(motifs['CpG_sites'], sequence))
        motif_counts['TATA_boxes'] = len(re.findall(motifs['TATA_box'], sequence))
        motif_counts['start_codons'] = len(re.findall(motifs['start_codon'], sequence))
        motif_counts['poly_A_signals'] = len(re.findall(motifs['poly_A'], sequence))
        motif_counts['GC_rich_regions'] = len(re.findall(motifs['GC_rich'], sequence))

        # Stop codons (any of the three)
        stop_codon_count = sum(len(re.findall(codon, sequence)) for codon in motifs['stop_codons'])
        motif_counts['stop_codons'] = stop_codon_count

        # Calculate complexity (entropy)
        entropy = -sum((count/total_bases) * np.log2(count/total_bases)
                      for count in composition.values() if count > 0)

        # Find longest homopolymer runs
        max_runs = {}
        for base in bases:
            runs = re.findall(f'{base}+', sequence)
            max_runs[f'max_{base}_run'] = max(len(run) for run in runs) if runs else 0

        return {
            'length': total_bases,
            'gc_content': gc_content,
            'at_content': at_content,
            'base_composition': dict(composition),
            'entropy': entropy,
            'motif_counts': motif_counts,
            'max_homopolymer_runs': max_runs
        }

    print(f"Generating and analyzing {num_sequences:,} sequences of length {sequence_length:,}...")

    # Generate sequences with varying GC content
    gc_contents = np.random.uniform(0.3, 0.7, num_sequences)  # Realistic range

    sequence_analyses = []

    for i, gc_content in enumerate(gc_contents):
        if i % 100 == 0:
            print(f"Analyzing sequence {i+1}/{num_sequences}...")

        sequence = generate_sequence(sequence_length, gc_content)
        analysis = analyze_sequence(sequence)
        analysis['target_gc_content'] = gc_content
        analysis['sequence_id'] = i
        sequence_analyses.append(analysis)

    # Aggregate statistics
    gc_contents_actual = [s['gc_content'] for s in sequence_analyses]
    entropies = [s['entropy'] for s in sequence_analyses]

    # Motif statistics
    all_motif_counts = {motif: [s['motif_counts'][motif] for s in sequence_analyses]
                       for motif in sequence_analyses[0]['motif_counts'].keys()}

    aggregate_results = {
        'num_sequences_analyzed': len(sequence_analyses),
        'total_bases_analyzed': len(sequence_analyses) * sequence_length,
        'gc_content_stats': {
            'mean': np.mean(gc_contents_actual),
            'std': np.std(gc_contents_actual),
            'min': np.min(gc_contents_actual),
            'max': np.max(gc_contents_actual)
        },
        'entropy_stats': {
            'mean': np.mean(entropies),
            'std': np.std(entropies),
            'min': np.min(entropies),
            'max': np.max(entropies)
        },
        'motif_statistics': {
            motif: {
                'total_found': sum(counts),
                'mean_per_sequence': np.mean(counts),
                'std_per_sequence': np.std(counts),
                'sequences_with_motif': sum(1 for c in counts if c > 0)
            } for motif, counts in all_motif_counts.items()
        },
        'individual_analyses': sequence_analyses[:10]  # Return first 10 for inspection
    }

    return aggregate_results

# Analyze genome sequences
genome_results = analyze_genome_sequences(num_sequences=500, sequence_length=5000)

print(f"\nGENOME SEQUENCE ANALYSIS COMPLETE")
print(f"Sequences analyzed: {genome_results['num_sequences_analyzed']:,}")
print(f"Total bases: {genome_results['total_bases_analyzed']:,}")

print("\nGC Content Statistics:")
gc_stats = genome_results['gc_content_stats']
print(f"  Mean: {gc_stats['mean']:.3f} ± {gc_stats['std']:.3f}")
print(f"  Range: {gc_stats['min']:.3f} - {gc_stats['max']:.3f}")

print("\nSequence Complexity (Entropy):")
entropy_stats = genome_results['entropy_stats']
print(f"  Mean: {entropy_stats['mean']:.3f} ± {entropy_stats['std']:.3f}")
print(f"  Range: {entropy_stats['min']:.3f} - {entropy_stats['max']:.3f}")

print("\nMotif Analysis:")
for motif, stats in genome_results['motif_statistics'].items():
    print(f"  {motif}: {stats['total_found']} total, "
          f"{stats['mean_per_sequence']:.1f}±{stats['std_per_sequence']:.1f} per sequence, "
          f"{stats['sequences_with_motif']} sequences contain motif")
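Note that `re.findall` counts non-overlapping matches, so a run of eight A's contains one `AAAAAAA` match, not two. If every start position should count, a zero-width lookahead does that. The sketch below compares the two conventions; the helper names are illustrative.

```python
import re


def count_non_overlapping(motif: str, sequence: str) -> int:
    """Count matches the way re.findall does: each base belongs to at most one match."""
    return len(re.findall(re.escape(motif), sequence))


def count_overlapping(motif: str, sequence: str) -> int:
    """Count every start position of the motif using a zero-width lookahead."""
    return len(re.findall(f"(?={re.escape(motif)})", sequence))


# Eight consecutive A's followed by GCGCGCGC, flanked by TT
sequence = "TT" + "A" * 8 + "GC" * 4 + "TT"
for motif in ("AAAAAAA", "GCGCGC"):
    print(motif, count_non_overlapping(motif, sequence), count_overlapping(motif, sequence))
```

Which convention is appropriate depends on the question being asked; the counts only differ for motifs that can overlap themselves, such as homopolymers and short repeats.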

Advanced SLURM Features

Job Arrays for Parameter Sweeps

Use SLURM job arrays to efficiently run parameter sweeps:

[ ]:
@cluster(
    cores=4,
    memory="16GB",
    time="00:30:00",
    array="1-10"  # SLURM job array with 10 tasks
)
def parameter_sweep_simulation(base_params):
    """
    Run simulation with parameter variations using SLURM job arrays.
    Each array task will run with different parameters.
    """
    import os
    import numpy as np

    # Get SLURM array task ID
    task_id = int(os.environ.get('SLURM_ARRAY_TASK_ID', '1'))

    # Define parameter variations
    learning_rates = np.logspace(-4, -1, 10)  # 10 different learning rates
    learning_rate = learning_rates[task_id - 1]  # Array indices run 1-10 here; shift to 0-based

    # Update parameters
    params = base_params.copy()
    params['learning_rate'] = learning_rate
    params['task_id'] = task_id

    print(f"Task {task_id}: Running with learning_rate = {learning_rate:.6f}")

    # Simulate training process
    np.random.seed(task_id * 42)  # Reproducible but different per task

    losses = []
    current_loss = 10.0  # Starting loss

    for epoch in range(params['epochs']):
        # Simulate gradient descent
        gradient = np.random.normal(0, 0.1) + 0.1 * current_loss
        current_loss -= learning_rate * gradient
        current_loss = max(0.01, current_loss)  # Prevent negative loss
        losses.append(current_loss)

    final_loss = losses[-1]
    convergence_epoch = next((i for i, loss in enumerate(losses) if loss < 0.1), len(losses))

    return {
        'task_id': task_id,
        'learning_rate': learning_rate,
        'final_loss': final_loss,
        'convergence_epoch': convergence_epoch,
        'loss_history': losses[::10],  # Every 10th loss for brevity
        'converged': final_loss < 0.1
    }

# Run parameter sweep
base_parameters = {
    'epochs': 1000,
    'batch_size': 32,
    'model_size': 'medium'
}

# This will submit a SLURM job array with 10 tasks
sweep_results = parameter_sweep_simulation(base_parameters)

print(f"Parameter sweep completed for task {sweep_results['task_id']}")
print(f"Learning rate: {sweep_results['learning_rate']:.6f}")
print(f"Final loss: {sweep_results['final_loss']:.4f}")
print(f"Converged: {sweep_results['converged']}")
if sweep_results['converged']:
    print(f"Convergence epoch: {sweep_results['convergence_epoch']}")
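SLURM array indices start wherever the array specification says, so a fixed `task_id - 1` offset only works for arrays that begin at 1. The sketch below uses `SLURM_ARRAY_TASK_MIN`, which SLURM exports alongside `SLURM_ARRAY_TASK_ID`, to compute the offset. It assumes an array step of 1, and both helper names are hypothetical.

```python
import os


def log_spaced(low_exp: float, high_exp: float, count: int) -> list[float]:
    """Pure-Python equivalent of np.logspace(low_exp, high_exp, count)."""
    if count == 1:
        return [10.0 ** low_exp]
    step = (high_exp - low_exp) / (count - 1)
    return [10.0 ** (low_exp + i * step) for i in range(count)]


def value_for_task(values, env=os.environ):
    """Pick this task's value, using SLURM_ARRAY_TASK_MIN as the offset.

    Falls back to the first value when run outside a job array.
    """
    task_id = int(env.get("SLURM_ARRAY_TASK_ID", "0"))
    first_id = int(env.get("SLURM_ARRAY_TASK_MIN", str(task_id)))
    index = task_id - first_id
    if not 0 <= index < len(values):
        raise IndexError(f"Task {task_id} has no matching value (index {index})")
    return values[index]


learning_rates = log_spaced(-4, -1, 10)
fake_env = {"SLURM_ARRAY_TASK_ID": "3", "SLURM_ARRAY_TASK_MIN": "1"}  # simulated task
print(f"{value_for_task(learning_rates, fake_env):.6f}")
```

Passing the environment as an argument makes the mapping easy to test on a laptop before submitting the array.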

Monitoring and Debugging

Use Clustrix’s built-in monitoring capabilities:

[ ]:
from clustrix import ClusterExecutor

# Get the configured executor
config = clustrix.get_config()
executor = ClusterExecutor(config)

# Check cluster connectivity
try:
    executor.connect()
    print("✓ Successfully connected to SLURM cluster")

    # Test basic command execution
    stdout, stderr = executor._execute_command("sinfo --version")
    print(f"✓ SLURM version: {stdout.strip()}")

    # Check available partitions (%A reports node counts as allocated/idle)
    stdout, stderr = executor._execute_command("sinfo -h -o '%P %A %l'")
    print("\nAvailable partitions:")
    for line in stdout.strip().split('\n')[:5]:  # Show first 5 partitions
        parts = line.split()
        if len(parts) >= 3:
            partition, node_counts, timelimit = parts[0], parts[1], parts[2]
            print(f"  {partition}: {node_counts} nodes (allocated/idle), time limit: {timelimit}")

    executor.disconnect()
    print("\n✓ Connection test completed successfully")

except Exception as e:
    print(f"✗ Connection failed: {e}")
    print("Please check your cluster configuration and SSH setup")
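The `sinfo -h -o '%P %A %l'` output can be turned into structured records for further checks. In that format, `%P` is the partition name (the default partition is marked with a trailing `*`), `%A` is the node count as `allocated/idle`, and `%l` is the time limit. The sketch below parses such lines; the sample output is made up for illustration, and `PartitionInfo` and `parse_sinfo_line` are hypothetical names.

```python
from typing import NamedTuple


class PartitionInfo(NamedTuple):
    name: str
    is_default: bool
    allocated_nodes: int
    idle_nodes: int
    time_limit: str


def parse_sinfo_line(line: str) -> PartitionInfo:
    """Parse one line produced by: sinfo -h -o '%P %A %l'."""
    partition, node_counts, time_limit = line.split()
    allocated, idle = (int(count) for count in node_counts.split("/"))
    return PartitionInfo(
        name=partition.rstrip("*"),           # sinfo marks the default partition with '*'
        is_default=partition.endswith("*"),
        allocated_nodes=allocated,
        idle_nodes=idle,
        time_limit=time_limit,
    )


# Illustrative output, not taken from a real cluster.
sample_output = "normal* 42/6 7-00:00:00\ndebug 0/4 30:00\ngpu 8/0 2-00:00:00"
for line in sample_output.splitlines():
    info = parse_sinfo_line(line)
    print(f"{info.name}: {info.idle_nodes} idle, {info.allocated_nodes} allocated, limit {info.time_limit}")
```

A partition with zero idle nodes is still usable, but jobs submitted to it will queue until nodes free up.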

Configuration Best Practices

1. Environment-Specific Configuration

Create different configurations for different environments:

[ ]:
# Development configuration (smaller resources)
dev_config = {
    'cluster_type': 'slurm',
    'cluster_host': 'dev-cluster.university.edu',
    'username': 'your-username',
    'default_cores': 2,
    'default_memory': '4GB',
    'default_time': '00:15:00',
    'default_partition': 'debug',
    'max_parallel_jobs': 5
}

# Production configuration (larger resources)
prod_config = {
    'cluster_type': 'slurm',
    'cluster_host': 'hpc-cluster.university.edu',
    'username': 'your-username',
    'default_cores': 16,
    'default_memory': '64GB',
    'default_time': '04:00:00',
    'default_partition': 'normal',
    'max_parallel_jobs': 50
}

# Choose configuration based on environment
import os
environment = os.environ.get('CLUSTRIX_ENV', 'development')

if environment == 'production':
    clustrix.configure(**prod_config)
    print("Configured for production environment")
else:
    clustrix.configure(**dev_config)
    print("Configured for development environment")
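When the environments share most settings, one possible refinement is to keep the common values in a single place and store only the differences per environment. The sketch below restructures the two dictionaries above that way, with the values copied from them. `build_config` is a hypothetical helper that, unlike the `if`/`else` above, rejects an unrecognized environment name instead of silently falling back to development.

```python
# Hypothetical restructuring of the two dictionaries above; values are copied from them.
BASE_CONFIG = {
    "cluster_type": "slurm",
    "username": "your-username",
}

ENVIRONMENT_OVERRIDES = {
    "development": {
        "cluster_host": "dev-cluster.university.edu",
        "default_cores": 2,
        "default_memory": "4GB",
        "default_time": "00:15:00",
        "default_partition": "debug",
        "max_parallel_jobs": 5,
    },
    "production": {
        "cluster_host": "hpc-cluster.university.edu",
        "default_cores": 16,
        "default_memory": "64GB",
        "default_time": "04:00:00",
        "default_partition": "normal",
        "max_parallel_jobs": 50,
    },
}


def build_config(environment: str) -> dict:
    """Merge shared settings with one environment's overrides."""
    if environment not in ENVIRONMENT_OVERRIDES:
        known = ", ".join(sorted(ENVIRONMENT_OVERRIDES))
        raise ValueError(f"Unknown environment {environment!r}; expected one of: {known}")
    return {**BASE_CONFIG, **ENVIRONMENT_OVERRIDES[environment]}


config = build_config("development")
print(config["cluster_host"], config["default_partition"])
# The result can then be passed on as clustrix.configure(**config).
```

Failing loudly on a typo such as `CLUSTRIX_ENV=prod` avoids running a production workload with development-sized resources.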

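2. Resource Estimation Guidelines

The heuristic below estimates cores, memory, and wall time from the task type, data size, and complexity. Treat its numbers as starting points and tune them against your cluster's limits and your own measurements: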

[ ]:
def estimate_resources(task_type, data_size_mb, complexity='medium'):
    """
    Estimate computational resources needed for different task types.
    """

    base_configs = {
        'data_processing': {
            'cores': max(2, min(16, data_size_mb // 100)),
            'memory_gb': max(4, min(64, data_size_mb // 10)),
            'time_hours': max(0.5, min(8, data_size_mb / 1000))
        },
        'machine_learning': {
            'cores': max(4, min(32, data_size_mb // 50)),
            'memory_gb': max(8, min(128, data_size_mb // 5)),
            'time_hours': max(1, min(12, data_size_mb / 500))
        },
        'simulation': {
            'cores': max(8, min(64, data_size_mb // 25)),
            'memory_gb': max(16, min(256, data_size_mb // 2)),
            'time_hours': max(2, min(24, data_size_mb / 100))
        },
        'bioinformatics': {
            'cores': max(4, min(24, data_size_mb // 20)),
            'memory_gb': max(16, min(128, data_size_mb // 2)),
            'time_hours': max(1, min(16, data_size_mb / 200))
        }
    }

    if task_type not in base_configs:
        raise ValueError(f"Unknown task type: {task_type}")

    config = base_configs[task_type].copy()

    # Adjust for complexity
    complexity_multipliers = {
        'low': 0.7,
        'medium': 1.0,
        'high': 1.5,
        'very_high': 2.0
    }

    multiplier = complexity_multipliers.get(complexity, 1.0)

    config['cores'] = int(config['cores'] * multiplier)
    config['memory_gb'] = int(config['memory_gb'] * multiplier)
    config['time_hours'] = config['time_hours'] * multiplier

    # Format time as HH:MM:SS, rounding to the nearest minute to avoid float truncation
    total_minutes = round(config['time_hours'] * 60)
    hours, minutes = divmod(total_minutes, 60)
    config['time_formatted'] = f"{hours:02d}:{minutes:02d}:00"

    return config

# Example usage
examples = [
    ('machine_learning', 1000, 'high'),
    ('data_processing', 5000, 'medium'),
    ('simulation', 100, 'very_high'),
    ('bioinformatics', 2000, 'high')
]

print("Resource Estimation Examples:")
print("=" * 80)

for task_type, data_size, complexity in examples:
    resources = estimate_resources(task_type, data_size, complexity)
    print(f"\n{task_type.replace('_', ' ').title()} ({data_size} MB, {complexity} complexity):")
    print(f"  Cores: {resources['cores']}")
    print(f"  Memory: {resources['memory_gb']} GB")
    print(f"  Time: {resources['time_formatted']} ({resources['time_hours']:.1f} hours)")
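Estimates of a day or longer can also be written in SLURM's `D-HH:MM:SS` form, which some people find easier to read than a large hour count. The sketch below converts fractional hours to either form and rounds to the nearest minute, so that floating-point noise such as 68.999... minutes does not truncate to 68. `hours_to_slurm_time` is a hypothetical helper, not part of Clustrix.

```python
def hours_to_slurm_time(hours: float) -> str:
    """Format fractional hours as HH:MM:SS, or D-HH:MM:SS for a day or more."""
    if hours <= 0:
        raise ValueError("hours must be positive")
    total_minutes = max(1, round(hours * 60))   # round to the nearest minute, at least one
    days, remainder = divmod(total_minutes, 24 * 60)
    hh, mm = divmod(remainder, 60)
    if days:
        return f"{days}-{hh:02d}:{mm:02d}:00"
    return f"{hh:02d}:{mm:02d}:00"


for hours in (0.25, 1.15, 3.0, 36.0):
    print(f"{hours:>5} h -> {hours_to_slurm_time(hours)}")
```

Adding a safety margin before formatting, for example multiplying the estimate by 1.2, leaves room for slow nodes and file system delays.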

Summary

This tutorial covered:

  1. Basic SLURM Configuration - Setting up Clustrix for SLURM clusters

  2. Simple Computations - Monte Carlo methods and mathematical functions

  3. Machine Learning - Training models with custom resource requests, including a GPU partition

  4. Parallel Processing - Automatic loop distribution across cores

  5. Scientific Computing - High-precision numerical integration

  6. Bioinformatics - Genome sequence analysis

  7. Advanced Features - Job arrays and parameter sweeps

  8. Monitoring - Connection testing and debugging

  9. Best Practices - Resource estimation and configuration management

Key Takeaways:

  • Resource Planning: Always estimate resources based on your data size and complexity

  • Partition Selection: Choose appropriate SLURM partitions for your workload

  • Time Limits: Set realistic time limits with some buffer for completion

  • Memory Management: Monitor memory usage and adjust accordingly

  • Parallel Efficiency: Use automatic parallelization for loop-heavy computations

  • Error Handling: Always test connectivity and handle failures gracefully

Next Steps:

For more information, visit the Clustrix Documentation.