SLURM Cluster Tutorial
This tutorial demonstrates how to use Clustrix with SLURM (Simple Linux Utility for Resource Management) clusters. SLURM is one of the most popular workload managers for HPC clusters.
Prerequisites
Access to a SLURM cluster
SSH key configured for the cluster
Clustrix installed:
pip install clustrix
Installation and Setup
First, install Clustrix if you haven't already:
# Install Clustrix (uncomment if needed)
# !pip install clustrix
import clustrix
from clustrix import cluster, configure
import numpy as np
import time
Basic SLURM Configuration
Configure Clustrix to connect to your SLURM cluster:
# Configure for SLURM cluster
configure(
    cluster_type="slurm",
    cluster_host="your-slurm-cluster.edu",  # Replace with your cluster hostname
    username="your-username",  # Replace with your username
    key_file="~/.ssh/id_rsa",  # Path to your SSH key
    # Default resource requirements
    default_cores=4,
    default_memory="8GB",
    default_time="01:00:00",
    default_partition="normal",  # Replace with your default partition
    # Remote work directory
    remote_work_dir="/scratch/your-username/clustrix",  # Adjust for your cluster
    # Optional: Load modules on the cluster
    module_loads=["python/3.9", "gcc/9.3.0"],
    # Cleanup settings
    cleanup_on_success=True,
    max_parallel_jobs=20,
)
print("SLURM cluster configured successfully!")
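For orientation, the defaults above correspond roughly to a set of standard `#SBATCH` directives. The following is a minimal sketch, not Clustrix's actual script generator: the helpers `to_slurm_mem` and `sbatch_header`, and the mapping of `default_cores` to `--cpus-per-task`, are assumptions made for illustration. Only the `#SBATCH` options themselves are standard SLURM flags.

```python
def to_slurm_mem(memory):
    """Convert a size such as '8GB' to SLURM's '8G' form (units K, M, G, T)."""
    mem = memory.strip().upper()
    return mem[:-1] if mem.endswith("B") else mem


def sbatch_header(cores, memory, time, partition, modules=()):
    """Render an illustrative sbatch preamble (hypothetical helper)."""
    lines = [
        "#!/bin/bash",
        f"#SBATCH --cpus-per-task={cores}",
        f"#SBATCH --mem={to_slurm_mem(memory)}",
        f"#SBATCH --time={time}",
        f"#SBATCH --partition={partition}",
    ]
    # Environment modules are loaded after the directives
    lines += [f"module load {name}" for name in modules]
    return "\n".join(lines)


print(sbatch_header(4, "8GB", "01:00:00", "normal", ["python/3.9", "gcc/9.3.0"]))
```

Reading the configuration this way can help when comparing a Clustrix job against a hand-written batch script on the same cluster.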
Example 1: Simple Mathematical Computation
Let's start with a basic example that performs a mathematical computation on the cluster:
@cluster(cores=2, memory="4GB", time="00:10:00")
def calculate_pi_monte_carlo(n_samples=1000000):
    """
    Calculate pi using Monte Carlo method.
    This will run on the SLURM cluster.
    """
    import numpy as np
    # Generate random points
    x = np.random.uniform(-1, 1, n_samples)
    y = np.random.uniform(-1, 1, n_samples)
    # Check if points are inside unit circle
    inside_circle = (x**2 + y**2) <= 1
    # Estimate pi
    pi_estimate = 4 * np.sum(inside_circle) / n_samples
    return {
        'pi_estimate': pi_estimate,
        'n_samples': n_samples,
        'error': abs(pi_estimate - np.pi)
    }

# Execute on cluster (this will submit a SLURM job)
result = calculate_pi_monte_carlo(5000000)
print(f"Pi estimate: {result['pi_estimate']:.6f}")
print(f"Error: {result['error']:.6f}")
print(f"Samples used: {result['n_samples']:,}")
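Before submitting, it can help to know what error to expect from a given sample count. Each point lands inside the circle with probability p = pi/4, so the estimate has a standard error of 4 * sqrt(p(1 - p) / n). The sketch below evaluates that formula locally; `pi_standard_error` is a name chosen here for illustration.

```python
import math


def pi_standard_error(n_samples):
    """Standard error of the hit-or-miss Monte Carlo estimate of pi."""
    p = math.pi / 4  # probability that a uniform point in the square falls in the circle
    return 4 * math.sqrt(p * (1 - p) / n_samples)


for n in (1_000_000, 5_000_000, 100_000_000):
    print(f"n={n:>11,}  expected error ~ {pi_standard_error(n):.2e}")
```

Because the error shrinks only as 1/sqrt(n), each extra digit of accuracy costs about 100 times more samples, which is worth weighing against the requested wall time.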
Example 2: Machine Learning Model Training
Train a machine learning model with specific resource requirements:
@cluster(
    cores=8,
    memory="32GB",
    time="02:00:00",
    partition="gpu",  # Use GPU partition if available
    gres="gpu:1"  # Request 1 GPU (SLURM-specific)
)
def train_random_forest(n_samples=100000, n_features=50, n_estimators=200):
    """
    Train a Random Forest model on synthetic data.
    """
    # Note: scikit-learn's RandomForestClassifier runs on the CPU only;
    # the GPU request above illustrates the syntax, not a requirement of this model.
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.datasets import make_classification
    from sklearn.model_selection import train_test_split, cross_val_score
    from sklearn.metrics import accuracy_score
    import numpy as np
    print(f"Generating dataset with {n_samples:,} samples and {n_features} features...")
    # Generate synthetic dataset
    X, y = make_classification(
        n_samples=n_samples,
        n_features=n_features,
        n_informative=int(n_features * 0.7),
        n_redundant=int(n_features * 0.2),
        n_clusters_per_class=2,
        random_state=42
    )
    # Split the data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    print(f"Training Random Forest with {n_estimators} estimators...")
    # Train model
    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=20,
        min_samples_split=5,
        n_jobs=-1,  # Use all available cores
        random_state=42
    )
    model.fit(X_train, y_train)
    # Evaluate model
    train_accuracy = accuracy_score(y_train, model.predict(X_train))
    test_accuracy = accuracy_score(y_test, model.predict(X_test))
    # Cross-validation
    cv_scores = cross_val_score(model, X, y, cv=5, n_jobs=-1)
    return {
        'train_accuracy': train_accuracy,
        'test_accuracy': test_accuracy,
        'cv_mean': np.mean(cv_scores),
        'cv_std': np.std(cv_scores),
        'feature_importance': model.feature_importances_.tolist(),
        'n_samples': n_samples,
        'n_features': n_features,
        'n_estimators': n_estimators
    }

# Train model on cluster
ml_result = train_random_forest(n_samples=50000, n_features=30, n_estimators=100)
print(f"Training Accuracy: {ml_result['train_accuracy']:.4f}")
print(f"Test Accuracy: {ml_result['test_accuracy']:.4f}")
print(f"Cross-validation: {ml_result['cv_mean']:.4f} ± {ml_result['cv_std']:.4f}")
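`n_jobs=-1` asks for every core the process can see, which on a shared node may exceed what the job was allocated, depending on the joblib version and the cluster's cgroup setup. A cautious alternative is to size the worker pool from `SLURM_CPUS_PER_TASK`, which SLURM sets when `--cpus-per-task` is requested. The sketch below assumes that variable is present in the job environment (whether Clustrix requests cores through `--cpus-per-task` is not confirmed here); `allocated_cpus` is a hypothetical helper.

```python
import os


def allocated_cpus(default=1):
    """Return the CPU count SLURM granted to this task, or `default` outside SLURM."""
    value = os.environ.get("SLURM_CPUS_PER_TASK", "")
    # Fall back to the default when the variable is missing or not a positive integer
    return int(value) if value.isdigit() and int(value) > 0 else default


n_jobs = allocated_cpus()
print(f"Using n_jobs={n_jobs}")
```

The returned value could then be passed as `n_jobs` to `RandomForestClassifier` and `cross_val_score` in place of `-1`.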
Example 3: Parallel Data Processing with Automatic Loop Distribution
Process multiple data chunks in parallel using Clustrix's automatic loop parallelization:
@cluster(
    cores=16,
    memory="64GB",
    time="01:30:00",
    parallel=True  # Enable automatic loop parallelization
)
def process_data_chunks(chunk_size=10000, num_chunks=20):
    """
    Process multiple data chunks in parallel.
    The for loop will be automatically distributed across cores.
    """
    import numpy as np
    from scipy import stats
    results = []
    # This loop will be automatically parallelized by Clustrix
    for chunk_id in range(num_chunks):
        # Generate chunk data with different random seed
        np.random.seed(chunk_id * 42)
        data = np.random.exponential(scale=2.0, size=chunk_size)
        # Perform statistical analysis on chunk
        chunk_stats = {
            'chunk_id': chunk_id,
            'mean': np.mean(data),
            'std': np.std(data),
            'median': np.median(data),
            'skewness': stats.skew(data),
            'kurtosis': stats.kurtosis(data),
            'min': np.min(data),
            'max': np.max(data),
            'percentile_95': np.percentile(data, 95)
        }
        results.append(chunk_stats)
    # Aggregate results
    overall_stats = {
        'num_chunks': len(results),
        'total_samples': num_chunks * chunk_size,
        'mean_of_means': np.mean([r['mean'] for r in results]),
        'std_of_means': np.std([r['mean'] for r in results]),
        'chunk_results': results
    }
    return overall_stats

# Process data chunks in parallel
parallel_result = process_data_chunks(chunk_size=5000, num_chunks=10)
print(f"Processed {parallel_result['num_chunks']} chunks")
print(f"Total samples: {parallel_result['total_samples']:,}")
print(f"Mean of chunk means: {parallel_result['mean_of_means']:.4f}")
print(f"Std of chunk means: {parallel_result['std_of_means']:.4f}")
# Display first few chunk results
print("\nFirst 3 chunk results:")
for i, chunk in enumerate(parallel_result['chunk_results'][:3]):
    print(f"  Chunk {chunk['chunk_id']}: mean={chunk['mean']:.3f}, std={chunk['std']:.3f}")
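Conceptually, distributing a loop means splitting its index range into batches, running each batch independently, and concatenating the results in order. The sketch below shows that idea with the standard library only. It is not Clustrix's implementation: `split_evenly` and `chunk_mean` are hypothetical helpers, and the batches are run sequentially here for simplicity. Each chunk gets its own seeded generator, so results do not depend on which worker handles it.

```python
import random
import statistics


def split_evenly(n_items, n_workers):
    """Split range(n_items) into at most n_workers contiguous, near-equal batches."""
    base, extra = divmod(n_items, n_workers)
    batches, start = [], 0
    for worker in range(n_workers):
        size = base + (1 if worker < extra else 0)
        if size:
            batches.append(range(start, start + size))
        start += size
    return batches


def chunk_mean(chunk_id, chunk_size=5000):
    """Mean of an exponential sample (scale 2.0) drawn from a per-chunk generator."""
    rng = random.Random(chunk_id)  # local generator: no shared global state
    return statistics.fmean(rng.expovariate(0.5) for _ in range(chunk_size))


batches = split_evenly(10, 4)
results = [chunk_mean(i) for batch in batches for i in batch]
print([list(b) for b in batches])
print(f"Mean of chunk means: {statistics.fmean(results):.3f}")
```

Loops whose iterations share state, such as a running total updated in place, do not decompose this way and are poor candidates for automatic distribution.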
Example 4: Scientific Computing - Numerical Integration
Perform numerical integration using high-performance computing resources:
@cluster(
    cores=32,
    memory="128GB",
    time="03:00:00",
    partition="bigmem"  # Use high-memory partition
)
def numerical_integration_adaptive(function_type="gaussian", intervals=1000000, precision_target=1e-8):
    """
    Perform high-precision numerical integration using adaptive methods.
    """
    import numpy as np
    from scipy import integrate, special
    import math

    def gaussian_function(x):
        """Standard Gaussian function"""
        return np.exp(-x**2 / 2) / np.sqrt(2 * np.pi)

    def oscillatory_function(x):
        """Highly oscillatory function"""
        return np.sin(100 * x) * np.exp(-x**2)

    def polynomial_function(x):
        """High-degree polynomial"""
        return x**10 * np.exp(-x)

    # Select function based on type: (function, lower bound, upper bound, exact value)
    functions = {
        # Standard normal PDF over [-5, 5]: Phi(5) - Phi(-5) = erf(5 / sqrt(2))
        "gaussian": (gaussian_function, -5, 5, math.erf(5 / math.sqrt(2))),
        # Odd integrand on a symmetric interval, so the exact value is 0
        "oscillatory": (oscillatory_function, -2, 2, 0.0),
        # Finite upper bound: lower incomplete gamma, not 10! (which is the integral to infinity)
        "polynomial": (polynomial_function, 0, 10, special.gammainc(11, 10) * math.gamma(11))
    }
    if function_type not in functions:
        raise ValueError(f"Unknown function type: {function_type}")
    func, a, b, analytical = functions[function_type]
    print(f"Integrating {function_type} function from {a} to {b}...")
    print(f"Target precision: {precision_target}")
    # High-precision adaptive integration
    result, error = integrate.quad(
        func, a, b,
        epsabs=precision_target,
        epsrel=precision_target,
        limit=intervals
    )
    # Monte Carlo integration for comparison
    n_mc = 10000000  # 10 million samples
    x_mc = np.random.uniform(a, b, n_mc)
    y_mc = func(x_mc)
    mc_result = (b - a) * np.mean(y_mc)
    mc_error = (b - a) * np.std(y_mc) / np.sqrt(n_mc)
    integration_result = {
        'function_type': function_type,
        'integration_bounds': [a, b],
        'adaptive_result': result,
        'adaptive_error': error,
        'monte_carlo_result': mc_result,
        'monte_carlo_error': mc_error,
        'precision_target': precision_target,
        'mc_samples': n_mc
    }
    if analytical is not None:
        integration_result['analytical_result'] = analytical
        integration_result['adaptive_vs_analytical'] = abs(result - analytical)
        integration_result['mc_vs_analytical'] = abs(mc_result - analytical)
    return integration_result

# Perform numerical integration
integration_results = []
for func_type in ["gaussian", "polynomial", "oscillatory"]:
    result = numerical_integration_adaptive(func_type, precision_target=1e-10)
    integration_results.append(result)
    print(f"\n{func_type.upper()} FUNCTION INTEGRATION:")
    print(f"Adaptive result: {result['adaptive_result']:.10f} ± {result['adaptive_error']:.2e}")
    print(f"Monte Carlo result: {result['monte_carlo_result']:.10f} ± {result['monte_carlo_error']:.2e}")
    if 'analytical_result' in result:
        print(f"Analytical result: {result['analytical_result']:.10f}")
        print(f"Adaptive error vs analytical: {result['adaptive_vs_analytical']:.2e}")
        print(f"MC error vs analytical: {result['mc_vs_analytical']:.2e}")
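Closed-form reference values are easy to get subtly wrong, so it is worth cross-checking them locally before spending cluster time. The sketch below uses a composite Simpson's rule (a simpler technique than the adaptive quadrature in the example, swapped in because it needs only the standard library). It compares against two identities: the standard normal density integrates to erf(5/sqrt(2)) over [-5, 5], and for integer n the integral of x^n e^(-x) over [0, x] equals n! * (1 - e^(-x) * sum_{k=0..n} x^k / k!).

```python
import math


def simpson(f, a, b, n=2000):
    """Composite Simpson's rule with n (even) subintervals."""
    if n % 2:
        raise ValueError("n must be even")
    h = (b - a) / n
    total = f(a) + f(b)
    total += 4 * sum(f(a + i * h) for i in range(1, n, 2))
    total += 2 * sum(f(a + i * h) for i in range(2, n, 2))
    return total * h / 3


# Closed-form references
gauss_ref = math.erf(5 / math.sqrt(2))
poly_ref = math.factorial(10) * (
    1 - math.exp(-10) * sum(10**k / math.factorial(k) for k in range(11))
)

# Numerical cross-checks
gauss_num = simpson(lambda x: math.exp(-x * x / 2) / math.sqrt(2 * math.pi), -5, 5)
poly_num = simpson(lambda x: x**10 * math.exp(-x), 0, 10)

print(f"gaussian:   reference={gauss_ref:.10f}  simpson={gauss_num:.10f}")
print(f"polynomial: reference={poly_ref:.4f}  simpson={poly_num:.4f}")
```

Note that the polynomial integral over [0, 10] is well below 10!, which is the value of the same integral taken to infinity.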
Example 5: Bioinformatics - Sequence Analysis
Analyze biological sequences using cluster computing:
@cluster(
    cores=24,
    memory="96GB",
    time="04:00:00",
    partition="bioqueue"  # Specialized bioinformatics partition
)
def analyze_genome_sequences(num_sequences=1000, sequence_length=10000):
    """
    Analyze synthetic genome sequences for various biological properties.
    """
    import numpy as np
    import random
    from collections import Counter
    import re
    # DNA bases
    bases = ['A', 'T', 'G', 'C']
    # Common biological motifs
    motifs = {
        'CpG_sites': 'CG',
        'TATA_box': 'TATAAA',
        'start_codon': 'ATG',
        'stop_codons': ['TAA', 'TAG', 'TGA'],
        'poly_A': 'AAAAAAA',  # 7 consecutive A's
        'GC_rich': 'GCGCGC'
    }

    def generate_sequence(length, gc_content=0.5):
        """Generate a random DNA sequence with specified GC content"""
        # Adjust probabilities for GC content
        gc_prob = gc_content / 2  # Equal prob for G and C
        at_prob = (1 - gc_content) / 2  # Equal prob for A and T
        probs = [at_prob, at_prob, gc_prob, gc_prob]  # A, T, G, C
        return ''.join(np.random.choice(bases, size=length, p=probs))

    def analyze_sequence(sequence):
        """Analyze a single sequence for biological properties"""
        # Basic composition
        composition = Counter(sequence)
        total_bases = len(sequence)
        gc_content = (composition['G'] + composition['C']) / total_bases
        at_content = (composition['A'] + composition['T']) / total_bases
        # Motif analysis (re.findall counts non-overlapping matches)
        motif_counts = {}
        motif_counts['CpG_sites'] = len(re.findall(motifs['CpG_sites'], sequence))
        motif_counts['TATA_boxes'] = len(re.findall(motifs['TATA_box'], sequence))
        motif_counts['start_codons'] = len(re.findall(motifs['start_codon'], sequence))
        motif_counts['poly_A_signals'] = len(re.findall(motifs['poly_A'], sequence))
        motif_counts['GC_rich_regions'] = len(re.findall(motifs['GC_rich'], sequence))
        # Stop codons (any of the three)
        stop_codon_count = sum(len(re.findall(codon, sequence)) for codon in motifs['stop_codons'])
        motif_counts['stop_codons'] = stop_codon_count
        # Calculate complexity (entropy)
        entropy = -sum((count/total_bases) * np.log2(count/total_bases)
                       for count in composition.values() if count > 0)
        # Find longest homopolymer runs
        max_runs = {}
        for base in bases:
            runs = re.findall(f'{base}+', sequence)
            max_runs[f'max_{base}_run'] = max(len(run) for run in runs) if runs else 0
        return {
            'length': total_bases,
            'gc_content': gc_content,
            'at_content': at_content,
            'base_composition': dict(composition),
            'entropy': entropy,
            'motif_counts': motif_counts,
            'max_homopolymer_runs': max_runs
        }

    print(f"Generating and analyzing {num_sequences:,} sequences of length {sequence_length:,}...")
    # Generate sequences with varying GC content
    gc_contents = np.random.uniform(0.3, 0.7, num_sequences)  # Realistic range
    sequence_analyses = []
    for i, gc_content in enumerate(gc_contents):
        if i % 100 == 0:
            print(f"Analyzing sequence {i+1}/{num_sequences}...")
        sequence = generate_sequence(sequence_length, gc_content)
        analysis = analyze_sequence(sequence)
        analysis['target_gc_content'] = gc_content
        analysis['sequence_id'] = i
        sequence_analyses.append(analysis)
    # Aggregate statistics
    gc_contents_actual = [s['gc_content'] for s in sequence_analyses]
    entropies = [s['entropy'] for s in sequence_analyses]
    # Motif statistics
    all_motif_counts = {motif: [s['motif_counts'][motif] for s in sequence_analyses]
                        for motif in sequence_analyses[0]['motif_counts'].keys()}
    aggregate_results = {
        'num_sequences_analyzed': len(sequence_analyses),
        'total_bases_analyzed': len(sequence_analyses) * sequence_length,
        'gc_content_stats': {
            'mean': np.mean(gc_contents_actual),
            'std': np.std(gc_contents_actual),
            'min': np.min(gc_contents_actual),
            'max': np.max(gc_contents_actual)
        },
        'entropy_stats': {
            'mean': np.mean(entropies),
            'std': np.std(entropies),
            'min': np.min(entropies),
            'max': np.max(entropies)
        },
        'motif_statistics': {
            motif: {
                'total_found': sum(counts),
                'mean_per_sequence': np.mean(counts),
                'std_per_sequence': np.std(counts),
                'sequences_with_motif': sum(1 for c in counts if c > 0)
            } for motif, counts in all_motif_counts.items()
        },
        'individual_analyses': sequence_analyses[:10]  # Return first 10 for inspection
    }
    return aggregate_results

# Analyze genome sequences
genome_results = analyze_genome_sequences(num_sequences=500, sequence_length=5000)
print(f"\nGENOME SEQUENCE ANALYSIS COMPLETE")
print(f"Sequences analyzed: {genome_results['num_sequences_analyzed']:,}")
print(f"Total bases: {genome_results['total_bases_analyzed']:,}")
print("\nGC Content Statistics:")
gc_stats = genome_results['gc_content_stats']
print(f"  Mean: {gc_stats['mean']:.3f} ± {gc_stats['std']:.3f}")
print(f"  Range: {gc_stats['min']:.3f} - {gc_stats['max']:.3f}")
print("\nSequence Complexity (Entropy):")
entropy_stats = genome_results['entropy_stats']
print(f"  Mean: {entropy_stats['mean']:.3f} ± {entropy_stats['std']:.3f}")
print(f"  Range: {entropy_stats['min']:.3f} - {entropy_stats['max']:.3f}")
print("\nMotif Analysis:")
for motif, stats in genome_results['motif_statistics'].items():
    print(f"  {motif}: {stats['total_found']} total, "
          f"{stats['mean_per_sequence']:.1f}±{stats['std_per_sequence']:.1f} per sequence, "
          f"{stats['sequences_with_motif']} sequences contain motif")
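One detail of the motif counts above is that `re.findall` reports non-overlapping matches, so repetitive motifs such as `GCGCGC` or a poly-A run can be undercounted when occurrences overlap. The sketch below shows one way to count overlapping occurrences with a zero-width lookahead; `count_motif` is a hypothetical helper, not part of the example.

```python
import re


def count_motif(sequence, motif, overlapping=False):
    """Count occurrences of a literal motif in a sequence."""
    pattern = re.escape(motif)
    if overlapping:
        # A lookahead consumes no characters, so every start position is tested
        pattern = f"(?={pattern})"
    return len(re.findall(pattern, sequence))


seq = "GCGCGCGC"
print("non-overlapping:", count_motif(seq, "GCGCGC"))
print("overlapping:    ", count_motif(seq, "GCGCGC", overlapping=True))
```

Which convention is appropriate depends on the biological question; the point is to pick one deliberately and apply it consistently across sequences.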
Advanced SLURM Features
Job Arrays for Parameter Sweeps
Use SLURM job arrays to efficiently run parameter sweeps:
@cluster(
    cores=4,
    memory="16GB",
    time="00:30:00",
    array="1-10"  # SLURM job array with 10 tasks
)
def parameter_sweep_simulation(base_params):
    """
    Run simulation with parameter variations using SLURM job arrays.
    Each array task will run with different parameters.
    """
    import os
    import numpy as np
    # Get SLURM array task ID
    task_id = int(os.environ.get('SLURM_ARRAY_TASK_ID', '1'))
    # Define parameter variations
    learning_rates = np.logspace(-4, -1, 10)  # 10 different learning rates
    learning_rate = learning_rates[task_id - 1]  # This array is numbered from 1
    # Update parameters
    params = base_params.copy()
    params['learning_rate'] = learning_rate
    params['task_id'] = task_id
    print(f"Task {task_id}: Running with learning_rate = {learning_rate:.6f}")
    # Simulate training process
    np.random.seed(task_id * 42)  # Reproducible but different per task
    losses = []
    current_loss = 10.0  # Starting loss
    for epoch in range(params['epochs']):
        # Simulate gradient descent
        gradient = np.random.normal(0, 0.1) + 0.1 * current_loss
        current_loss -= learning_rate * gradient
        current_loss = max(0.01, current_loss)  # Prevent negative loss
        losses.append(current_loss)
    final_loss = losses[-1]
    convergence_epoch = next((i for i, loss in enumerate(losses) if loss < 0.1), len(losses))
    return {
        'task_id': task_id,
        'learning_rate': learning_rate,
        'final_loss': final_loss,
        'convergence_epoch': convergence_epoch,
        'loss_history': losses[::10],  # Every 10th loss for brevity
        'converged': final_loss < 0.1
    }

# Run parameter sweep
base_parameters = {
    'epochs': 1000,
    'batch_size': 32,
    'model_size': 'medium'
}
# This will submit a SLURM job array with 10 tasks
sweep_results = parameter_sweep_simulation(base_parameters)
print(f"Parameter sweep completed for task {sweep_results['task_id']}")
print(f"Learning rate: {sweep_results['learning_rate']:.6f}")
print(f"Final loss: {sweep_results['final_loss']:.4f}")
print(f"Converged: {sweep_results['converged']}")
if sweep_results['converged']:
    print(f"Convergence epoch: {sweep_results['convergence_epoch']}")
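The same task-ID lookup extends to sweeps over more than one parameter. The following sketch maps `SLURM_ARRAY_TASK_ID` onto a Cartesian grid; `params_for_task` and the grid values are hypothetical, and the array range would need to match the grid size (`array="1-6"` for the six combinations below).

```python
import itertools
import os


def params_for_task(task_id, grid, first_id=1):
    """Map an array task ID to one combination from a parameter grid."""
    names = sorted(grid)  # fixed ordering so every task sees the same enumeration
    combos = list(itertools.product(*(grid[name] for name in names)))
    index = task_id - first_id
    if not 0 <= index < len(combos):
        raise IndexError(f"task_id {task_id} is outside the {len(combos)}-point grid")
    return dict(zip(names, combos[index]))


grid = {"learning_rate": [1e-4, 1e-3, 1e-2], "batch_size": [32, 64]}
task_id = int(os.environ.get("SLURM_ARRAY_TASK_ID", "1"))
print(params_for_task(task_id, grid))
```

Raising on an out-of-range ID, rather than wrapping around, makes a mismatch between the array range and the grid fail loudly instead of silently repeating a configuration.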
Monitoring and Debugging
Use Clustrix's built-in monitoring capabilities:
from clustrix import ClusterExecutor
# Get the configured executor
config = clustrix.get_config()
executor = ClusterExecutor(config)
# Check cluster connectivity
try:
    executor.connect()
    print("✓ Successfully connected to SLURM cluster")
    # Test basic command execution
    stdout, stderr = executor._execute_command("sinfo --version")
    print(f"✓ SLURM version: {stdout.strip()}")
    # Check available partitions
    # (%P = partition name, %A = node counts as allocated/idle, %l = time limit)
    stdout, stderr = executor._execute_command("sinfo -h -o '%P %A %l'")
    print("\nAvailable partitions:")
    for line in stdout.strip().split('\n')[:5]:  # Show first 5 partitions
        parts = line.split()
        if len(parts) >= 3:
            partition, nodes, timelimit = parts[0], parts[1], parts[2]
            print(f"  {partition}: nodes allocated/idle: {nodes}, time limit: {timelimit}")
    executor.disconnect()
    print("\n✓ Connection test completed successfully")
except Exception as e:
    print(f"✗ Connection failed: {e}")
    print("Please check your cluster configuration and SSH setup")
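When the partition listing needs to feed a decision (for example, picking a partition with idle nodes), it is easier to work with parsed records than with printed lines. The sketch below parses the output of `sinfo -h -o '%P %A %l'`, where `%A` reports node counts as `allocated/idle`. `parse_sinfo` is a hypothetical helper, and the sample text is made up for illustration rather than taken from a real cluster.

```python
def parse_sinfo(output):
    """Parse `sinfo -h -o '%P %A %l'` output into a list of dicts."""
    partitions = []
    for line in output.strip().splitlines():
        parts = line.split()
        if len(parts) != 3:
            continue  # skip malformed lines
        name, nodes, timelimit = parts
        allocated, idle = (int(n) for n in nodes.split("/"))
        partitions.append({
            "name": name.rstrip("*"),       # sinfo marks the default partition with '*'
            "default": name.endswith("*"),
            "allocated": allocated,
            "idle": idle,
            "timelimit": timelimit,
        })
    return partitions


# Made-up sample output, for illustration only
sample = """normal* 12/4 1-00:00:00
debug 0/2 30:00
"""

for p in parse_sinfo(sample):
    print(f"{p['name']}: {p['idle']} idle, {p['allocated']} allocated, limit {p['timelimit']}")
```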
Configuration Best Practices
1. Environment-Specific Configuration
Create different configurations for different environments:
# Development configuration (smaller resources)
dev_config = {
    'cluster_type': 'slurm',
    'cluster_host': 'dev-cluster.university.edu',
    'username': 'your-username',
    'default_cores': 2,
    'default_memory': '4GB',
    'default_time': '00:15:00',
    'default_partition': 'debug',
    'max_parallel_jobs': 5
}
# Production configuration (larger resources)
prod_config = {
    'cluster_type': 'slurm',
    'cluster_host': 'hpc-cluster.university.edu',
    'username': 'your-username',
    'default_cores': 16,
    'default_memory': '64GB',
    'default_time': '04:00:00',
    'default_partition': 'normal',
    'max_parallel_jobs': 50
}
# Choose configuration based on environment
import os
environment = os.environ.get('CLUSTRIX_ENV', 'development')
if environment == 'production':
    clustrix.configure(**prod_config)
    print("Configured for production environment")
else:
    clustrix.configure(**dev_config)
    print("Configured for development environment")
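As the number of environments grows, the shared keys can be kept in one place and only the differences listed per environment. This is a minimal sketch of that layering using plain dictionaries; `base_config`, `overrides`, and `build_config` are names chosen here for illustration, and the values mirror the two configurations above.

```python
base_config = {
    "cluster_type": "slurm",
    "username": "your-username",
}

overrides = {
    "development": {
        "cluster_host": "dev-cluster.university.edu",
        "default_cores": 2,
        "default_memory": "4GB",
        "default_time": "00:15:00",
        "default_partition": "debug",
        "max_parallel_jobs": 5,
    },
    "production": {
        "cluster_host": "hpc-cluster.university.edu",
        "default_cores": 16,
        "default_memory": "64GB",
        "default_time": "04:00:00",
        "default_partition": "normal",
        "max_parallel_jobs": 50,
    },
}


def build_config(environment):
    """Merge shared settings with one environment's overrides (development by default)."""
    specific = overrides.get(environment, overrides["development"])
    return {**base_config, **specific}  # later keys win on conflict


print(build_config("production"))
```

The merged dictionary can be unpacked into `configure` exactly as `dev_config` and `prod_config` are above.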
2. Resource Estimation Guidelines
Guidelines for choosing appropriate resources:
def estimate_resources(task_type, data_size_mb, complexity='medium'):
    """
    Estimate computational resources needed for different task types.
    """
    base_configs = {
        'data_processing': {
            'cores': max(2, min(16, data_size_mb // 100)),
            'memory_gb': max(4, min(64, data_size_mb // 10)),
            'time_hours': max(0.5, min(8, data_size_mb / 1000))
        },
        'machine_learning': {
            'cores': max(4, min(32, data_size_mb // 50)),
            'memory_gb': max(8, min(128, data_size_mb // 5)),
            'time_hours': max(1, min(12, data_size_mb / 500))
        },
        'simulation': {
            'cores': max(8, min(64, data_size_mb // 25)),
            'memory_gb': max(16, min(256, data_size_mb // 2)),
            'time_hours': max(2, min(24, data_size_mb / 100))
        },
        'bioinformatics': {
            'cores': max(4, min(24, data_size_mb // 20)),
            'memory_gb': max(16, min(128, data_size_mb // 2)),
            'time_hours': max(1, min(16, data_size_mb / 200))
        }
    }
    if task_type not in base_configs:
        raise ValueError(f"Unknown task type: {task_type}")
    config = base_configs[task_type].copy()
    # Adjust for complexity
    complexity_multipliers = {
        'low': 0.7,
        'medium': 1.0,
        'high': 1.5,
        'very_high': 2.0
    }
    multiplier = complexity_multipliers.get(complexity, 1.0)
    config['cores'] = int(config['cores'] * multiplier)
    config['memory_gb'] = int(config['memory_gb'] * multiplier)
    config['time_hours'] = config['time_hours'] * multiplier
    # Format time as HH:MM:SS, rounding to the nearest minute so that
    # floating-point error cannot truncate a minute off the request
    total_minutes = int(round(config['time_hours'] * 60))
    hours, minutes = divmod(total_minutes, 60)
    config['time_formatted'] = f"{hours:02d}:{minutes:02d}:00"
    return config

# Example usage
examples = [
    ('machine_learning', 1000, 'high'),
    ('data_processing', 5000, 'medium'),
    ('simulation', 100, 'very_high'),
    ('bioinformatics', 2000, 'high')
]
print("Resource Estimation Examples:")
print("=" * 80)
for task_type, data_size, complexity in examples:
    resources = estimate_resources(task_type, data_size, complexity)
    print(f"\n{task_type.replace('_', ' ').title()} ({data_size} MB, {complexity} complexity):")
    print(f"  Cores: {resources['cores']}")
    print(f"  Memory: {resources['memory_gb']} GB")
    print(f"  Time: {resources['time_formatted']} ({resources['time_hours']:.1f} hours)")
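Estimates above 24 hours, or ones that should carry a safety margin, need slightly more care when converted to a SLURM time string. SLURM accepts `days-hours:minutes:seconds` as well as `hours:minutes:seconds`. The sketch below rounds up to a whole minute and applies an optional buffer; `slurm_time` and its `buffer` parameter are hypothetical names introduced here.

```python
import math


def slurm_time(hours, buffer=0.0):
    """Format fractional hours as SLURM's [D-]HH:MM:SS, rounding up to a whole minute."""
    # round(..., 6) absorbs floating-point noise before taking the ceiling
    total_minutes = math.ceil(round(hours * (1 + buffer) * 60, 6))
    days, remainder = divmod(total_minutes, 24 * 60)
    hh, mm = divmod(remainder, 60)
    clock = f"{hh:02d}:{mm:02d}:00"
    return f"{days}-{clock}" if days else clock


for estimate in (0.35, 1.5, 30):
    print(f"{estimate:>5} h -> {slurm_time(estimate)}  (with 25% buffer: {slurm_time(estimate, buffer=0.25)})")
```

Rounding up rather than down errs on the side of a job finishing inside its limit, at the cost of a marginally longer request.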
Summary
This tutorial covered:
Basic SLURM Configuration - Setting up Clustrix for SLURM clusters
Simple Computations - Monte Carlo methods and mathematical functions
Machine Learning - Training models with custom resource requests, including a GPU allocation
Parallel Processing - Automatic loop distribution across cores
Scientific Computing - High-precision numerical integration
Bioinformatics - Genome sequence analysis
Advanced Features - Job arrays and parameter sweeps
Monitoring - Connection testing and debugging
Best Practices - Resource estimation and configuration management
Key Takeaways:
Resource Planning: Always estimate resources based on your data size and complexity
Partition Selection: Choose appropriate SLURM partitions for your workload
Time Limits: Set realistic time limits with some buffer for completion
Memory Management: Monitor memory usage and adjust accordingly
Parallel Efficiency: Use automatic parallelization for loop-heavy computations
Error Handling: Always test connectivity and handle failures gracefully
Next Steps:
Check out the PBS Tutorial for Torque/PBS clusters
Explore the Kubernetes Tutorial for containerized computing
Review the SSH Setup Guide for secure authentication
Read the API Documentation for advanced decorator options
For more information, visit the Clustrix Documentation.