PBS/Torque Cluster Tutorial

Open In Colab

This tutorial demonstrates how to use Clustrix with PBS (Portable Batch System) and Torque clusters. PBS is widely used in academic and research computing environments.

Prerequisites

  • Access to a PBS/Torque cluster

  • SSH key configured for the cluster

  • Clustrix installed: pip install clustrix

Installation and Setup

[ ]:
# Install Clustrix (uncomment if needed)
# !pip install clustrix

import clustrix
from clustrix import cluster, configure
import numpy as np
import pandas as pd

PBS Cluster Configuration

Configure Clustrix for your PBS/Torque cluster:

[ ]:
# Configure for PBS cluster
configure(
    cluster_type="pbs",
    cluster_host="pbs-cluster.university.edu",  # Replace with your cluster
    username="your-username",                   # Replace with your username
    key_file="~/.ssh/id_rsa",                  # Path to SSH key

    # Default PBS resource requirements
    default_cores=4,
    default_memory="16GB",
    default_time="02:00:00",
    default_queue="normal",                     # PBS queue name

    # PBS-specific options
    remote_work_dir="/home/your-username/clustrix",  # Adjust for your cluster

    # Environment setup
    module_loads=["python/3.9", "openmpi/4.0"],  # Common PBS modules

    # Job management
    cleanup_on_success=True,
    max_parallel_jobs=25
)

print("PBS cluster configured successfully!")

Example 1: Bioinformatics - DNA Sequence Analysis

PBS clusters are popular in bioinformatics. Let’s analyze DNA sequences:

[ ]:
@cluster(
    cores=8,
    memory="32GB",
    time="03:00:00",
    queue="bioqueue",  # Specialized bioinformatics queue
    walltime="03:00:00"  # PBS uses 'walltime' parameter
)
def analyze_dna_sequences(sequences, analysis_type="comprehensive"):
    """
    Comprehensive DNA sequence analysis for bioinformatics research.
    """
    import numpy as np
    import random
    from collections import Counter, defaultdict
    import re
    import math

    def calculate_gc_content(sequence):
        """Calculate GC content percentage"""
        gc_count = sequence.count('G') + sequence.count('C')
        return (gc_count / len(sequence)) * 100 if sequence else 0

    def find_orfs(sequence, min_length=100):
        """Find Open Reading Frames (ORFs)"""
        start_codon = 'ATG'
        stop_codons = ['TAA', 'TAG', 'TGA']
        orfs = []

        for frame in range(3):  # Check all 3 reading frames
            for i in range(frame, len(sequence) - 2, 3):
                codon = sequence[i:i+3]
                if codon == start_codon:
                    # Look for stop codon
                    for j in range(i+3, len(sequence) - 2, 3):
                        stop_codon = sequence[j:j+3]
                        if stop_codon in stop_codons:
                            orf_length = j - i + 3
                            if orf_length >= min_length:
                                orfs.append({
                                    'start': i,
                                    'end': j + 3,
                                    'length': orf_length,
                                    'frame': frame + 1,
                                    'sequence': sequence[i:j+3]
                                })
                            break
        return orfs

    def analyze_codon_usage(sequence):
        """Analyze codon usage patterns"""
        codons = [sequence[i:i+3] for i in range(0, len(sequence)-2, 3)
                 if len(sequence[i:i+3]) == 3]
        codon_counts = Counter(codons)

        # Standard genetic code mapping
        genetic_code = {
            'TTT': 'F', 'TTC': 'F', 'TTA': 'L', 'TTG': 'L',
            'TCT': 'S', 'TCC': 'S', 'TCA': 'S', 'TCG': 'S',
            'TAT': 'Y', 'TAC': 'Y', 'TAA': '*', 'TAG': '*',
            'TGT': 'C', 'TGC': 'C', 'TGA': '*', 'TGG': 'W',
            'CTT': 'L', 'CTC': 'L', 'CTA': 'L', 'CTG': 'L',
            'CCT': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',
            'CAT': 'H', 'CAC': 'H', 'CAA': 'Q', 'CAG': 'Q',
            'CGT': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R',
            'ATT': 'I', 'ATC': 'I', 'ATA': 'I', 'ATG': 'M',
            'ACT': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',
            'AAT': 'N', 'AAC': 'N', 'AAA': 'K', 'AAG': 'K',
            'AGT': 'S', 'AGC': 'S', 'AGA': 'R', 'AGG': 'R',
            'GTT': 'V', 'GTC': 'V', 'GTA': 'V', 'GTG': 'V',
            'GCT': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',
            'GAT': 'D', 'GAC': 'D', 'GAA': 'E', 'GAG': 'E',
            'GGT': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G'
        }

        amino_acid_counts = defaultdict(int)
        for codon, count in codon_counts.items():
            if codon in genetic_code:
                amino_acid_counts[genetic_code[codon]] += count

        return dict(codon_counts), dict(amino_acid_counts)

    def find_tandem_repeats(sequence, min_repeat_length=3, max_repeat_length=20):
        """Find tandem repeats in DNA sequence"""
        repeats = []

        for repeat_len in range(min_repeat_length, max_repeat_length + 1):
            for i in range(len(sequence) - repeat_len * 2 + 1):
                motif = sequence[i:i + repeat_len]
                count = 1
                j = i + repeat_len

                while j + repeat_len <= len(sequence) and sequence[j:j + repeat_len] == motif:
                    count += 1
                    j += repeat_len

                if count >= 3:  # At least 3 repeats
                    repeats.append({
                        'motif': motif,
                        'start': i,
                        'end': j,
                        'repeat_count': count,
                        'total_length': j - i
                    })

        return repeats

    # Main analysis loop
    results = []

    for seq_idx, sequence in enumerate(sequences):
        print(f"Analyzing sequence {seq_idx + 1}/{len(sequences)} (length: {len(sequence)})...")

        # Basic composition analysis
        base_composition = Counter(sequence)
        gc_content = calculate_gc_content(sequence)

        # Advanced analyses
        orfs = find_orfs(sequence, min_length=150)
        codon_usage, amino_acid_freq = analyze_codon_usage(sequence)
        tandem_repeats = find_tandem_repeats(sequence)

        # CpG island detection (simplified)
        cpg_sites = len(re.findall('CG', sequence))
        cpg_density = (cpg_sites / (len(sequence) - 1)) * 100 if len(sequence) > 1 else 0

        # Complexity analysis
        def calculate_complexity(seq, window_size=50):
            complexities = []
            for i in range(0, len(seq) - window_size + 1, window_size):
                window = seq[i:i + window_size]
                counter = Counter(window)
                entropy = -sum((count/window_size) * math.log2(count/window_size)
                              for count in counter.values() if count > 0)
                complexities.append(entropy)
            return np.mean(complexities) if complexities else 0

        complexity = calculate_complexity(sequence)

        sequence_result = {
            'sequence_id': seq_idx,
            'length': len(sequence),
            'base_composition': dict(base_composition),
            'gc_content': gc_content,
            'complexity': complexity,
            'orfs_found': len(orfs),
            'longest_orf': max(orfs, key=lambda x: x['length'])['length'] if orfs else 0,
            'cpg_sites': cpg_sites,
            'cpg_density': cpg_density,
            'tandem_repeats': len(tandem_repeats),
            'repeat_details': tandem_repeats[:5],  # Keep first 5 for analysis
            'codon_diversity': len(codon_usage),
            'amino_acid_diversity': len(amino_acid_freq),
            'most_common_amino_acid': max(amino_acid_freq.items(), key=lambda x: x[1])[0] if amino_acid_freq else 'N/A'
        }

        results.append(sequence_result)

    # Aggregate statistics
    aggregate_stats = {
        'total_sequences': len(results),
        'total_base_pairs': sum(r['length'] for r in results),
        'average_gc_content': np.mean([r['gc_content'] for r in results]),
        'gc_content_std': np.std([r['gc_content'] for r in results]),
        'average_complexity': np.mean([r['complexity'] for r in results]),
        'total_orfs_found': sum(r['orfs_found'] for r in results),
        'total_cpg_sites': sum(r['cpg_sites'] for r in results),
        'sequences_with_repeats': sum(1 for r in results if r['tandem_repeats'] > 0),
        'individual_results': results
    }

    return aggregate_stats

