PBS/Torque Cluster TutorialΒΆ
This tutorial demonstrates how to use Clustrix with PBS (Portable Batch System) and Torque clusters, commonly used in academic and research computing environments.
PrerequisitesΒΆ
Access to a PBS/Torque cluster
SSH key setup (see SSH Key Setup for Remote Clusters)
Clustrix installed with:
pip install clustrix
Configuration OptionsΒΆ
Option 1: Interactive Widget (Recommended for Jupyter)
For Jupyter notebook users, use the interactive configuration widget:
import clustrix # Auto-loads the magic command
# Use the magic command to open the configuration widget
%%clusterfy
# Interactive widget appears with PBS/Torque templates and GUI configuration
Option 2: Programmatic Configuration
Configure Clustrix programmatically for your PBS cluster:
from clustrix import configure
configure(
cluster_type="pbs",
cluster_host="pbs.university.edu",
username="your_username",
key_file="~/.ssh/pbs_key",
remote_work_dir="/home/your_username/clustrix"
)
PBS Resource SpecificationΒΆ
PBS uses different resource syntax compared to SLURM:
from clustrix import cluster
@cluster(
cores=8, # Number of CPU cores
memory="16GB", # Memory requirement
time="02:00:00", # Wall time (HH:MM:SS)
queue="batch", # PBS queue name
nodes=1, # Number of nodes
ppn=8 # Processors per node (PBS-specific)
)
def pbs_computation():
"""Example computation on PBS cluster."""
import numpy as np
# Create large matrix
size = 5000
matrix_a = np.random.rand(size, size)
matrix_b = np.random.rand(size, size)
# Matrix multiplication
result = np.dot(matrix_a, matrix_b)
return {
'shape': result.shape,
'mean': float(np.mean(result)),
'std': float(np.std(result))
}
# Execute on PBS cluster
result = pbs_computation()
print(f"Matrix computation result: {result}")
Advanced PBS ConfigurationΒΆ
Environment and Queue SetupΒΆ
configure(
cluster_type="pbs",
cluster_host="torque.research.org",
username="researcher",
# PBS-specific settings
default_queue="normal", # Default queue
default_walltime="04:00:00", # Default wall time
# Resource defaults
default_cores=4,
default_memory="8GB",
default_nodes=1,
# Environment setup
environment_variables={
"PBS_O_WORKDIR": "/home/researcher/work",
"OMP_NUM_THREADS": "4"
}
)
Configuration File for PBSΒΆ
Create ~/.clustrix/config.yml:
cluster_type: "pbs"
cluster_host: "pbs.cluster.edu"
username: "researcher"
key_file: "~/.ssh/pbs_key"
remote_work_dir: "/home/researcher/clustrix"
# PBS-specific settings
default_queue: "batch"
default_walltime: "02:00:00"
# Resource defaults
default_cores: 8
default_memory: "16GB"
default_nodes: 1
# Job management
job_poll_interval: 30 # Check job status every 30 seconds
cleanup_on_success: true
PBS Job ExamplesΒΆ
Array-style ProcessingΒΆ
@cluster(cores=4, memory="8GB", queue="batch")
def process_file(file_id, operation="mean"):
"""Process a single file."""
import numpy as np
import time
# Simulate file processing
print(f"Processing file {file_id} with operation: {operation}")
# Generate synthetic data (simulating file loading)
data = np.random.rand(10000, 100) * file_id
if operation == "mean":
result = np.mean(data)
elif operation == "std":
result = np.std(data)
elif operation == "sum":
result = np.sum(data)
else:
result = np.median(data)
# Simulate processing time
time.sleep(1)
return {
'file_id': file_id,
'operation': operation,
'result': float(result),
'data_shape': data.shape
}
# Process multiple files
file_ids = range(1, 11) # Files 1-10
results = []
for file_id in file_ids:
result = process_file(file_id, operation="mean")
results.append(result)
print(f"Processed {len(results)} files")
for r in results[:3]: # Show first 3 results
print(f"File {r['file_id']}: {r['result']:.4f}")
Bioinformatics PipelineΒΆ
@cluster(cores=8, memory="32GB", time="06:00:00", queue="bioqueue")
def analyze_genome_sequence(sequence_id, analysis_params):
"""Analyze a genome sequence."""
import random
import string
# Simulate sequence analysis
print(f"Analyzing sequence {sequence_id}")
# Generate mock sequence
bases = ['A', 'T', 'G', 'C']
sequence_length = analysis_params.get('length', 100000)
sequence = ''.join(random.choices(bases, k=sequence_length))
# Mock analysis results
gc_content = (sequence.count('G') + sequence.count('C')) / len(sequence)
# Simulate finding patterns
patterns_found = []
for i in range(5):
pattern_length = random.randint(5, 10)
pattern = ''.join(random.choices(bases, k=pattern_length))
count = sequence.count(pattern)
if count > 0:
patterns_found.append({
'pattern': pattern,
'count': count,
'frequency': count / (len(sequence) - pattern_length + 1)
})
return {
'sequence_id': sequence_id,
'sequence_length': len(sequence),
'gc_content': gc_content,
'patterns_found': patterns_found,
'analysis_params': analysis_params
}
# Analyze multiple sequences
sequences = [
{'id': 'seq_001', 'params': {'length': 50000}},
{'id': 'seq_002', 'params': {'length': 75000}},
{'id': 'seq_003', 'params': {'length': 100000}}
]
results = []
for seq in sequences:
result = analyze_genome_sequence(seq['id'], seq['params'])
results.append(result)
# Summary statistics
avg_gc = sum(r['gc_content'] for r in results) / len(results)
print(f"Average GC content: {avg_gc:.3f}")
PBS Job ManagementΒΆ
Resource MonitoringΒΆ
@cluster(cores=4, memory="8GB", time="01:00:00")
def resource_intensive_task():
"""Task that monitors its resource usage."""
import psutil
import time
import numpy as np
# Get initial resource info
process = psutil.Process()
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
print(f"Initial memory usage: {initial_memory:.2f} MB")
# Gradually increase memory usage
data_chunks = []
for i in range(10):
# Create 100MB of data
chunk = np.random.rand(100, 1250, 1000) # ~100MB
data_chunks.append(chunk)
current_memory = process.memory_info().rss / 1024 / 1024
print(f"Step {i+1}: Memory usage: {current_memory:.2f} MB")
time.sleep(5) # Wait 5 seconds
# Final computation
total_sum = sum(np.sum(chunk) for chunk in data_chunks)
final_memory = process.memory_info().rss / 1024 / 1024
return {
'initial_memory_mb': initial_memory,
'final_memory_mb': final_memory,
'memory_increase_mb': final_memory - initial_memory,
'computation_result': float(total_sum),
'chunks_processed': len(data_chunks)
}
result = resource_intensive_task()
print(f"Memory increased by: {result['memory_increase_mb']:.2f} MB")
Error Handling and DebuggingΒΆ
Handling PBS-specific ErrorsΒΆ
@cluster(cores=2, memory="4GB", queue="debug")
def debug_function(test_case="success"):
"""Function for testing error handling."""
if test_case == "memory_error":
# Try to allocate too much memory
import numpy as np
huge_array = np.zeros((100000, 100000)) # ~80GB
return "This shouldn't succeed"
elif test_case == "time_limit":
# Exceed time limit
import time
time.sleep(7200) # 2 hours
return "This took too long"
elif test_case == "import_error":
# Missing package
import nonexistent_package
return "This package doesn't exist"
else:
# Successful execution
return f"Test case '{test_case}' completed successfully"
# Test different scenarios
test_cases = ["success", "import_error"] # Start with safe tests
for case in test_cases:
try:
result = debug_function(case)
print(f"β {case}: {result}")
except Exception as e:
print(f"β {case}: {type(e).__name__}: {e}")
Debugging with LogsΒΆ
import logging
logging.basicConfig(level=logging.DEBUG)
from clustrix import configure, cluster
# Enable detailed logging
configure(
cluster_type="pbs",
cluster_host="pbs.cluster.edu",
username="your_user"
)
@cluster(cores=2, memory="4GB")
def logged_function():
"""Function with detailed logging."""
import logging
# Create logger for remote execution
logger = logging.getLogger(__name__)
logger.info("Starting computation")
try:
import numpy as np
data = np.random.rand(1000, 1000)
result = np.mean(data)
logger.info(f"Computation successful: {result}")
return result
except Exception as e:
logger.error(f"Computation failed: {e}")
raise
result = logged_function()
Best Practices for PBSΒΆ
Queue Selection StrategyΒΆ
def select_pbs_queue(cores, memory_gb, time_hours):
"""Select appropriate PBS queue based on resources."""
if time_hours <= 1 and cores <= 4:
return "express" # Fast turnaround for small jobs
elif time_hours <= 4 and cores <= 16:
return "normal" # Standard queue
elif time_hours <= 24:
return "long" # Long-running jobs
elif cores > 32:
return "bigmem" # High-memory/high-core jobs
else:
return "batch" # Default fallback
# Use dynamic queue selection
cores = 8
memory_gb = 32
time_hours = 6
selected_queue = select_pbs_queue(cores, memory_gb, time_hours)
@cluster(cores=cores, memory=f"{memory_gb}GB",
time=f"{time_hours:02d}:00:00", queue=selected_queue)
def adaptive_computation():
return "Computation with optimal queue selection"
Efficient Data HandlingΒΆ
@cluster(cores=4, memory="16GB", time="03:00:00")
def efficient_data_processing(chunk_size=1000):
"""Process data in chunks to manage memory."""
import numpy as np
total_sum = 0
chunk_count = 0
# Process data in chunks to avoid memory issues
for i in range(100): # 100 chunks
# Generate chunk
chunk = np.random.rand(chunk_size, chunk_size)
# Process chunk
chunk_sum = np.sum(chunk)
total_sum += chunk_sum
chunk_count += 1
# Clear memory
del chunk
if i % 10 == 0:
print(f"Processed {i+1} chunks")
return {
'total_sum': float(total_sum),
'chunks_processed': chunk_count,
'average_chunk_sum': float(total_sum / chunk_count)
}
result = efficient_data_processing()
print(f"Processed {result['chunks_processed']} chunks efficiently")
Complete PBS ExampleΒΆ
Scientific Computing WorkflowΒΆ
from clustrix import configure, cluster
import numpy as np
# Configure PBS cluster
configure(
cluster_type="pbs",
cluster_host="pbs.research.edu",
username="scientist",
remote_work_dir="/home/scientist/clustrix",
# PBS-specific settings
default_queue="normal",
default_walltime="04:00:00",
# Default resources
default_cores=8,
default_memory="16GB",
# Environment
environment_variables={
"OMP_NUM_THREADS": "8",
"TMPDIR": "/tmp"
}
)
@cluster(cores=16, memory="32GB", time="06:00:00", queue="compute")
def monte_carlo_integration(n_samples, dimensions):
"""Monte Carlo integration in high dimensions."""
import numpy as np
import time
start_time = time.time()
def integrand(x):
"""Function to integrate: exp(-sum(x^2))"""
return np.exp(-np.sum(x**2, axis=-1))
# Generate random samples in [-1, 1]^dimensions
samples = np.random.uniform(-1, 1, (n_samples, dimensions))
# Evaluate integrand
values = integrand(samples)
# Monte Carlo estimate
volume = 2**dimensions # Volume of [-1,1]^d
integral_estimate = volume * np.mean(values)
error_estimate = volume * np.std(values) / np.sqrt(n_samples)
end_time = time.time()
return {
'dimensions': dimensions,
'n_samples': n_samples,
'integral_estimate': float(integral_estimate),
'error_estimate': float(error_estimate),
'computation_time': end_time - start_time,
'samples_per_second': n_samples / (end_time - start_time)
}
# Run integration for different dimensions
dimensions_list = [2, 4, 6, 8]
n_samples = 1000000
results = []
for dim in dimensions_list:
print(f"Computing {dim}D integral...")
result = monte_carlo_integration(n_samples, dim)
results.append(result)
print(f" Result: {result['integral_estimate']:.6f} Β± {result['error_estimate']:.6f}")
print(f" Time: {result['computation_time']:.2f}s")
# Analysis
print("\nSummary:")
for r in results:
efficiency = r['samples_per_second'] / 1000 # K samples/sec
print(f"{r['dimensions']}D: {r['integral_estimate']:.4f} ({efficiency:.1f}K samples/s)")
This tutorial provides comprehensive coverage of using Clustrix with PBS/Torque clusters, including resource specification, job management, and best practices for scientific computing workloads.