PBS/Torque Cluster Tutorial¶
This tutorial demonstrates how to use Clustrix with PBS (Portable Batch System) and Torque clusters. PBS is widely used in academic and research computing environments.
Prerequisites¶
Access to a PBS/Torque cluster
SSH key configured for the cluster
Clustrix installed:
pip install clustrix
Installation and Setup¶
[ ]:
# Install Clustrix (uncomment if needed)
# !pip install clustrix
import clustrix
from clustrix import cluster, configure
import numpy as np
import pandas as pd
PBS Cluster Configuration¶
Configure Clustrix for your PBS/Torque cluster:
[ ]:
# Configure for PBS cluster
configure(
cluster_type="pbs",
cluster_host="pbs-cluster.university.edu", # Replace with your cluster
username="your-username", # Replace with your username
key_file="~/.ssh/id_rsa", # Path to SSH key
# Default PBS resource requirements
default_cores=4,
default_memory="16GB",
default_time="02:00:00",
default_queue="normal", # PBS queue name
# PBS-specific options
remote_work_dir="/home/your-username/clustrix", # Adjust for your cluster
# Environment setup
module_loads=["python/3.9", "openmpi/4.0"], # Common PBS modules
# Job management
cleanup_on_success=True,
max_parallel_jobs=25
)
print("PBS cluster configured successfully!")
Example 1: Bioinformatics - DNA Sequence Analysis¶
PBS clusters are popular in bioinformatics. Let’s analyze DNA sequences:
[ ]:
@cluster(
cores=8,
memory="32GB",
time="03:00:00",
queue="bioqueue", # Specialized bioinformatics queue
walltime="03:00:00" # PBS uses 'walltime' parameter
)
def analyze_dna_sequences(sequences, analysis_type="comprehensive"):
"""
Comprehensive DNA sequence analysis for bioinformatics research.
"""
import numpy as np
import random
from collections import Counter, defaultdict
import re
import math
def calculate_gc_content(sequence):
"""Calculate GC content percentage"""
gc_count = sequence.count('G') + sequence.count('C')
return (gc_count / len(sequence)) * 100 if sequence else 0
def find_orfs(sequence, min_length=100):
"""Find Open Reading Frames (ORFs)"""
start_codon = 'ATG'
stop_codons = ['TAA', 'TAG', 'TGA']
orfs = []
for frame in range(3): # Check all 3 reading frames
for i in range(frame, len(sequence) - 2, 3):
codon = sequence[i:i+3]
if codon == start_codon:
# Look for stop codon
for j in range(i+3, len(sequence) - 2, 3):
stop_codon = sequence[j:j+3]
if stop_codon in stop_codons:
orf_length = j - i + 3
if orf_length >= min_length:
orfs.append({
'start': i,
'end': j + 3,
'length': orf_length,
'frame': frame + 1,
'sequence': sequence[i:j+3]
})
break
return orfs
def analyze_codon_usage(sequence):
"""Analyze codon usage patterns"""
codons = [sequence[i:i+3] for i in range(0, len(sequence)-2, 3)
if len(sequence[i:i+3]) == 3]
codon_counts = Counter(codons)
# Standard genetic code mapping
genetic_code = {
'TTT': 'F', 'TTC': 'F', 'TTA': 'L', 'TTG': 'L',
'TCT': 'S', 'TCC': 'S', 'TCA': 'S', 'TCG': 'S',
'TAT': 'Y', 'TAC': 'Y', 'TAA': '*', 'TAG': '*',
'TGT': 'C', 'TGC': 'C', 'TGA': '*', 'TGG': 'W',
'CTT': 'L', 'CTC': 'L', 'CTA': 'L', 'CTG': 'L',
'CCT': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',
'CAT': 'H', 'CAC': 'H', 'CAA': 'Q', 'CAG': 'Q',
'CGT': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R',
'ATT': 'I', 'ATC': 'I', 'ATA': 'I', 'ATG': 'M',
'ACT': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',
'AAT': 'N', 'AAC': 'N', 'AAA': 'K', 'AAG': 'K',
'AGT': 'S', 'AGC': 'S', 'AGA': 'R', 'AGG': 'R',
'GTT': 'V', 'GTC': 'V', 'GTA': 'V', 'GTG': 'V',
'GCT': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',
'GAT': 'D', 'GAC': 'D', 'GAA': 'E', 'GAG': 'E',
'GGT': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G'
}
amino_acid_counts = defaultdict(int)
for codon, count in codon_counts.items():
if codon in genetic_code:
amino_acid_counts[genetic_code[codon]] += count
return dict(codon_counts), dict(amino_acid_counts)
def find_tandem_repeats(sequence, min_repeat_length=3, max_repeat_length=20):
"""Find tandem repeats in DNA sequence"""
repeats = []
for repeat_len in range(min_repeat_length, max_repeat_length + 1):
for i in range(len(sequence) - repeat_len * 2 + 1):
motif = sequence[i:i + repeat_len]
count = 1
j = i + repeat_len
while j + repeat_len <= len(sequence) and sequence[j:j + repeat_len] == motif:
count += 1
j += repeat_len
if count >= 3: # At least 3 repeats
repeats.append({
'motif': motif,
'start': i,
'end': j,
'repeat_count': count,
'total_length': j - i
})
return repeats
# Main analysis loop
results = []
for seq_idx, sequence in enumerate(sequences):
print(f"Analyzing sequence {seq_idx + 1}/{len(sequences)} (length: {len(sequence)})...")
# Basic composition analysis
base_composition = Counter(sequence)
gc_content = calculate_gc_content(sequence)
# Advanced analyses
orfs = find_orfs(sequence, min_length=150)
codon_usage, amino_acid_freq = analyze_codon_usage(sequence)
tandem_repeats = find_tandem_repeats(sequence)
# CpG island detection (simplified)
cpg_sites = len(re.findall('CG', sequence))
cpg_density = (cpg_sites / (len(sequence) - 1)) * 100 if len(sequence) > 1 else 0
# Complexity analysis
def calculate_complexity(seq, window_size=50):
complexities = []
for i in range(0, len(seq) - window_size + 1, window_size):
window = seq[i:i + window_size]
counter = Counter(window)
entropy = -sum((count/window_size) * math.log2(count/window_size)
for count in counter.values() if count > 0)
complexities.append(entropy)
return np.mean(complexities) if complexities else 0
complexity = calculate_complexity(sequence)
sequence_result = {
'sequence_id': seq_idx,
'length': len(sequence),
'base_composition': dict(base_composition),
'gc_content': gc_content,
'complexity': complexity,
'orfs_found': len(orfs),
'longest_orf': max(orfs, key=lambda x: x['length'])['length'] if orfs else 0,
'cpg_sites': cpg_sites,
'cpg_density': cpg_density,
'tandem_repeats': len(tandem_repeats),
'repeat_details': tandem_repeats[:5], # Keep first 5 for analysis
'codon_diversity': len(codon_usage),
'amino_acid_diversity': len(amino_acid_freq),
'most_common_amino_acid': max(amino_acid_freq.items(), key=lambda x: x[1])[0] if amino_acid_freq else 'N/A'
}
results.append(sequence_result)
# Aggregate statistics
aggregate_stats = {
'total_sequences': len(results),
'total_base_pairs': sum(r['length'] for r in results),
'average_gc_content': np.mean([r['gc_content'] for r in results]),
'gc_content_std': np.std([r['gc_content'] for r in results]),
'average_complexity': np.mean([r['complexity'] for r in results]),
'total_orfs_found': sum(r['orfs_found'] for r in results),
'total_cpg_sites': sum(r['cpg_sites'] for r in results),
'sequences_with_repeats': sum(1 for r in results if r['tandem_repeats'] > 0),
'individual_results': results
}
return aggregate_stats
# Generate sample DNA sequences for analysis
def generate_realistic_dna(length, gc_content=0.5):
"""Generate realistic DNA sequences with specific GC content"""
bases = ['A', 'T', 'G', 'C']
gc_prob = gc_content / 2
at_prob = (1 - gc_content) / 2
probs = [at_prob, at_prob, gc_prob, gc_prob]
return ''.join(np.random.choice(bases, size=length, p=probs))
# Create test sequences
test_sequences = [
generate_realistic_dna(5000, 0.4), # AT-rich
generate_realistic_dna(8000, 0.6), # GC-rich
generate_realistic_dna(3000, 0.5), # Balanced
generate_realistic_dna(12000, 0.45), # Large AT-rich
generate_realistic_dna(6000, 0.55) # Medium GC-rich
]
# Run analysis on PBS cluster
bio_results = analyze_dna_sequences(test_sequences, analysis_type="comprehensive")
print(f"\nBIOINFORMATICS ANALYSIS COMPLETE")
print(f"Sequences analyzed: {bio_results['total_sequences']}")
print(f"Total base pairs: {bio_results['total_base_pairs']:,}")
print(f"Average GC content: {bio_results['average_gc_content']:.2f}% ± {bio_results['gc_content_std']:.2f}%")
print(f"Total ORFs found: {bio_results['total_orfs_found']}")
print(f"Total CpG sites: {bio_results['total_cpg_sites']}")
print(f"Sequences with tandem repeats: {bio_results['sequences_with_repeats']}/{bio_results['total_sequences']}")
Example 2: Materials Science - Molecular Dynamics Simulation¶
Simulate molecular systems commonly done on PBS clusters:
[ ]:
@cluster(
cores=16,
memory="64GB",
time="06:00:00",
queue="physics",
features="infiniband" # PBS feature for high-speed networking
)
def molecular_dynamics_simulation(n_particles=10000, n_steps=100000, temperature=300.0):
"""
Simplified molecular dynamics simulation for materials science.
"""
import numpy as np
import math
# Physical constants
kb = 1.380649e-23 # Boltzmann constant (J/K)
mass = 1.66054e-27 # Approximate atomic mass (kg)
dt = 1e-15 # Time step (s)
sigma = 3.4e-10 # Lennard-Jones parameter (m)
epsilon = 1.65e-21 # Lennard-Jones parameter (J)
print(f"Starting MD simulation with {n_particles:,} particles for {n_steps:,} steps...")
print(f"Temperature: {temperature} K")
# Initialize system
box_size = (n_particles / 0.8) ** (1/3) * sigma # Density ~0.8
# Random initial positions
positions = np.random.uniform(0, box_size, (n_particles, 3))
# Maxwell-Boltzmann velocity distribution
velocity_scale = math.sqrt(kb * temperature / mass)
velocities = np.random.normal(0, velocity_scale, (n_particles, 3))
# Remove center of mass motion
velocities -= np.mean(velocities, axis=0)
# Storage for analysis
energies = []
temperatures = []
pressures = []
radial_distribution = []
def lennard_jones_force(r):
"""Calculate Lennard-Jones force"""
if r < 1e-12: # Avoid division by zero
return 0
sr6 = (sigma / r) ** 6
sr12 = sr6 ** 2
return 24 * epsilon * (2 * sr12 - sr6) / r
def calculate_forces(pos):
"""Calculate forces on all particles"""
forces = np.zeros_like(pos)
potential_energy = 0
for i in range(n_particles):
for j in range(i + 1, n_particles):
# Distance vector with periodic boundary conditions
dr = pos[j] - pos[i]
dr = dr - box_size * np.round(dr / box_size)
r = np.linalg.norm(dr)
if r < 2.5 * sigma: # Cutoff distance
force_magnitude = lennard_jones_force(r)
force_vector = force_magnitude * dr / r
forces[i] += force_vector
forces[j] -= force_vector
# Potential energy
sr6 = (sigma / r) ** 6
sr12 = sr6 ** 2
potential_energy += 4 * epsilon * (sr12 - sr6)
return forces, potential_energy
def calculate_temperature(vel):
"""Calculate instantaneous temperature"""
kinetic_energy = 0.5 * mass * np.sum(vel ** 2)
return 2 * kinetic_energy / (3 * n_particles * kb)
def calculate_pressure(pos, forces):
"""Calculate pressure using virial theorem"""
kinetic_term = n_particles * kb * calculate_temperature(velocities)
virial = np.sum(positions * forces)
volume = box_size ** 3
return (kinetic_term + virial/3) / volume
# Main simulation loop
for step in range(n_steps):
if step % (n_steps // 10) == 0:
print(f"Step {step:,}/{n_steps:,} ({100*step/n_steps:.1f}%)")
# Calculate forces
forces, potential_energy = calculate_forces(positions)
# Velocity Verlet integration
# Update positions
positions += velocities * dt + 0.5 * forces / mass * dt ** 2
# Apply periodic boundary conditions
positions = positions % box_size
# Update velocities
new_forces, _ = calculate_forces(positions)
velocities += 0.5 * (forces + new_forces) / mass * dt
# Calculate thermodynamic properties
if step % 1000 == 0: # Sample every 1000 steps
kinetic_energy = 0.5 * mass * np.sum(velocities ** 2)
total_energy = kinetic_energy + potential_energy
temp = calculate_temperature(velocities)
pressure = calculate_pressure(positions, new_forces)
energies.append({
'step': step,
'kinetic': kinetic_energy,
'potential': potential_energy,
'total': total_energy
})
temperatures.append(temp)
pressures.append(pressure)
# Simple thermostat (velocity rescaling)
if step % 100 == 0: # Apply every 100 steps
current_temp = calculate_temperature(velocities)
if current_temp > 0:
scaling_factor = math.sqrt(temperature / current_temp)
velocities *= scaling_factor
# Calculate radial distribution function (simplified)
def calculate_rdf(pos, n_bins=100, max_r=None):
if max_r is None:
max_r = box_size / 2
bin_width = max_r / n_bins
rdf = np.zeros(n_bins)
for i in range(min(1000, n_particles)): # Sample subset for efficiency
for j in range(i + 1, min(1000, n_particles)):
dr = pos[j] - pos[i]
dr = dr - box_size * np.round(dr / box_size)
r = np.linalg.norm(dr)
if r < max_r:
bin_index = int(r / bin_width)
if bin_index < n_bins:
rdf[bin_index] += 1
# Normalize
for i in range(n_bins):
r = (i + 0.5) * bin_width
volume = 4 * math.pi * r ** 2 * bin_width
density = n_particles / box_size ** 3
rdf[i] /= (volume * density * 1000) # 1000 particles sampled
return rdf, np.arange(0.5 * bin_width, max_r, bin_width)
rdf_values, rdf_distances = calculate_rdf(positions)
# Final analysis
avg_temperature = np.mean(temperatures[-50:]) # Last 50 samples
avg_pressure = np.mean(pressures[-50:])
final_energy = energies[-1]['total'] if energies else 0
simulation_results = {
'n_particles': n_particles,
'n_steps': n_steps,
'target_temperature': temperature,
'average_temperature': avg_temperature,
'temperature_stability': np.std(temperatures[-50:]),
'average_pressure': avg_pressure,
'final_energy': final_energy,
'box_size': box_size,
'density': n_particles / box_size ** 3,
'energy_trajectory': energies[::10], # Every 10th point
'temperature_trajectory': temperatures[::10],
'pressure_trajectory': pressures[::10],
'radial_distribution': {
'distances': rdf_distances.tolist(),
'values': rdf_values.tolist()
},
'simulation_time_ns': n_steps * dt * 1e9 # Convert to nanoseconds
}
return simulation_results
# Run molecular dynamics simulation
md_results = molecular_dynamics_simulation(
n_particles=5000,
n_steps=50000,
temperature=298.15 # Room temperature
)
print(f"\nMOLECULAR DYNAMICS SIMULATION COMPLETE")
print(f"Particles: {md_results['n_particles']:,}")
print(f"Steps: {md_results['n_steps']:,}")
print(f"Simulation time: {md_results['simulation_time_ns']:.2f} ns")
print(f"Target temperature: {md_results['target_temperature']:.1f} K")
print(f"Average temperature: {md_results['average_temperature']:.1f} K")
print(f"Temperature stability: ±{md_results['temperature_stability']:.1f} K")
print(f"Average pressure: {md_results['average_pressure']:.2e} Pa")
print(f"System density: {md_results['density']:.2e} particles/m³")
Example 3: Environmental Science - Climate Data Analysis¶
Analyze large climate datasets commonly processed on research clusters:
[ ]:
@cluster(
cores=12,
memory="48GB",
time="04:00:00",
queue="climate",
parallel=True # Enable automatic parallelization
)
def analyze_climate_data(years_to_analyze=50, stations_per_year=1000):
"""
Comprehensive climate data analysis for environmental research.
"""
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import random
from scipy import stats
import math
def generate_realistic_climate_data(year, station_id, latitude, longitude):
"""Generate realistic climate data for a station"""
np.random.seed(year * 1000 + station_id) # Reproducible but varied
# Base temperature influenced by latitude
base_temp = 25 - abs(latitude) * 0.6 # Cooler at higher latitudes
# Generate daily data for the year
start_date = datetime(year, 1, 1)
days_in_year = 366 if year % 4 == 0 else 365
daily_data = []
for day in range(days_in_year):
date = start_date + timedelta(days=day)
day_of_year = day + 1
# Seasonal temperature variation
seasonal_temp = base_temp + 15 * math.cos(2 * math.pi * (day_of_year - 172) / 365)
# Add random variation and trends
climate_trend = 0.01 * (year - 1970) # 0.01°C/year warming
daily_temp = seasonal_temp + climate_trend + np.random.normal(0, 3)
# Precipitation (higher in tropics and certain seasons)
base_precip = max(0, 10 - abs(latitude) * 0.3)
seasonal_precip_factor = 1 + 0.5 * math.cos(2 * math.pi * (day_of_year - 30) / 365)
daily_precip = max(0, np.random.exponential(base_precip * seasonal_precip_factor))
# Humidity (correlated with temperature and precipitation)
base_humidity = 60 - abs(latitude) * 0.5
humidity = base_humidity + daily_precip * 0.5 - (daily_temp - base_temp) * 0.3
humidity = max(10, min(100, humidity + np.random.normal(0, 5)))
# Wind speed (more variable at higher latitudes)
base_wind = 5 + abs(latitude) * 0.1
wind_speed = max(0, np.random.gamma(2, base_wind / 2))
# Atmospheric pressure (altitude and weather dependent)
base_pressure = 1013.25 # Sea level
pressure = base_pressure + np.random.normal(0, 10)
daily_data.append({
'date': date,
'temperature': daily_temp,
'precipitation': daily_precip,
'humidity': humidity,
'wind_speed': wind_speed,
'pressure': pressure
})
return daily_data
def analyze_station_trends(station_data):
"""Analyze trends for a single weather station"""
df = pd.DataFrame(station_data)
# Calculate annual statistics
annual_stats = {
'mean_temperature': df['temperature'].mean(),
'temperature_range': df['temperature'].max() - df['temperature'].min(),
'total_precipitation': df['precipitation'].sum(),
'mean_humidity': df['humidity'].mean(),
'mean_wind_speed': df['wind_speed'].mean(),
'mean_pressure': df['pressure'].mean(),
'temperature_std': df['temperature'].std(),
'precipitation_days': (df['precipitation'] > 1.0).sum(),
'extreme_heat_days': (df['temperature'] > df['temperature'].quantile(0.95)).sum(),
'extreme_cold_days': (df['temperature'] < df['temperature'].quantile(0.05)).sum()
}
# Seasonal analysis
df['month'] = df['date'].dt.month
seasonal_temps = df.groupby(df['month'])['temperature'].mean()
seasonal_precip = df.groupby(df['month'])['precipitation'].sum()
annual_stats['seasonal_temperature_variation'] = seasonal_temps.std()
annual_stats['wettest_month'] = seasonal_precip.idxmax()
annual_stats['driest_month'] = seasonal_precip.idxmin()
return annual_stats
print(f"Analyzing climate data for {years_to_analyze} years, {stations_per_year} stations per year...")
print(f"Total data points: {years_to_analyze * stations_per_year * 365:,}")
all_station_results = []
# This loop will be automatically parallelized by Clustrix
for year in range(1970, 1970 + years_to_analyze):
print(f"Processing year {year}...")
year_results = []
for station_id in range(stations_per_year):
# Generate random station location
latitude = np.random.uniform(-60, 75) # Inhabitable latitudes
longitude = np.random.uniform(-180, 180)
# Generate climate data for this station and year
station_data = generate_realistic_climate_data(year, station_id, latitude, longitude)
# Analyze the station data
station_analysis = analyze_station_trends(station_data)
station_analysis['year'] = year
station_analysis['station_id'] = station_id
station_analysis['latitude'] = latitude
station_analysis['longitude'] = longitude
year_results.append(station_analysis)
all_station_results.extend(year_results)
# Convert to DataFrame for analysis
results_df = pd.DataFrame(all_station_results)
# Global trend analysis
yearly_global_temps = results_df.groupby('year')['mean_temperature'].mean()
yearly_global_precip = results_df.groupby('year')['total_precipitation'].mean()
# Calculate trends
years = yearly_global_temps.index
temp_trend, temp_intercept, temp_r_value, temp_p_value, temp_std_err = stats.linregress(years, yearly_global_temps)
precip_trend, precip_intercept, precip_r_value, precip_p_value, precip_std_err = stats.linregress(years, yearly_global_precip)
# Regional analysis
def classify_climate_zone(lat):
if abs(lat) < 23.5:
return "Tropical"
elif abs(lat) < 35:
return "Subtropical"
elif abs(lat) < 50:
return "Temperate"
else:
return "Polar"
results_df['climate_zone'] = results_df['latitude'].apply(classify_climate_zone)
zone_analysis = results_df.groupby('climate_zone').agg({
'mean_temperature': ['mean', 'std'],
'total_precipitation': ['mean', 'std'],
'temperature_range': 'mean',
'extreme_heat_days': 'mean',
'extreme_cold_days': 'mean'
}).round(2)
# Extreme events analysis
extreme_heat_threshold = results_df['mean_temperature'].quantile(0.95)
extreme_cold_threshold = results_df['mean_temperature'].quantile(0.05)
drought_threshold = results_df['total_precipitation'].quantile(0.1)
flood_threshold = results_df['total_precipitation'].quantile(0.9)
extreme_events = {
'extreme_heat_stations': (results_df['mean_temperature'] > extreme_heat_threshold).sum(),
'extreme_cold_stations': (results_df['mean_temperature'] < extreme_cold_threshold).sum(),
'drought_affected_stations': (results_df['total_precipitation'] < drought_threshold).sum(),
'flood_risk_stations': (results_df['total_precipitation'] > flood_threshold).sum()
}
# Compile final results
climate_analysis = {
'analysis_summary': {
'years_analyzed': years_to_analyze,
'stations_per_year': stations_per_year,
'total_station_years': len(results_df),
'data_points_analyzed': len(results_df) * 365
},
'global_trends': {
'temperature_trend_per_decade': temp_trend * 10,
'temperature_trend_significance': temp_p_value,
'temperature_correlation': temp_r_value ** 2,
'precipitation_trend_per_decade': precip_trend * 10,
'precipitation_trend_significance': precip_p_value,
'precipitation_correlation': precip_r_value ** 2
},
'current_climate_state': {
'global_mean_temperature': yearly_global_temps.iloc[-1],
'global_mean_precipitation': yearly_global_precip.iloc[-1],
'temperature_warming_since_start': yearly_global_temps.iloc[-1] - yearly_global_temps.iloc[0],
'precipitation_change_since_start': yearly_global_precip.iloc[-1] - yearly_global_precip.iloc[0]
},
'regional_analysis': zone_analysis.to_dict(),
'extreme_events': extreme_events,
'statistical_summary': {
'mean_global_temperature': results_df['mean_temperature'].mean(),
'temperature_standard_deviation': results_df['mean_temperature'].std(),
'mean_global_precipitation': results_df['total_precipitation'].mean(),
'precipitation_standard_deviation': results_df['total_precipitation'].std(),
'warmest_station_temp': results_df['mean_temperature'].max(),
'coldest_station_temp': results_df['mean_temperature'].min(),
'wettest_station_precip': results_df['total_precipitation'].max(),
'driest_station_precip': results_df['total_precipitation'].min()
}
}
return climate_analysis
# Run climate analysis
climate_results = analyze_climate_data(years_to_analyze=30, stations_per_year=200)
print(f"\nCLIMATE DATA ANALYSIS COMPLETE")
print(f"Years analyzed: {climate_results['analysis_summary']['years_analyzed']}")
print(f"Total station-years: {climate_results['analysis_summary']['total_station_years']:,}")
print(f"Data points: {climate_results['analysis_summary']['data_points_analyzed']:,}")
print("\nGlobal Trends:")
trends = climate_results['global_trends']
print(f" Temperature trend: {trends['temperature_trend_per_decade']:.3f}°C per decade (p={trends['temperature_trend_significance']:.4f})")
print(f" Precipitation trend: {trends['precipitation_trend_per_decade']:.1f} mm per decade (p={trends['precipitation_trend_significance']:.4f})")
print("\nCurrent Climate State:")
current = climate_results['current_climate_state']
print(f" Global mean temperature: {current['global_mean_temperature']:.2f}°C")
print(f" Temperature change since start: {current['temperature_warming_since_start']:.2f}°C")
print(f" Global mean precipitation: {current['global_mean_precipitation']:.1f} mm/year")
print("\nExtreme Events:")
extremes = climate_results['extreme_events']
total_stations = climate_results['analysis_summary']['total_station_years']
print(f" Extreme heat affected: {extremes['extreme_heat_stations']} stations ({100*extremes['extreme_heat_stations']/total_stations:.1f}%)")
print(f" Drought affected: {extremes['drought_affected_stations']} stations ({100*extremes['drought_affected_stations']/total_stations:.1f}%)")
print(f" Flood risk: {extremes['flood_risk_stations']} stations ({100*extremes['flood_risk_stations']/total_stations:.1f}%)")
PBS Queue Management and Resource Selection¶
Understanding how to choose appropriate PBS queues and resources:
[ ]:
def select_pbs_resources(workload_type, data_size_mb, urgency="normal"):
"""
Intelligent PBS resource selection based on workload characteristics.
"""
# Base resource templates
resource_templates = {
"bioinformatics": {
"small": {"cores": 4, "memory": "16GB", "time": "02:00:00", "queue": "bioqueue"},
"medium": {"cores": 8, "memory": "32GB", "time": "06:00:00", "queue": "bioqueue"},
"large": {"cores": 16, "memory": "64GB", "time": "12:00:00", "queue": "bioqueue_long"}
},
"physics": {
"small": {"cores": 8, "memory": "32GB", "time": "04:00:00", "queue": "physics"},
"medium": {"cores": 16, "memory": "64GB", "time": "12:00:00", "queue": "physics"},
"large": {"cores": 32, "memory": "128GB", "time": "24:00:00", "queue": "physics_long"}
},
"climate": {
"small": {"cores": 6, "memory": "24GB", "time": "03:00:00", "queue": "climate"},
"medium": {"cores": 12, "memory": "48GB", "time": "08:00:00", "queue": "climate"},
"large": {"cores": 24, "memory": "96GB", "time": "16:00:00", "queue": "climate_long"}
},
"ml": {
"small": {"cores": 4, "memory": "16GB", "time": "01:00:00", "queue": "gpu", "gres": "gpu:1"},
"medium": {"cores": 8, "memory": "32GB", "time": "04:00:00", "queue": "gpu", "gres": "gpu:2"},
"large": {"cores": 16, "memory": "64GB", "time": "12:00:00", "queue": "gpu_long", "gres": "gpu:4"}
}
}
# Determine size category based on data
if data_size_mb < 100:
size_category = "small"
elif data_size_mb < 1000:
size_category = "medium"
else:
size_category = "large"
# Get base configuration
if workload_type not in resource_templates:
workload_type = "physics" # Default fallback
config = resource_templates[workload_type][size_category].copy()
# Adjust for urgency
if urgency == "urgent":
# Use express queue with reduced resources
config["queue"] = "express"
config["time"] = "00:30:00"
config["cores"] = min(4, config["cores"])
elif urgency == "low":
# Use long queue with more resources
config["queue"] = config["queue"].replace("queue", "queue_long")
config["cores"] = int(config["cores"] * 1.5)
# Increase time limit
time_parts = config["time"].split(":")
hours = int(time_parts[0]) * 2
config["time"] = f"{hours:02d}:{time_parts[1]}:{time_parts[2]}"
return config
# Example resource selections
example_workloads = [
("bioinformatics", 500, "normal"),
("physics", 2000, "low"),
("climate", 150, "urgent"),
("ml", 800, "normal")
]
print("PBS Resource Selection Examples:")
print("=" * 70)
for workload, data_size, urgency in example_workloads:
resources = select_pbs_resources(workload, data_size, urgency)
print(f"\n{workload.upper()} ({data_size} MB, {urgency} priority):")
for key, value in resources.items():
print(f" {key}: {value}")
PBS Job Arrays for Parameter Studies¶
Use PBS job arrays for efficient parameter sweeps:
[ ]:
@cluster(
cores=4,
memory="16GB",
time="01:00:00",
queue="normal",
pbs_array="1-20" # PBS job array with 20 tasks
)
def drug_discovery_parameter_sweep(base_config):
"""
Pharmaceutical research parameter sweep using PBS job arrays.
Each array task tests different molecular parameters.
"""
import os
import numpy as np
import random
from math import exp, log
# Get PBS array index
array_index = int(os.environ.get('PBS_ARRAYID', '1'))
# Define parameter space for drug discovery
molecular_weights = np.linspace(150, 500, 20) # Typical drug MW range
logp_values = np.linspace(-1, 5, 20) # Lipophilicity
hbd_counts = list(range(0, 6)) # Hydrogen bond donors
hba_counts = list(range(0, 11)) # Hydrogen bond acceptors
# Select parameters for this array task
mw = molecular_weights[array_index - 1]
logp = logp_values[array_index - 1]
# Random selection for other parameters
np.random.seed(array_index * 42)
hbd = random.choice(hbd_counts)
hba = random.choice(hba_counts)
print(f"Array task {array_index}: MW={mw:.1f}, LogP={logp:.2f}, HBD={hbd}, HBA={hba}")
def calculate_drug_likeness(mw, logp, hbd, hba):
"""Calculate drug-likeness using Lipinski's Rule of Five"""
violations = 0
if mw > 500:
violations += 1
if logp > 5:
violations += 1
if hbd > 5:
violations += 1
if hba > 10:
violations += 1
drug_likeness = max(0, 1.0 - violations * 0.25)
return drug_likeness, violations
def simulate_binding_affinity(mw, logp, hbd, hba):
"""Simulate binding affinity to target protein"""
# Simplified model based on molecular properties
optimal_mw = 350
optimal_logp = 2.5
optimal_hbd = 2
optimal_hba = 6
mw_score = exp(-((mw - optimal_mw) / 100) ** 2)
logp_score = exp(-((logp - optimal_logp) / 1.5) ** 2)
hbd_score = exp(-((hbd - optimal_hbd) / 1.5) ** 2)
hba_score = exp(-((hba - optimal_hba) / 2.5) ** 2)
# Combine scores with some randomness
base_affinity = (mw_score * logp_score * hbd_score * hba_score) ** 0.5
random_factor = np.random.uniform(0.7, 1.3)
binding_affinity = base_affinity * random_factor
ic50 = 10 ** (-6 - 3 * binding_affinity) # Convert to IC50 (M)
return binding_affinity, ic50
def simulate_admet_properties(mw, logp, hbd, hba):
"""Simulate ADMET (Absorption, Distribution, Metabolism, Excretion, Toxicity)"""
# Absorption (permeability)
permeability = 1 / (1 + exp(-(logp - 1.5)))
permeability *= np.random.uniform(0.8, 1.2)
# Distribution (plasma protein binding)
ppb = min(99, max(10, 20 + logp * 15 + np.random.normal(0, 10)))
# Metabolism (hepatic clearance)
clearance = 0.5 + 0.3 * (1 / (1 + exp(-(mw - 300) / 50)))
clearance *= np.random.uniform(0.7, 1.3)
# Excretion (renal clearance)
renal_clearance = max(0.1, 0.8 - logp * 0.1 + np.random.normal(0, 0.1))
# Toxicity (simplified hERG channel binding)
herg_risk = 1 / (1 + exp(-(logp - 3.5)))
if mw > 400:
herg_risk *= 1.5
return {
'permeability': permeability,
'plasma_protein_binding': ppb,
'hepatic_clearance': clearance,
'renal_clearance': renal_clearance,
'herg_risk': herg_risk
}
def calculate_developability_score(drug_likeness, binding_affinity, admet):
"""Calculate overall drug developability score"""
# Weight different factors
likeness_weight = 0.2
affinity_weight = 0.4
admet_weight = 0.4
# ADMET composite score
admet_score = (
admet['permeability'] * 0.3 +
(1 - admet['herg_risk']) * 0.3 +
(1 - admet['hepatic_clearance']) * 0.2 +
admet['renal_clearance'] * 0.2
)
total_score = (
drug_likeness * likeness_weight +
binding_affinity * affinity_weight +
admet_score * admet_weight
)
return total_score, admet_score
# Run simulations
drug_likeness, ro5_violations = calculate_drug_likeness(mw, logp, hbd, hba)
binding_affinity, ic50 = simulate_binding_affinity(mw, logp, hbd, hba)
admet_props = simulate_admet_properties(mw, logp, hbd, hba)
developability_score, admet_score = calculate_developability_score(
drug_likeness, binding_affinity, admet_props
)
# Compile results
compound_results = {
'array_task_id': array_index,
'molecular_properties': {
'molecular_weight': mw,
'logp': logp,
'hbd_count': hbd,
'hba_count': hba
},
'drug_likeness': {
'score': drug_likeness,
'ro5_violations': ro5_violations,
'passes_ro5': ro5_violations <= 1
},
'target_binding': {
'affinity_score': binding_affinity,
'ic50_M': ic50,
'pic50': -log(ic50, 10) if ic50 > 0 else 0
},
'admet_properties': admet_props,
'overall_assessment': {
'developability_score': developability_score,
'admet_score': admet_score,
'promising_candidate': developability_score > 0.6 and binding_affinity > 0.5
}
}
return compound_results
# Run drug discovery parameter sweep
drug_config = {
'target_name': 'EGFR',
'assay_type': 'binding',
'screening_library': 'chembl'
}
# This will run as one task of the PBS array
drug_result = drug_discovery_parameter_sweep(drug_config)
print(f"\nDRUG DISCOVERY ANALYSIS - Task {drug_result['array_task_id']}")
print("=" * 60)
mol_props = drug_result['molecular_properties']
print(f"Molecular Weight: {mol_props['molecular_weight']:.1f} Da")
print(f"LogP: {mol_props['logp']:.2f}")
print(f"H-bond donors: {mol_props['hbd_count']}")
print(f"H-bond acceptors: {mol_props['hba_count']}")
drug_like = drug_result['drug_likeness']
print(f"\nDrug-likeness score: {drug_like['score']:.3f}")
print(f"Rule of 5 violations: {drug_like['ro5_violations']}")
print(f"Passes Lipinski's Rule: {drug_like['passes_ro5']}")
binding = drug_result['target_binding']
print(f"\nBinding affinity score: {binding['affinity_score']:.3f}")
print(f"IC50: {binding['ic50_M']:.2e} M")
print(f"pIC50: {binding['pic50']:.2f}")
assessment = drug_result['overall_assessment']
print(f"\nDevelopability score: {assessment['developability_score']:.3f}")
print(f"ADMET score: {assessment['admet_score']:.3f}")
print(f"Promising candidate: {assessment['promising_candidate']}")
Monitoring PBS Jobs¶
Monitor and manage PBS jobs using Clustrix:
[ ]:
from clustrix import ClusterExecutor
# Get the configured executor
config = clustrix.get_config()
executor = ClusterExecutor(config)
try:
executor.connect()
print("✓ Successfully connected to PBS cluster")
# Check PBS version
stdout, stderr = executor._execute_command("qstat --version")
if stdout:
print(f"✓ PBS version: {stdout.strip()}")
# Check available queues
stdout, stderr = executor._execute_command("qstat -Q")
if stdout:
print("\nAvailable queues:")
lines = stdout.strip().split('\n')
for line in lines[2:7]: # Skip header, show first 5 queues
parts = line.split()
if len(parts) >= 3:
queue_name = parts[0]
max_jobs = parts[1] if parts[1] != '--' else 'unlimited'
total_jobs = parts[2]
print(f" {queue_name}: {total_jobs} jobs, max: {max_jobs}")
# Check node status
stdout, stderr = executor._execute_command("pbsnodes -a | grep -E '^(\w+|\s+state)' | head -20")
if stdout:
print("\nNode status (sample):")
lines = stdout.strip().split('\n')
current_node = None
for line in lines[:10]: # Show first few nodes
if not line.startswith(' '):
current_node = line.strip()
elif 'state' in line:
state = line.split('=')[1].strip() if '=' in line else 'unknown'
print(f" {current_node}: {state}")
# Check user's job status
username = config.username
stdout, stderr = executor._execute_command(f"qstat -u {username}")
if stdout and len(stdout.strip().split('\n')) > 2:
print(f"\nYour current jobs:")
lines = stdout.strip().split('\n')
for line in lines[2:]: # Skip headers
print(f" {line}")
else:
print(f"\n✓ No jobs currently running for user {username}")
executor.disconnect()
print("\n✓ PBS cluster monitoring completed successfully")
except Exception as e:
print(f"✗ Connection or monitoring failed: {e}")
print("Please check your PBS cluster configuration and connectivity")
PBS Configuration Best Practices¶
Environment-Specific Configuration Files¶
Create different configurations for different PBS environments:
[ ]:
# Create PBS configuration for different research domains
bioinformatics_config = {
'cluster_type': 'pbs',
'cluster_host': 'bio-cluster.university.edu',
'username': 'researcher',
'default_queue': 'bioqueue',
'default_cores': 8,
'default_memory': '32GB',
'default_time': '06:00:00',
'module_loads': ['python/3.9', 'blast/2.12', 'hmmer/3.3'],
'remote_work_dir': '/scratch/bio/clustrix',
'max_parallel_jobs': 20
}
physics_config = {
'cluster_type': 'pbs',
'cluster_host': 'physics-hpc.university.edu',
'username': 'physicist',
'default_queue': 'physics',
'default_cores': 16,
'default_memory': '64GB',
'default_time': '12:00:00',
'module_loads': ['python/3.9', 'openmpi/4.1', 'fftw/3.3'],
'remote_work_dir': '/home/physicist/clustrix',
'features': 'infiniband', # Request high-speed interconnect
'max_parallel_jobs': 10
}
climate_config = {
'cluster_type': 'pbs',
'cluster_host': 'climate-compute.noaa.gov',
'username': 'climatologist',
'default_queue': 'climate',
'default_cores': 12,
'default_memory': '48GB',
'default_time': '08:00:00',
'module_loads': ['python/3.9', 'netcdf/4.8', 'gdal/3.4'],
'remote_work_dir': '/data/climate/clustrix',
'max_parallel_jobs': 15
}
# Example of selecting configuration based on research domain
def configure_for_domain(domain):
configs = {
'bioinformatics': bioinformatics_config,
'physics': physics_config,
'climate': climate_config
}
if domain in configs:
clustrix.configure(**configs[domain])
print(f"Configured Clustrix for {domain} research")
return configs[domain]
else:
print(f"Unknown domain: {domain}. Available: {list(configs.keys())}")
return None
# Configure for bioinformatics research
selected_config = configure_for_domain('bioinformatics')
if selected_config:
print("\nConfiguration details:")
for key, value in selected_config.items():
print(f" {key}: {value}")
Summary¶
This tutorial covered PBS/Torque cluster usage with Clustrix:
PBS Configuration - Setting up Clustrix for PBS clusters
Bioinformatics Applications - DNA sequence analysis and genomics
Materials Science - Molecular dynamics simulations
Climate Research - Large-scale environmental data analysis
Drug Discovery - Pharmaceutical parameter sweeps with job arrays
Resource Management - Intelligent queue and resource selection
Job Monitoring - PBS cluster status and job management
Best Practices - Domain-specific configurations
Key PBS Features:¶
Queue Management: Choose appropriate queues for different workload types
Resource Specification: Use PBS directives for cores, memory, and time
Job Arrays: Efficient parameter sweeps with
pbs_arrayparameterFeature Requests: Specify hardware features like InfiniBand
Module Loading: Automatic environment setup with required software
Walltime Management: Realistic time estimates for job completion
Next Steps:¶
Explore SLURM Tutorial for SLURM-specific features
Try Kubernetes Tutorial for containerized computing
Review SGE Tutorial for Sun Grid Engine clusters
Check the SSH Setup Guide for secure authentication
For more information, visit the Clustrix Documentation.