# Generate sample DNA sequences for analysis
def generate_realistic_dna(length, gc_content=0.5):
    """Generate realistic DNA sequences with specific GC content"""
    bases = ['A', 'T', 'G', 'C']
    gc_prob = gc_content / 2
    at_prob = (1 - gc_content) / 2
    probs = [at_prob, at_prob, gc_prob, gc_prob]

    return ''.join(np.random.choice(bases, size=length, p=probs))

# Create test sequences
test_sequences = [
    generate_realistic_dna(5000, 0.4),   # AT-rich
    generate_realistic_dna(8000, 0.6),   # GC-rich
    generate_realistic_dna(3000, 0.5),   # Balanced
    generate_realistic_dna(12000, 0.45), # Large AT-rich
    generate_realistic_dna(6000, 0.55)   # Medium GC-rich
]

# Run analysis on PBS cluster
bio_results = analyze_dna_sequences(test_sequences, analysis_type="comprehensive")

print(f"\nBIOINFORMATICS ANALYSIS COMPLETE")
print(f"Sequences analyzed: {bio_results['total_sequences']}")
print(f"Total base pairs: {bio_results['total_base_pairs']:,}")
print(f"Average GC content: {bio_results['average_gc_content']:.2f}% ± {bio_results['gc_content_std']:.2f}%")
print(f"Total ORFs found: {bio_results['total_orfs_found']}")
print(f"Total CpG sites: {bio_results['total_cpg_sites']}")
print(f"Sequences with tandem repeats: {bio_results['sequences_with_repeats']}/{bio_results['total_sequences']}")

Example 2: Materials Science - Molecular Dynamics Simulation

Simulate molecular systems commonly done on PBS clusters:

[ ]:
@cluster(
    cores=16,
    memory="64GB",
    time="06:00:00",
    queue="physics",
    features="infiniband"  # PBS feature for high-speed networking
)
def molecular_dynamics_simulation(n_particles=10000, n_steps=100000, temperature=300.0):
    """
    Simplified molecular dynamics simulation for materials science.
    """
    import numpy as np
    import math

    # Physical constants
    kb = 1.380649e-23  # Boltzmann constant (J/K)
    mass = 1.66054e-27  # Approximate atomic mass (kg)
    dt = 1e-15  # Time step (s)
    sigma = 3.4e-10  # Lennard-Jones parameter (m)
    epsilon = 1.65e-21  # Lennard-Jones parameter (J)

    print(f"Starting MD simulation with {n_particles:,} particles for {n_steps:,} steps...")
    print(f"Temperature: {temperature} K")

    # Initialize system
    box_size = (n_particles / 0.8) ** (1/3) * sigma  # Density ~0.8

    # Random initial positions
    positions = np.random.uniform(0, box_size, (n_particles, 3))

    # Maxwell-Boltzmann velocity distribution
    velocity_scale = math.sqrt(kb * temperature / mass)
    velocities = np.random.normal(0, velocity_scale, (n_particles, 3))

    # Remove center of mass motion
    velocities -= np.mean(velocities, axis=0)

    # Storage for analysis
    energies = []
    temperatures = []
    pressures = []
    radial_distribution = []

    def lennard_jones_force(r):
        """Calculate Lennard-Jones force"""
        if r < 1e-12:  # Avoid division by zero
            return 0
        sr6 = (sigma / r) ** 6
        sr12 = sr6 ** 2
        return 24 * epsilon * (2 * sr12 - sr6) / r

    def calculate_forces(pos):
        """Calculate forces on all particles"""
        forces = np.zeros_like(pos)
        potential_energy = 0

        for i in range(n_particles):
            for j in range(i + 1, n_particles):
                # Distance vector with periodic boundary conditions
                dr = pos[j] - pos[i]
                dr = dr - box_size * np.round(dr / box_size)
                r = np.linalg.norm(dr)

                if r < 2.5 * sigma:  # Cutoff distance
                    force_magnitude = lennard_jones_force(r)
                    force_vector = force_magnitude * dr / r

                    forces[i] += force_vector
                    forces[j] -= force_vector

                    # Potential energy
                    sr6 = (sigma / r) ** 6
                    sr12 = sr6 ** 2
                    potential_energy += 4 * epsilon * (sr12 - sr6)

        return forces, potential_energy

    def calculate_temperature(vel):
        """Calculate instantaneous temperature"""
        kinetic_energy = 0.5 * mass * np.sum(vel ** 2)
        return 2 * kinetic_energy / (3 * n_particles * kb)

    def calculate_pressure(pos, forces):
        """Calculate pressure using virial theorem"""
        kinetic_term = n_particles * kb * calculate_temperature(velocities)
        virial = np.sum(positions * forces)
        volume = box_size ** 3
        return (kinetic_term + virial/3) / volume

    # Main simulation loop
    for step in range(n_steps):
        if step % (n_steps // 10) == 0:
            print(f"Step {step:,}/{n_steps:,} ({100*step/n_steps:.1f}%)")

        # Calculate forces
        forces, potential_energy = calculate_forces(positions)

        # Velocity Verlet integration
        # Update positions
        positions += velocities * dt + 0.5 * forces / mass * dt ** 2

        # Apply periodic boundary conditions
        positions = positions % box_size

        # Update velocities
        new_forces, _ = calculate_forces(positions)
        velocities += 0.5 * (forces + new_forces) / mass * dt

        # Calculate thermodynamic properties
        if step % 1000 == 0:  # Sample every 1000 steps
            kinetic_energy = 0.5 * mass * np.sum(velocities ** 2)
            total_energy = kinetic_energy + potential_energy
            temp = calculate_temperature(velocities)
            pressure = calculate_pressure(positions, new_forces)

            energies.append({
                'step': step,
                'kinetic': kinetic_energy,
                'potential': potential_energy,
                'total': total_energy
            })
            temperatures.append(temp)
            pressures.append(pressure)

        # Simple thermostat (velocity rescaling)
        if step % 100 == 0:  # Apply every 100 steps
            current_temp = calculate_temperature(velocities)
            if current_temp > 0:
                scaling_factor = math.sqrt(temperature / current_temp)
                velocities *= scaling_factor

    # Calculate radial distribution function (simplified)
    def calculate_rdf(pos, n_bins=100, max_r=None):
        if max_r is None:
            max_r = box_size / 2

        bin_width = max_r / n_bins
        rdf = np.zeros(n_bins)

        for i in range(min(1000, n_particles)):  # Sample subset for efficiency
            for j in range(i + 1, min(1000, n_particles)):
                dr = pos[j] - pos[i]
                dr = dr - box_size * np.round(dr / box_size)
                r = np.linalg.norm(dr)

                if r < max_r:
                    bin_index = int(r / bin_width)
                    if bin_index < n_bins:
                        rdf[bin_index] += 1

        # Normalize
        for i in range(n_bins):
            r = (i + 0.5) * bin_width
            volume = 4 * math.pi * r ** 2 * bin_width
            density = n_particles / box_size ** 3
            rdf[i] /= (volume * density * 1000)  # 1000 particles sampled

        return rdf, np.arange(0.5 * bin_width, max_r, bin_width)

    rdf_values, rdf_distances = calculate_rdf(positions)

    # Final analysis
    avg_temperature = np.mean(temperatures[-50:])  # Last 50 samples
    avg_pressure = np.mean(pressures[-50:])
    final_energy = energies[-1]['total'] if energies else 0

    simulation_results = {
        'n_particles': n_particles,
        'n_steps': n_steps,
        'target_temperature': temperature,
        'average_temperature': avg_temperature,
        'temperature_stability': np.std(temperatures[-50:]),
        'average_pressure': avg_pressure,
        'final_energy': final_energy,
        'box_size': box_size,
        'density': n_particles / box_size ** 3,
        'energy_trajectory': energies[::10],  # Every 10th point
        'temperature_trajectory': temperatures[::10],
        'pressure_trajectory': pressures[::10],
        'radial_distribution': {
            'distances': rdf_distances.tolist(),
            'values': rdf_values.tolist()
        },
        'simulation_time_ns': n_steps * dt * 1e9  # Convert to nanoseconds
    }

    return simulation_results

# Run molecular dynamics simulation
md_results = molecular_dynamics_simulation(
    n_particles=5000,
    n_steps=50000,
    temperature=298.15  # Room temperature
)

print(f"\nMOLECULAR DYNAMICS SIMULATION COMPLETE")
print(f"Particles: {md_results['n_particles']:,}")
print(f"Steps: {md_results['n_steps']:,}")
print(f"Simulation time: {md_results['simulation_time_ns']:.2f} ns")
print(f"Target temperature: {md_results['target_temperature']:.1f} K")
print(f"Average temperature: {md_results['average_temperature']:.1f} K")
print(f"Temperature stability: ±{md_results['temperature_stability']:.1f} K")
print(f"Average pressure: {md_results['average_pressure']:.2e} Pa")
print(f"System density: {md_results['density']:.2e} particles/m³")

Example 3: Environmental Science - Climate Data Analysis

Analyze large climate datasets commonly processed on research clusters:

[ ]:
@cluster(
    cores=12,
    memory="48GB",
    time="04:00:00",
    queue="climate",
    parallel=True  # Enable automatic parallelization
)
def analyze_climate_data(years_to_analyze=50, stations_per_year=1000):
    """
    Comprehensive climate data analysis for environmental research.
    """
    import numpy as np
    import pandas as pd
    from datetime import datetime, timedelta
    import random
    from scipy import stats
    import math

    def generate_realistic_climate_data(year, station_id, latitude, longitude):
        """Generate realistic climate data for a station"""
        np.random.seed(year * 1000 + station_id)  # Reproducible but varied

        # Base temperature influenced by latitude
        base_temp = 25 - abs(latitude) * 0.6  # Cooler at higher latitudes

        # Generate daily data for the year
        start_date = datetime(year, 1, 1)
        days_in_year = 366 if year % 4 == 0 else 365

        daily_data = []

        for day in range(days_in_year):
            date = start_date + timedelta(days=day)
            day_of_year = day + 1

            # Seasonal temperature variation
            seasonal_temp = base_temp + 15 * math.cos(2 * math.pi * (day_of_year - 172) / 365)

            # Add random variation and trends
            climate_trend = 0.01 * (year - 1970)  # 0.01°C/year warming
            daily_temp = seasonal_temp + climate_trend + np.random.normal(0, 3)

            # Precipitation (higher in tropics and certain seasons)
            base_precip = max(0, 10 - abs(latitude) * 0.3)
            seasonal_precip_factor = 1 + 0.5 * math.cos(2 * math.pi * (day_of_year - 30) / 365)
            daily_precip = max(0, np.random.exponential(base_precip * seasonal_precip_factor))

            # Humidity (correlated with temperature and precipitation)
            base_humidity = 60 - abs(latitude) * 0.5
            humidity = base_humidity + daily_precip * 0.5 - (daily_temp - base_temp) * 0.3
            humidity = max(10, min(100, humidity + np.random.normal(0, 5)))

            # Wind speed (more variable at higher latitudes)
            base_wind = 5 + abs(latitude) * 0.1
            wind_speed = max(0, np.random.gamma(2, base_wind / 2))

            # Atmospheric pressure (altitude and weather dependent)
            base_pressure = 1013.25  # Sea level
            pressure = base_pressure + np.random.normal(0, 10)

            daily_data.append({
                'date': date,
                'temperature': daily_temp,
                'precipitation': daily_precip,
                'humidity': humidity,
                'wind_speed': wind_speed,
                'pressure': pressure
            })

        return daily_data

    def analyze_station_trends(station_data):
        """Analyze trends for a single weather station"""
        df = pd.DataFrame(station_data)

        # Calculate annual statistics
        annual_stats = {
            'mean_temperature': df['temperature'].mean(),
            'temperature_range': df['temperature'].max() - df['temperature'].min(),
            'total_precipitation': df['precipitation'].sum(),
            'mean_humidity': df['humidity'].mean(),
            'mean_wind_speed': df['wind_speed'].mean(),
            'mean_pressure': df['pressure'].mean(),
            'temperature_std': df['temperature'].std(),
            'precipitation_days': (df['precipitation'] > 1.0).sum(),
            'extreme_heat_days': (df['temperature'] > df['temperature'].quantile(0.95)).sum(),
            'extreme_cold_days': (df['temperature'] < df['temperature'].quantile(0.05)).sum()
        }

        # Seasonal analysis
        df['month'] = df['date'].dt.month
        seasonal_temps = df.groupby(df['month'])['temperature'].mean()
        seasonal_precip = df.groupby(df['month'])['precipitation'].sum()

        annual_stats['seasonal_temperature_variation'] = seasonal_temps.std()
        annual_stats['wettest_month'] = seasonal_precip.idxmax()
        annual_stats['driest_month'] = seasonal_precip.idxmin()

        return annual_stats

    print(f"Analyzing climate data for {years_to_analyze} years, {stations_per_year} stations per year...")
    print(f"Total data points: {years_to_analyze * stations_per_year * 365:,}")

    all_station_results = []

    # This loop will be automatically parallelized by Clustrix
    for year in range(1970, 1970 + years_to_analyze):
        print(f"Processing year {year}...")

        year_results = []

        for station_id in range(stations_per_year):
            # Generate random station location
            latitude = np.random.uniform(-60, 75)  # Inhabitable latitudes
            longitude = np.random.uniform(-180, 180)

            # Generate climate data for this station and year
            station_data = generate_realistic_climate_data(year, station_id, latitude, longitude)

            # Analyze the station data
            station_analysis = analyze_station_trends(station_data)
            station_analysis['year'] = year
            station_analysis['station_id'] = station_id
            station_analysis['latitude'] = latitude
            station_analysis['longitude'] = longitude

            year_results.append(station_analysis)

        all_station_results.extend(year_results)

    # Convert to DataFrame for analysis
    results_df = pd.DataFrame(all_station_results)

    # Global trend analysis
    yearly_global_temps = results_df.groupby('year')['mean_temperature'].mean()
    yearly_global_precip = results_df.groupby('year')['total_precipitation'].mean()

    # Calculate trends
    years = yearly_global_temps.index
    temp_trend, temp_intercept, temp_r_value, temp_p_value, temp_std_err = stats.linregress(years, yearly_global_temps)
    precip_trend, precip_intercept, precip_r_value, precip_p_value, precip_std_err = stats.linregress(years, yearly_global_precip)

    # Regional analysis
    def classify_climate_zone(lat):
        if abs(lat) < 23.5:
            return "Tropical"
        elif abs(lat) < 35:
            return "Subtropical"
        elif abs(lat) < 50:
            return "Temperate"
        else:
            return "Polar"

    results_df['climate_zone'] = results_df['latitude'].apply(classify_climate_zone)
    zone_analysis = results_df.groupby('climate_zone').agg({
        'mean_temperature': ['mean', 'std'],
        'total_precipitation': ['mean', 'std'],
        'temperature_range': 'mean',
        'extreme_heat_days': 'mean',
        'extreme_cold_days': 'mean'
    }).round(2)

    # Extreme events analysis
    extreme_heat_threshold = results_df['mean_temperature'].quantile(0.95)
    extreme_cold_threshold = results_df['mean_temperature'].quantile(0.05)
    drought_threshold = results_df['total_precipitation'].quantile(0.1)
    flood_threshold = results_df['total_precipitation'].quantile(0.9)

    extreme_events = {
        'extreme_heat_stations': (results_df['mean_temperature'] > extreme_heat_threshold).sum(),
        'extreme_cold_stations': (results_df['mean_temperature'] < extreme_cold_threshold).sum(),
        'drought_affected_stations': (results_df['total_precipitation'] < drought_threshold).sum(),
        'flood_risk_stations': (results_df['total_precipitation'] > flood_threshold).sum()
    }

    # Compile final results
    climate_analysis = {
        'analysis_summary': {
            'years_analyzed': years_to_analyze,
            'stations_per_year': stations_per_year,
            'total_station_years': len(results_df),
            'data_points_analyzed': len(results_df) * 365
        },
        'global_trends': {
            'temperature_trend_per_decade': temp_trend * 10,
            'temperature_trend_significance': temp_p_value,
            'temperature_correlation': temp_r_value ** 2,
            'precipitation_trend_per_decade': precip_trend * 10,
            'precipitation_trend_significance': precip_p_value,
            'precipitation_correlation': precip_r_value ** 2
        },
        'current_climate_state': {
            'global_mean_temperature': yearly_global_temps.iloc[-1],
            'global_mean_precipitation': yearly_global_precip.iloc[-1],
            'temperature_warming_since_start': yearly_global_temps.iloc[-1] - yearly_global_temps.iloc[0],
            'precipitation_change_since_start': yearly_global_precip.iloc[-1] - yearly_global_precip.iloc[0]
        },
        'regional_analysis': zone_analysis.to_dict(),
        'extreme_events': extreme_events,
        'statistical_summary': {
            'mean_global_temperature': results_df['mean_temperature'].mean(),
            'temperature_standard_deviation': results_df['mean_temperature'].std(),
            'mean_global_precipitation': results_df['total_precipitation'].mean(),
            'precipitation_standard_deviation': results_df['total_precipitation'].std(),
            'warmest_station_temp': results_df['mean_temperature'].max(),
            'coldest_station_temp': results_df['mean_temperature'].min(),
            'wettest_station_precip': results_df['total_precipitation'].max(),
            'driest_station_precip': results_df['total_precipitation'].min()
        }
    }

    return climate_analysis

# Run climate analysis
climate_results = analyze_climate_data(years_to_analyze=30, stations_per_year=200)

print(f"\nCLIMATE DATA ANALYSIS COMPLETE")
print(f"Years analyzed: {climate_results['analysis_summary']['years_analyzed']}")
print(f"Total station-years: {climate_results['analysis_summary']['total_station_years']:,}")
print(f"Data points: {climate_results['analysis_summary']['data_points_analyzed']:,}")

print("\nGlobal Trends:")
trends = climate_results['global_trends']
print(f"  Temperature trend: {trends['temperature_trend_per_decade']:.3f}°C per decade (p={trends['temperature_trend_significance']:.4f})")
print(f"  Precipitation trend: {trends['precipitation_trend_per_decade']:.1f} mm per decade (p={trends['precipitation_trend_significance']:.4f})")

print("\nCurrent Climate State:")
current = climate_results['current_climate_state']
print(f"  Global mean temperature: {current['global_mean_temperature']:.2f}°C")
print(f"  Temperature change since start: {current['temperature_warming_since_start']:.2f}°C")
print(f"  Global mean precipitation: {current['global_mean_precipitation']:.1f} mm/year")

print("\nExtreme Events:")
extremes = climate_results['extreme_events']
total_stations = climate_results['analysis_summary']['total_station_years']
print(f"  Extreme heat affected: {extremes['extreme_heat_stations']} stations ({100*extremes['extreme_heat_stations']/total_stations:.1f}%)")
print(f"  Drought affected: {extremes['drought_affected_stations']} stations ({100*extremes['drought_affected_stations']/total_stations:.1f}%)")
print(f"  Flood risk: {extremes['flood_risk_stations']} stations ({100*extremes['flood_risk_stations']/total_stations:.1f}%)")

PBS Queue Management and Resource Selection

Understanding how to choose appropriate PBS queues and resources:

[ ]:
def select_pbs_resources(workload_type, data_size_mb, urgency="normal"):
    """
    Intelligent PBS resource selection based on workload characteristics.
    """

    # Base resource templates
    resource_templates = {
        "bioinformatics": {
            "small": {"cores": 4, "memory": "16GB", "time": "02:00:00", "queue": "bioqueue"},
            "medium": {"cores": 8, "memory": "32GB", "time": "06:00:00", "queue": "bioqueue"},
            "large": {"cores": 16, "memory": "64GB", "time": "12:00:00", "queue": "bioqueue_long"}
        },
        "physics": {
            "small": {"cores": 8, "memory": "32GB", "time": "04:00:00", "queue": "physics"},
            "medium": {"cores": 16, "memory": "64GB", "time": "12:00:00", "queue": "physics"},
            "large": {"cores": 32, "memory": "128GB", "time": "24:00:00", "queue": "physics_long"}
        },
        "climate": {
            "small": {"cores": 6, "memory": "24GB", "time": "03:00:00", "queue": "climate"},
            "medium": {"cores": 12, "memory": "48GB", "time": "08:00:00", "queue": "climate"},
            "large": {"cores": 24, "memory": "96GB", "time": "16:00:00", "queue": "climate_long"}
        },
        "ml": {
            "small": {"cores": 4, "memory": "16GB", "time": "01:00:00", "queue": "gpu", "gres": "gpu:1"},
            "medium": {"cores": 8, "memory": "32GB", "time": "04:00:00", "queue": "gpu", "gres": "gpu:2"},
            "large": {"cores": 16, "memory": "64GB", "time": "12:00:00", "queue": "gpu_long", "gres": "gpu:4"}
        }
    }

    # Determine size category based on data
    if data_size_mb < 100:
        size_category = "small"
    elif data_size_mb < 1000:
        size_category = "medium"
    else:
        size_category = "large"

    # Get base configuration
    if workload_type not in resource_templates:
        workload_type = "physics"  # Default fallback

    config = resource_templates[workload_type][size_category].copy()

    # Adjust for urgency
    if urgency == "urgent":
        # Use express queue with reduced resources
        config["queue"] = "express"
        config["time"] = "00:30:00"
        config["cores"] = min(4, config["cores"])
    elif urgency == "low":
        # Use long queue with more resources
        config["queue"] = config["queue"].replace("queue", "queue_long")
        config["cores"] = int(config["cores"] * 1.5)
        # Increase time limit
        time_parts = config["time"].split(":")
        hours = int(time_parts[0]) * 2
        config["time"] = f"{hours:02d}:{time_parts[1]}:{time_parts[2]}"

    return config

# Example resource selections
example_workloads = [
    ("bioinformatics", 500, "normal"),
    ("physics", 2000, "low"),
    ("climate", 150, "urgent"),
    ("ml", 800, "normal")
]

print("PBS Resource Selection Examples:")
print("=" * 70)

for workload, data_size, urgency in example_workloads:
    resources = select_pbs_resources(workload, data_size, urgency)
    print(f"\n{workload.upper()} ({data_size} MB, {urgency} priority):")
    for key, value in resources.items():
        print(f"  {key}: {value}")

PBS Job Arrays for Parameter Studies

Use PBS job arrays for efficient parameter sweeps:

[ ]:
@cluster(
    cores=4,
    memory="16GB",
    time="01:00:00",
    queue="normal",
    pbs_array="1-20"  # PBS job array with 20 tasks
)
def drug_discovery_parameter_sweep(base_config):
    """
    Pharmaceutical research parameter sweep using PBS job arrays.
    Each array task tests different molecular parameters.
    """
    import os
    import numpy as np
    import random
    from math import exp, log

    # Get PBS array index
    array_index = int(os.environ.get('PBS_ARRAYID', '1'))

    # Define parameter space for drug discovery
    molecular_weights = np.linspace(150, 500, 20)  # Typical drug MW range
    logp_values = np.linspace(-1, 5, 20)           # Lipophilicity
    hbd_counts = list(range(0, 6))                 # Hydrogen bond donors
    hba_counts = list(range(0, 11))                # Hydrogen bond acceptors

    # Select parameters for this array task
    mw = molecular_weights[array_index - 1]
    logp = logp_values[array_index - 1]

    # Random selection for other parameters
    np.random.seed(array_index * 42)
    hbd = random.choice(hbd_counts)
    hba = random.choice(hba_counts)

    print(f"Array task {array_index}: MW={mw:.1f}, LogP={logp:.2f}, HBD={hbd}, HBA={hba}")

    def calculate_drug_likeness(mw, logp, hbd, hba):
        """Calculate drug-likeness using Lipinski's Rule of Five"""
        violations = 0

        if mw > 500:
            violations += 1
        if logp > 5:
            violations += 1
        if hbd > 5:
            violations += 1
        if hba > 10:
            violations += 1

        drug_likeness = max(0, 1.0 - violations * 0.25)
        return drug_likeness, violations

    def simulate_binding_affinity(mw, logp, hbd, hba):
        """Simulate binding affinity to target protein"""
        # Simplified model based on molecular properties
        optimal_mw = 350
        optimal_logp = 2.5
        optimal_hbd = 2
        optimal_hba = 6

        mw_score = exp(-((mw - optimal_mw) / 100) ** 2)
        logp_score = exp(-((logp - optimal_logp) / 1.5) ** 2)
        hbd_score = exp(-((hbd - optimal_hbd) / 1.5) ** 2)
        hba_score = exp(-((hba - optimal_hba) / 2.5) ** 2)

        # Combine scores with some randomness
        base_affinity = (mw_score * logp_score * hbd_score * hba_score) ** 0.5
        random_factor = np.random.uniform(0.7, 1.3)

        binding_affinity = base_affinity * random_factor
        ic50 = 10 ** (-6 - 3 * binding_affinity)  # Convert to IC50 (M)

        return binding_affinity, ic50

    def simulate_admet_properties(mw, logp, hbd, hba):
        """Simulate ADMET (Absorption, Distribution, Metabolism, Excretion, Toxicity)"""
        # Absorption (permeability)
        permeability = 1 / (1 + exp(-(logp - 1.5)))
        permeability *= np.random.uniform(0.8, 1.2)

        # Distribution (plasma protein binding)
        ppb = min(99, max(10, 20 + logp * 15 + np.random.normal(0, 10)))

        # Metabolism (hepatic clearance)
        clearance = 0.5 + 0.3 * (1 / (1 + exp(-(mw - 300) / 50)))
        clearance *= np.random.uniform(0.7, 1.3)

        # Excretion (renal clearance)
        renal_clearance = max(0.1, 0.8 - logp * 0.1 + np.random.normal(0, 0.1))

        # Toxicity (simplified hERG channel binding)
        herg_risk = 1 / (1 + exp(-(logp - 3.5)))
        if mw > 400:
            herg_risk *= 1.5

        return {
            'permeability': permeability,
            'plasma_protein_binding': ppb,
            'hepatic_clearance': clearance,
            'renal_clearance': renal_clearance,
            'herg_risk': herg_risk
        }

    def calculate_developability_score(drug_likeness, binding_affinity, admet):
        """Calculate overall drug developability score"""
        # Weight different factors
        likeness_weight = 0.2
        affinity_weight = 0.4
        admet_weight = 0.4

        # ADMET composite score
        admet_score = (
            admet['permeability'] * 0.3 +
            (1 - admet['herg_risk']) * 0.3 +
            (1 - admet['hepatic_clearance']) * 0.2 +
            admet['renal_clearance'] * 0.2
        )

        total_score = (
            drug_likeness * likeness_weight +
            binding_affinity * affinity_weight +
            admet_score * admet_weight
        )

        return total_score, admet_score

    # Run simulations
    drug_likeness, ro5_violations = calculate_drug_likeness(mw, logp, hbd, hba)
    binding_affinity, ic50 = simulate_binding_affinity(mw, logp, hbd, hba)
    admet_props = simulate_admet_properties(mw, logp, hbd, hba)
    developability_score, admet_score = calculate_developability_score(
        drug_likeness, binding_affinity, admet_props
    )

    # Compile results
    compound_results = {
        'array_task_id': array_index,
        'molecular_properties': {
            'molecular_weight': mw,
            'logp': logp,
            'hbd_count': hbd,
            'hba_count': hba
        },
        'drug_likeness': {
            'score': drug_likeness,
            'ro5_violations': ro5_violations,
            'passes_ro5': ro5_violations <= 1
        },
        'target_binding': {
            'affinity_score': binding_affinity,
            'ic50_M': ic50,
            'pic50': -log(ic50, 10) if ic50 > 0 else 0
        },
        'admet_properties': admet_props,
        'overall_assessment': {
            'developability_score': developability_score,
            'admet_score': admet_score,
            'promising_candidate': developability_score > 0.6 and binding_affinity > 0.5
        }
    }

    return compound_results

# Run drug discovery parameter sweep
drug_config = {
    'target_name': 'EGFR',
    'assay_type': 'binding',
    'screening_library': 'chembl'
}

# This will run as one task of the PBS array
drug_result = drug_discovery_parameter_sweep(drug_config)

print(f"\nDRUG DISCOVERY ANALYSIS - Task {drug_result['array_task_id']}")
print("=" * 60)

mol_props = drug_result['molecular_properties']
print(f"Molecular Weight: {mol_props['molecular_weight']:.1f} Da")
print(f"LogP: {mol_props['logp']:.2f}")
print(f"H-bond donors: {mol_props['hbd_count']}")
print(f"H-bond acceptors: {mol_props['hba_count']}")

drug_like = drug_result['drug_likeness']
print(f"\nDrug-likeness score: {drug_like['score']:.3f}")
print(f"Rule of 5 violations: {drug_like['ro5_violations']}")
print(f"Passes Lipinski's Rule: {drug_like['passes_ro5']}")

binding = drug_result['target_binding']
print(f"\nBinding affinity score: {binding['affinity_score']:.3f}")
print(f"IC50: {binding['ic50_M']:.2e} M")
print(f"pIC50: {binding['pic50']:.2f}")

assessment = drug_result['overall_assessment']
print(f"\nDevelopability score: {assessment['developability_score']:.3f}")
print(f"ADMET score: {assessment['admet_score']:.3f}")
print(f"Promising candidate: {assessment['promising_candidate']}")

Monitoring PBS Jobs

Monitor and manage PBS jobs using Clustrix:

[ ]:
from clustrix import ClusterExecutor

# Get the configured executor
config = clustrix.get_config()
executor = ClusterExecutor(config)

try:
    executor.connect()
    print("✓ Successfully connected to PBS cluster")

    # Check PBS version
    stdout, stderr = executor._execute_command("qstat --version")
    if stdout:
        print(f"✓ PBS version: {stdout.strip()}")

    # Check available queues
    stdout, stderr = executor._execute_command("qstat -Q")
    if stdout:
        print("\nAvailable queues:")
        lines = stdout.strip().split('\n')
        for line in lines[2:7]:  # Skip header, show first 5 queues
            parts = line.split()
            if len(parts) >= 3:
                queue_name = parts[0]
                max_jobs = parts[1] if parts[1] != '--' else 'unlimited'
                total_jobs = parts[2]
                print(f"  {queue_name}: {total_jobs} jobs, max: {max_jobs}")

    # Check node status
    stdout, stderr = executor._execute_command("pbsnodes -a | grep -E '^(\w+|\s+state)' | head -20")
    if stdout:
        print("\nNode status (sample):")
        lines = stdout.strip().split('\n')
        current_node = None
        for line in lines[:10]:  # Show first few nodes
            if not line.startswith(' '):
                current_node = line.strip()
            elif 'state' in line:
                state = line.split('=')[1].strip() if '=' in line else 'unknown'
                print(f"  {current_node}: {state}")

    # Check user's job status
    username = config.username
    stdout, stderr = executor._execute_command(f"qstat -u {username}")
    if stdout and len(stdout.strip().split('\n')) > 2:
        print(f"\nYour current jobs:")
        lines = stdout.strip().split('\n')
        for line in lines[2:]:  # Skip headers
            print(f"  {line}")
    else:
        print(f"\n✓ No jobs currently running for user {username}")

    executor.disconnect()
    print("\n✓ PBS cluster monitoring completed successfully")

except Exception as e:
    print(f"✗ Connection or monitoring failed: {e}")
    print("Please check your PBS cluster configuration and connectivity")

PBS Configuration Best Practices

Environment-Specific Configuration Files

Create different configurations for different PBS environments:

[ ]:
# Create PBS configuration for different research domains

bioinformatics_config = {
    'cluster_type': 'pbs',
    'cluster_host': 'bio-cluster.university.edu',
    'username': 'researcher',
    'default_queue': 'bioqueue',
    'default_cores': 8,
    'default_memory': '32GB',
    'default_time': '06:00:00',
    'module_loads': ['python/3.9', 'blast/2.12', 'hmmer/3.3'],
    'remote_work_dir': '/scratch/bio/clustrix',
    'max_parallel_jobs': 20
}

physics_config = {
    'cluster_type': 'pbs',
    'cluster_host': 'physics-hpc.university.edu',
    'username': 'physicist',
    'default_queue': 'physics',
    'default_cores': 16,
    'default_memory': '64GB',
    'default_time': '12:00:00',
    'module_loads': ['python/3.9', 'openmpi/4.1', 'fftw/3.3'],
    'remote_work_dir': '/home/physicist/clustrix',
    'features': 'infiniband',  # Request high-speed interconnect
    'max_parallel_jobs': 10
}

climate_config = {
    'cluster_type': 'pbs',
    'cluster_host': 'climate-compute.noaa.gov',
    'username': 'climatologist',
    'default_queue': 'climate',
    'default_cores': 12,
    'default_memory': '48GB',
    'default_time': '08:00:00',
    'module_loads': ['python/3.9', 'netcdf/4.8', 'gdal/3.4'],
    'remote_work_dir': '/data/climate/clustrix',
    'max_parallel_jobs': 15
}

# Example of selecting configuration based on research domain
def configure_for_domain(domain):
    configs = {
        'bioinformatics': bioinformatics_config,
        'physics': physics_config,
        'climate': climate_config
    }

    if domain in configs:
        clustrix.configure(**configs[domain])
        print(f"Configured Clustrix for {domain} research")
        return configs[domain]
    else:
        print(f"Unknown domain: {domain}. Available: {list(configs.keys())}")
        return None

# Configure for bioinformatics research
selected_config = configure_for_domain('bioinformatics')
if selected_config:
    print("\nConfiguration details:")
    for key, value in selected_config.items():
        print(f"  {key}: {value}")

Summary

This tutorial covered PBS/Torque cluster usage with Clustrix:

  1. PBS Configuration - Setting up Clustrix for PBS clusters

  2. Bioinformatics Applications - DNA sequence analysis and genomics

  3. Materials Science - Molecular dynamics simulations

  4. Climate Research - Large-scale environmental data analysis

  5. Drug Discovery - Pharmaceutical parameter sweeps with job arrays

  6. Resource Management - Intelligent queue and resource selection

  7. Job Monitoring - PBS cluster status and job management

  8. Best Practices - Domain-specific configurations

Key PBS Features:

  • Queue Management: Choose appropriate queues for different workload types

  • Resource Specification: Use PBS directives for cores, memory, and time

  • Job Arrays: Efficient parameter sweeps with pbs_array parameter

  • Feature Requests: Specify hardware features like InfiniBand

  • Module Loading: Automatic environment setup with required software

  • Walltime Management: Realistic time estimates for job completion

Next Steps:

For more information, visit the Clustrix Documentation.