SGE (Sun Grid Engine) Tutorial

Open In Colab

This tutorial demonstrates how to use Clustrix with SGE (Sun Grid Engine) clusters, including Open Grid Scheduler and other SGE-compatible systems.

Prerequisites

  • Access to an SGE cluster

  • SSH key configured for the cluster

  • Clustrix installed: pip install clustrix

[ ]:
# Install Clustrix (uncomment if needed)
# !pip install clustrix

import clustrix
from clustrix import cluster, configure
import numpy as np

SGE Configuration

Configure Clustrix for your SGE cluster:

[ ]:
# Configure for SGE cluster
configure(
    cluster_type="sge",
    cluster_host="sge-cluster.org",           # Replace with your cluster
    username="your-username",                 # Replace with your username
    key_file="~/.ssh/id_rsa",                # SSH key path

    # SGE resource defaults
    default_cores=4,
    default_memory="8GB",
    default_time="02:00:00",
    default_queue="all.q",                    # Common SGE queue name

    # SGE-specific settings
    remote_work_dir="/home/your-username/clustrix",

    # Environment modules
    module_loads=["python/3.9"],

    # Job management
    cleanup_on_success=True,
    max_parallel_jobs=30
)

print("SGE cluster configured successfully!")

Example 1: Mathematical Optimization

SGE clusters are often used for optimization problems:

[ ]:
@cluster(
    cores=8,
    memory="16GB",
    time="01:30:00",
    queue="all.q",
    pe="smp 8"  # SGE parallel environment
)
def genetic_algorithm_optimization(problem_size=1000, generations=500):
    """
    Genetic Algorithm optimization on SGE cluster.
    """
    import numpy as np
    import random
    from functools import partial

    def rastrigin_function(x):
        """Rastrigin function - a multimodal optimization benchmark"""
        A = 10
        n = len(x)
        return A * n + sum(xi**2 - A * np.cos(2 * np.pi * xi) for xi in x)

    def rosenbrock_function(x):
        """Rosenbrock function - another optimization benchmark"""
        return sum(100 * (x[i+1] - x[i]**2)**2 + (1 - x[i])**2 for i in range(len(x)-1))

    def sphere_function(x):
        """Simple sphere function"""
        return sum(xi**2 for xi in x)

    # Choose objective function
    objective_functions = {
        'rastrigin': rastrigin_function,
        'rosenbrock': rosenbrock_function,
        'sphere': sphere_function
    }

    objective_name = 'rastrigin'  # Can be parameterized
    objective_func = objective_functions[objective_name]

    # Problem dimensions
    dimensions = min(50, problem_size // 20)  # Scale dimensions with problem size
    bounds = (-5.12, 5.12) if objective_name == 'rastrigin' else (-2.0, 2.0)

    print(f"Optimizing {objective_name} function in {dimensions} dimensions")
    print(f"Population size: {problem_size}, Generations: {generations}")

    class Individual:
        def __init__(self, genes=None):
            if genes is None:
                self.genes = np.random.uniform(bounds[0], bounds[1], dimensions)
            else:
                self.genes = genes.copy()
            self.fitness = None

        def evaluate(self):
            if self.fitness is None:
                self.fitness = objective_func(self.genes)
            return self.fitness

        def mutate(self, mutation_rate=0.1, mutation_strength=0.1):
            if random.random() < mutation_rate:
                # Add Gaussian noise
                mutation = np.random.normal(0, mutation_strength, dimensions)
                self.genes = np.clip(self.genes + mutation, bounds[0], bounds[1])
                self.fitness = None  # Reset fitness

        def crossover(self, other):
            # Uniform crossover
            mask = np.random.random(dimensions) < 0.5
            child1_genes = np.where(mask, self.genes, other.genes)
            child2_genes = np.where(mask, other.genes, self.genes)
            return Individual(child1_genes), Individual(child2_genes)

    # Initialize population
    population = [Individual() for _ in range(problem_size)]

    # Evaluate initial population
    for individual in population:
        individual.evaluate()

    # Evolution statistics
    best_fitness_history = []
    average_fitness_history = []
    diversity_history = []

    # Main evolution loop
    for generation in range(generations):
        if generation % (generations // 10) == 0:
            print(f"Generation {generation}/{generations}")

        # Selection (tournament selection)
        def tournament_selection(pop, tournament_size=3):
            tournament = random.sample(pop, tournament_size)
            return min(tournament, key=lambda ind: ind.evaluate())

        # Create new population
        new_population = []

        # Elitism - keep best 10%
        elite_size = max(1, problem_size // 10)
        elite = sorted(population, key=lambda ind: ind.evaluate())[:elite_size]
        new_population.extend([Individual(ind.genes) for ind in elite])

        # Generate offspring
        while len(new_population) < problem_size:
            parent1 = tournament_selection(population)
            parent2 = tournament_selection(population)

            if random.random() < 0.8:  # Crossover probability
                child1, child2 = parent1.crossover(parent2)
            else:
                child1, child2 = Individual(parent1.genes), Individual(parent2.genes)

            # Adaptive mutation rate
            mutation_rate = 0.1 * (1 + generation / generations)
            child1.mutate(mutation_rate=mutation_rate)
            child2.mutate(mutation_rate=mutation_rate)

            new_population.extend([child1, child2])

        # Trim to exact population size
        new_population = new_population[:problem_size]
        population = new_population

        # Evaluate new population
        for individual in population:
            individual.evaluate()

        # Statistics
        fitnesses = [ind.fitness for ind in population]
        best_fitness = min(fitnesses)
        average_fitness = np.mean(fitnesses)

        # Population diversity (average pairwise distance)
        if generation % 10 == 0:  # Calculate diversity every 10 generations
            sample_size = min(100, problem_size)
            sample_pop = random.sample(population, sample_size)
            distances = []
            for i in range(len(sample_pop)):
                for j in range(i+1, len(sample_pop)):
                    dist = np.linalg.norm(sample_pop[i].genes - sample_pop[j].genes)
                    distances.append(dist)
            diversity = np.mean(distances) if distances else 0
            diversity_history.append(diversity)

        best_fitness_history.append(best_fitness)
        average_fitness_history.append(average_fitness)

    # Final results
    best_individual = min(population, key=lambda ind: ind.evaluate())

    return {
        'objective_function': objective_name,
        'dimensions': dimensions,
        'population_size': problem_size,
        'generations': generations,
        'best_fitness': best_individual.fitness,
        'best_solution': best_individual.genes.tolist(),
        'convergence_history': {
            'best_fitness': best_fitness_history[::10],  # Every 10th generation
            'average_fitness': average_fitness_history[::10],
            'diversity': diversity_history
        },
        'final_population_stats': {
            'best_fitness': min(fitnesses),
            'worst_fitness': max(fitnesses),
            'average_fitness': np.mean(fitnesses),
            'fitness_std': np.std(fitnesses)
        }
    }

# Run genetic algorithm optimization
ga_results = genetic_algorithm_optimization(problem_size=500, generations=200)

print(f"\nGENETIC ALGORITHM OPTIMIZATION COMPLETE")
print(f"Function: {ga_results['objective_function']}")
print(f"Dimensions: {ga_results['dimensions']}")
print(f"Best fitness: {ga_results['best_fitness']:.6f}")
print(f"Population size: {ga_results['population_size']}")
print(f"Generations: {ga_results['generations']}")

final_stats = ga_results['final_population_stats']
print(f"\nFinal population statistics:")
print(f"  Best: {final_stats['best_fitness']:.6f}")
print(f"  Average: {final_stats['average_fitness']:.6f}")
print(f"  Worst: {final_stats['worst_fitness']:.6f}")
print(f"  Std Dev: {final_stats['fitness_std']:.6f}")

Example 2: Engineering Simulation

Finite element analysis commonly run on SGE clusters:

[ ]:
@cluster(
    cores=12,
    memory="32GB",
    time="04:00:00",
    queue="all.q",
    pe="mpi 12"  # MPI parallel environment
)
def finite_element_stress_analysis(mesh_density="medium", material="steel", load_cases=5):
    """
    Simplified finite element stress analysis simulation.
    """
    import numpy as np
    from scipy.sparse import csr_matrix
    from scipy.sparse.linalg import spsolve
    import math

    # Material properties
    materials = {
        'steel': {'E': 200e9, 'nu': 0.3, 'yield_strength': 250e6, 'density': 7850},
        'aluminum': {'E': 70e9, 'nu': 0.33, 'yield_strength': 276e6, 'density': 2700},
        'titanium': {'E': 114e9, 'nu': 0.32, 'yield_strength': 880e6, 'density': 4500},
        'concrete': {'E': 30e9, 'nu': 0.2, 'yield_strength': 30e6, 'density': 2400}
    }

    mat_props = materials.get(material, materials['steel'])
    E = mat_props['E']  # Young's modulus
    nu = mat_props['nu']  # Poisson's ratio
    yield_strength = mat_props['yield_strength']
    density = mat_props['density']

    print(f"FEA Analysis - Material: {material}, Mesh: {mesh_density}, Load cases: {load_cases}")

    # Mesh generation parameters
    mesh_sizes = {
        'coarse': {'nx': 20, 'ny': 20, 'nz': 10},
        'medium': {'nx': 40, 'ny': 40, 'nz': 20},
        'fine': {'nx': 80, 'ny': 80, 'nz': 40}
    }

    mesh_params = mesh_sizes.get(mesh_density, mesh_sizes['medium'])
    nx, ny, nz = mesh_params['nx'], mesh_params['ny'], mesh_params['nz']

    # Geometry (simple beam)
    length, width, height = 2.0, 0.2, 0.1  # meters

    # Generate mesh
    def generate_3d_mesh(nx, ny, nz, length, width, height):
        """Generate 3D hexahedral mesh"""
        nodes = []
        elements = []

        # Generate nodes
        for k in range(nz + 1):
            for j in range(ny + 1):
                for i in range(nx + 1):
                    x = i * length / nx
                    y = j * width / ny
                    z = k * height / nz
                    nodes.append([x, y, z])

        # Generate elements (hexahedral)
        for k in range(nz):
            for j in range(ny):
                for i in range(nx):
                    # Node indices for hexahedral element
                    n1 = k * (nx + 1) * (ny + 1) + j * (nx + 1) + i
                    n2 = n1 + 1
                    n3 = n1 + (nx + 1) + 1
                    n4 = n1 + (nx + 1)
                    n5 = n1 + (nx + 1) * (ny + 1)
                    n6 = n5 + 1
                    n7 = n5 + (nx + 1) + 1
                    n8 = n5 + (nx + 1)

                    elements.append([n1, n2, n3, n4, n5, n6, n7, n8])

        return np.array(nodes), np.array(elements)

    nodes, elements = generate_3d_mesh(nx, ny, nz, length, width, height)
    n_nodes = len(nodes)
    n_elements = len(elements)
    n_dof = n_nodes * 3  # 3 DOF per node (x, y, z displacements)

    print(f"Mesh generated: {n_nodes:,} nodes, {n_elements:,} elements, {n_dof:,} DOF")

    # Material matrix (isotropic elasticity)
    def material_matrix_3d(E, nu):
        """3D elasticity matrix"""
        factor = E / ((1 + nu) * (1 - 2 * nu))
        D = np.zeros((6, 6))

        # Diagonal terms
        D[0, 0] = D[1, 1] = D[2, 2] = factor * (1 - nu)
        D[3, 3] = D[4, 4] = D[5, 5] = factor * (1 - 2 * nu) / 2

        # Off-diagonal terms
        D[0, 1] = D[0, 2] = D[1, 0] = D[1, 2] = D[2, 0] = D[2, 1] = factor * nu

        return D

    D_matrix = material_matrix_3d(E, nu)

    # Simplified stiffness matrix assembly
    def assemble_stiffness_matrix(nodes, elements, D_matrix):
        """Assemble global stiffness matrix (simplified)"""
        K_global = np.zeros((n_dof, n_dof))

        for elem_idx, element in enumerate(elements[:min(1000, len(elements))]):  # Limit for demo
            if elem_idx % 200 == 0:
                print(f"  Assembling element {elem_idx:,}/{len(elements):,}")

            # Element nodes
            elem_nodes = nodes[element]

            # Simplified element stiffness (using average properties)
            volume = length * width * height / n_elements
            k_elem = volume * np.eye(24) * E / (length**2)  # Simplified

            # Assembly
            for i, node_i in enumerate(element):
                for j, node_j in enumerate(element):
                    for di in range(3):
                        for dj in range(3):
                            row = node_i * 3 + di
                            col = node_j * 3 + dj
                            if row < n_dof and col < n_dof:
                                K_global[row, col] += k_elem[i*3+di, j*3+dj]

        return csr_matrix(K_global)

    print("Assembling stiffness matrix...")
    K = assemble_stiffness_matrix(nodes, elements, D_matrix)

    # Load case analysis
    load_case_results = []

    for case in range(load_cases):
        print(f"\nAnalyzing load case {case + 1}/{load_cases}...")

        # Define load case
        F = np.zeros(n_dof)

        if case == 0:  # Point load at free end
            # Find nodes at free end (x = length)
            free_end_nodes = np.where(np.abs(nodes[:, 0] - length) < 1e-6)[0]
            if len(free_end_nodes) > 0:
                center_node = free_end_nodes[len(free_end_nodes)//2]
                F[center_node * 3 + 2] = -1000  # 1kN downward

        elif case == 1:  # Distributed load
            # Apply distributed load to top surface
            top_nodes = np.where(np.abs(nodes[:, 2] - height) < 1e-6)[0]
            load_per_node = -100  # N per node
            for node in top_nodes:
                F[node * 3 + 2] = load_per_node

        elif case == 2:  # Torsional load
            # Apply moments at free end
            free_end_nodes = np.where(np.abs(nodes[:, 0] - length) < 1e-6)[0]
            for node in free_end_nodes:
                y, z = nodes[node, 1], nodes[node, 2]
                # Simplified torsion as equivalent forces
                F[node * 3 + 1] = 500 * (z - height/2)  # Simplified
                F[node * 3 + 2] = -500 * (y - width/2)

        elif case == 3:  # Thermal expansion
            # Simplified thermal load (equivalent forces)
            alpha = 12e-6  # Thermal expansion coefficient
            delta_T = 100  # Temperature change (K)
            thermal_strain = alpha * delta_T
            # Apply as equivalent forces (simplified)
            F += np.random.normal(0, E * thermal_strain / 1000, n_dof)

        else:  # Dynamic/random load
            # Random distributed forces
            np.random.seed(case * 123)
            F = np.random.normal(0, 50, n_dof)

        # Boundary conditions (fixed end)
        fixed_nodes = np.where(np.abs(nodes[:, 0]) < 1e-6)[0]
        fixed_dofs = []
        for node in fixed_nodes:
            fixed_dofs.extend([node * 3, node * 3 + 1, node * 3 + 2])

        # Apply boundary conditions
        K_reduced = K.copy()
        F_reduced = F.copy()

        # Zero out fixed DOFs
        for dof in fixed_dofs:
            if dof < n_dof:
                K_reduced[dof, :] = 0
                K_reduced[:, dof] = 0
                K_reduced[dof, dof] = 1
                F_reduced[dof] = 0

        # Solve for displacements
        print("  Solving linear system...")
        try:
            displacements = spsolve(K_reduced, F_reduced)
        except:
            # Fallback for singular matrices
            displacements = np.zeros(n_dof)
            print("  Warning: Singular matrix, using zero displacements")

        # Calculate stresses (simplified)
        max_displacement = np.max(np.abs(displacements))
        displacement_magnitude = np.sqrt(
            displacements[::3]**2 + displacements[1::3]**2 + displacements[2::3]**2
        )

        # Simplified stress calculation
        max_stress = E * max_displacement / length  # Rough estimate

        # Safety factor
        safety_factor = yield_strength / max_stress if max_stress > 0 else float('inf')

        case_result = {
            'case_id': case,
            'load_type': ['point_load', 'distributed', 'torsion', 'thermal', 'dynamic'][case],
            'max_displacement_m': max_displacement,
            'max_stress_Pa': max_stress,
            'safety_factor': min(safety_factor, 1000),  # Cap at 1000
            'total_force_N': np.sum(np.abs(F)),
            'displacement_distribution': {
                'mean': np.mean(displacement_magnitude),
                'std': np.std(displacement_magnitude),
                'max': np.max(displacement_magnitude)
            }
        }

        load_case_results.append(case_result)

        print(f"  Max displacement: {max_displacement:.2e} m")
        print(f"  Max stress: {max_stress:.2e} Pa")
        print(f"  Safety factor: {safety_factor:.2f}")

    # Summary analysis
    max_displacement_overall = max(case['max_displacement_m'] for case in load_case_results)
    max_stress_overall = max(case['max_stress_Pa'] for case in load_case_results)
    min_safety_factor = min(case['safety_factor'] for case in load_case_results)

    analysis_results = {
        'model_info': {
            'material': material,
            'mesh_density': mesh_density,
            'nodes': n_nodes,
            'elements': n_elements,
            'dof': n_dof,
            'geometry': {'length': length, 'width': width, 'height': height}
        },
        'material_properties': mat_props,
        'load_cases': load_case_results,
        'summary': {
            'max_displacement_m': max_displacement_overall,
            'max_stress_Pa': max_stress_overall,
            'min_safety_factor': min_safety_factor,
            'critical_load_case': min(load_case_results, key=lambda x: x['safety_factor'])['load_type'],
            'passes_safety_check': min_safety_factor > 2.0
        }
    }

    return analysis_results

# Run FEA stress analysis
fea_results = finite_element_stress_analysis(
    mesh_density="medium",
    material="steel",
    load_cases=3
)

print(f"\nFINITE ELEMENT ANALYSIS COMPLETE")
model_info = fea_results['model_info']
print(f"Material: {model_info['material']}")
print(f"Mesh: {model_info['nodes']:,} nodes, {model_info['elements']:,} elements")
print(f"DOF: {model_info['dof']:,}")

summary = fea_results['summary']
print(f"\nSummary Results:")
print(f"  Max displacement: {summary['max_displacement_m']:.2e} m")
print(f"  Max stress: {summary['max_stress_Pa']:.2e} Pa")
print(f"  Min safety factor: {summary['min_safety_factor']:.2f}")
print(f"  Critical load case: {summary['critical_load_case']}")
print(f"  Passes safety check: {summary['passes_safety_check']}")

Example 3: Multi-Objective Engineering Design

Use SGE task arrays for design optimization:

[ ]:
@cluster(
    cores=6,
    memory="24GB",
    time="02:00:00",
    queue="all.q",
    sge_array="1-25"  # SGE task array
)
def multi_objective_design_optimization(design_problem="beam_design"):
    """
    Multi-objective design optimization using SGE task arrays.
    Each task evaluates different design parameters.
    """
    import os
    import numpy as np
    import random
    from math import pi, sqrt

    # Get SGE task array index
    task_id = int(os.environ.get('SGE_TASK_ID', '1'))

    print(f"Design optimization task {task_id}")

    def beam_design_objectives(width, height, length, material_density=7850):
        """Calculate beam design objectives"""
        # Geometry constraints
        area = width * height
        moment_of_inertia = width * height**3 / 12
        volume = area * length
        mass = volume * material_density

        # Structural performance
        E = 200e9  # Young's modulus (Pa)
        max_load = 10000  # Maximum load (N)

        # Deflection calculation (simply supported beam)
        max_deflection = (5 * max_load * length**4) / (384 * E * moment_of_inertia)

        # Stress calculation
        max_moment = max_load * length / 4  # For simply supported beam
        max_stress = max_moment * (height / 2) / moment_of_inertia

        # Objectives to minimize
        objectives = {
            'mass': mass,                    # Minimize weight
            'deflection': max_deflection,    # Minimize deflection
            'stress': max_stress,           # Minimize stress
            'cost': mass * 2.5 + area * 10  # Material + manufacturing cost
        }

        # Constraints
        constraints = {
            'deflection_limit': max_deflection < length / 250,  # L/250 deflection limit
            'stress_limit': max_stress < 250e6,                # Yield stress limit
            'aspect_ratio': height / width < 5,               # Practical aspect ratio
            'minimum_thickness': width > 0.01 and height > 0.01  # Minimum thickness
        }

        return objectives, constraints

    def truss_design_objectives(member_areas, topology, material_density=2700):
        """Calculate truss design objectives"""
        # Simplified truss analysis
        n_members = len(member_areas)
        total_length = sum(topology)  # Simplified total length
        total_volume = sum(area * length for area, length in zip(member_areas, topology))
        total_mass = total_volume * material_density

        # Simplified stiffness calculation
        E = 70e9  # Aluminum Young's modulus
        avg_stiffness = E * sum(member_areas) / n_members

        # Simplified stress analysis
        applied_load = 5000  # N
        avg_stress = applied_load / sum(member_areas)

        objectives = {
            'mass': total_mass,
            'compliance': 1 / avg_stiffness,  # Inverse of stiffness
            'max_stress': avg_stress,
            'cost': total_mass * 3.0 + n_members * 50  # Material + connection cost
        }

        constraints = {
            'stress_limit': avg_stress < 276e6,  # Aluminum yield
            'buckling_check': all(area > 1e-4 for area in member_areas),  # Min area
            'geometric_feasibility': len(member_areas) >= 3  # Minimum members
        }

        return objectives, constraints

    # Set up design space for this task
    np.random.seed(task_id * 42)  # Reproducible but different per task

    if design_problem == "beam_design":
        # Generate design variables for beam
        width = np.random.uniform(0.05, 0.5)     # 5cm to 50cm
        height = np.random.uniform(0.1, 1.0)     # 10cm to 100cm
        length = np.random.uniform(2.0, 10.0)    # 2m to 10m

        objectives, constraints = beam_design_objectives(width, height, length)
        design_vars = {'width': width, 'height': height, 'length': length}

    elif design_problem == "truss_design":
        # Generate design variables for truss
        n_members = random.randint(5, 15)
        member_areas = np.random.uniform(1e-4, 1e-2, n_members)  # 1cm² to 100cm²
        topology = np.random.uniform(0.5, 3.0, n_members)       # Member lengths

        objectives, constraints = truss_design_objectives(member_areas, topology)
        design_vars = {
            'n_members': n_members,
            'member_areas': member_areas.tolist(),
            'topology': topology.tolist()
        }

    else:
        raise ValueError(f"Unknown design problem: {design_problem}")

    # Check constraint feasibility
    feasible = all(constraints.values())
    n_violated_constraints = sum(1 for satisfied in constraints.values() if not satisfied)

    # Calculate Pareto performance metrics
    def normalize_objectives(objectives):
        """Normalize objectives for multi-objective comparison"""
        # Reference values for normalization (approximate)
        if design_problem == "beam_design":
            ref_values = {
                'mass': 1000,        # kg
                'deflection': 0.01,  # m
                'stress': 100e6,     # Pa
                'cost': 5000         # currency units
            }
        else:  # truss_design
            ref_values = {
                'mass': 500,           # kg
                'compliance': 1e-9,    # 1/N
                'max_stress': 100e6,   # Pa
                'cost': 3000           # currency units
            }

        normalized = {}
        for obj, value in objectives.items():
            if obj in ref_values:
                normalized[obj] = value / ref_values[obj]
            else:
                normalized[obj] = value

        return normalized

    normalized_objectives = normalize_objectives(objectives)

    # Calculate aggregate performance metrics
    weighted_sum = sum(normalized_objectives.values())  # Equal weights
    max_objective = max(normalized_objectives.values())

    # Performance score (lower is better)
    if feasible:
        performance_score = weighted_sum
    else:
        # Penalty for infeasible designs
        performance_score = weighted_sum * (1 + 10 * n_violated_constraints)

    # Compile results
    design_result = {
        'task_id': task_id,
        'design_problem': design_problem,
        'design_variables': design_vars,
        'objectives': objectives,
        'normalized_objectives': normalized_objectives,
        'constraints': constraints,
        'feasible': feasible,
        'constraints_violated': n_violated_constraints,
        'performance_metrics': {
            'weighted_sum': weighted_sum,
            'max_objective': max_objective,
            'performance_score': performance_score
        },
        'design_quality': {
            'excellent': performance_score < 2.0 and feasible,
            'good': performance_score < 4.0 and feasible,
            'acceptable': performance_score < 8.0 and feasible,
            'poor': not feasible or performance_score >= 8.0
        }
    }

    return design_result

# Run design optimization (this would be one task of the SGE array)
design_result = multi_objective_design_optimization("beam_design")

print(f"\nDESIGN OPTIMIZATION - Task {design_result['task_id']}")
print(f"Problem: {design_result['design_problem']}")
print(f"Feasible: {design_result['feasible']}")

if design_result['design_problem'] == 'beam_design':
    vars = design_result['design_variables']
    print(f"\nDesign Variables:")
    print(f"  Width: {vars['width']:.3f} m")
    print(f"  Height: {vars['height']:.3f} m")
    print(f"  Length: {vars['length']:.3f} m")

print(f"\nObjectives:")
for obj, value in design_result['objectives'].items():
    if 'stress' in obj or 'deflection' in obj:
        print(f"  {obj}: {value:.2e}")
    else:
        print(f"  {obj}: {value:.2f}")

perf = design_result['performance_metrics']
print(f"\nPerformance Score: {perf['performance_score']:.2f}")

quality = design_result['design_quality']
for level, is_level in quality.items():
    if is_level:
        print(f"Design Quality: {level.upper()}")
        break

SGE Parallel Environments and Resource Management

[ ]:
def configure_sge_parallel_environments():
    """
    Examples of different SGE parallel environment configurations.
    """

    # Common SGE parallel environments
    pe_configs = {
        'smp': {
            'description': 'Symmetric Multi-Processing (shared memory)',
            'use_case': 'Multi-threaded applications on single node',
            'example_cores': [2, 4, 8, 16, 32],
            'clustrix_config': {
                'cores': 8,
                'pe': 'smp 8',
                'memory': '32GB'
            }
        },
        'mpi': {
            'description': 'Message Passing Interface (distributed memory)',
            'use_case': 'Distributed parallel applications across nodes',
            'example_cores': [8, 16, 32, 64, 128],
            'clustrix_config': {
                'cores': 32,
                'pe': 'mpi 32',
                'memory': '128GB'
            }
        },
        'openmp': {
            'description': 'OpenMP parallel environment',
            'use_case': 'OpenMP applications with thread parallelism',
            'example_cores': [4, 8, 12, 16],
            'clustrix_config': {
                'cores': 12,
                'pe': 'openmp 12',
                'memory': '48GB'
            }
        },
        'hybrid': {
            'description': 'Hybrid MPI+OpenMP',
            'use_case': 'Applications using both MPI and OpenMP',
            'example_cores': [16, 32, 64],
            'clustrix_config': {
                'cores': 32,
                'pe': 'hybrid 32',
                'memory': '128GB'
            }
        }
    }

    print("SGE Parallel Environment Configurations:")
    print("=" * 60)

    for pe_name, config in pe_configs.items():
        print(f"\n{pe_name.upper()}:")
        print(f"  Description: {config['description']}")
        print(f"  Use case: {config['use_case']}")
        print(f"  Common core counts: {config['example_cores']}")
        print(f"  Clustrix configuration:")
        for key, value in config['clustrix_config'].items():
            print(f"    {key}: {value}")

    return pe_configs

# SGE resource selection helper
def select_sge_resources(application_type, problem_size, parallelization="smp"):
    """
    Select appropriate SGE resources based on application characteristics.
    """

    # Base resource requirements by application type
    app_requirements = {
        'optimization': {'base_cores': 8, 'memory_per_core': 4, 'time_factor': 1.5},
        'simulation': {'base_cores': 16, 'memory_per_core': 6, 'time_factor': 2.0},
        'ml_training': {'base_cores': 4, 'memory_per_core': 8, 'time_factor': 1.0},
        'data_analysis': {'base_cores': 6, 'memory_per_core': 4, 'time_factor': 0.8},
        'engineering': {'base_cores': 12, 'memory_per_core': 5, 'time_factor': 1.8}
    }

    if application_type not in app_requirements:
        application_type = 'simulation'  # Default

    req = app_requirements[application_type]

    # Scale resources based on problem size
    size_multipliers = {
        'small': 0.5,
        'medium': 1.0,
        'large': 2.0,
        'xlarge': 4.0
    }

    multiplier = size_multipliers.get(problem_size, 1.0)

    cores = max(1, int(req['base_cores'] * multiplier))
    memory_gb = max(4, int(cores * req['memory_per_core']))

    # Time estimation (hours)
    base_time = 2.0  # hours
    time_hours = max(0.5, base_time * req['time_factor'] * multiplier)

    # Format time as HH:MM:SS
    hours = int(time_hours)
    minutes = int((time_hours - hours) * 60)
    time_str = f"{hours:02d}:{minutes:02d}:00"

    # Queue selection
    if time_hours <= 1:
        queue = "short.q"
    elif time_hours <= 8:
        queue = "all.q"
    else:
        queue = "long.q"

    sge_config = {
        'cores': cores,
        'memory': f"{memory_gb}GB",
        'time': time_str,
        'queue': queue,
        'pe': f"{parallelization} {cores}"
    }

    return sge_config

# Display PE configurations
pe_configs = configure_sge_parallel_environments()

# Example resource selections
print("\n\nSGE Resource Selection Examples:")
print("=" * 60)

examples = [
    ('optimization', 'medium', 'smp'),
    ('simulation', 'large', 'mpi'),
    ('ml_training', 'small', 'openmp'),
    ('engineering', 'xlarge', 'hybrid')
]

for app_type, size, parallel in examples:
    config = select_sge_resources(app_type, size, parallel)
    print(f"\n{app_type.upper()} ({size}, {parallel}):")
    for key, value in config.items():
        print(f"  {key}: {value}")

SGE Job Monitoring and Management

[ ]:
from clustrix import ClusterExecutor

# Connect to SGE cluster and check status
config = clustrix.get_config()
executor = ClusterExecutor(config)

try:
    executor.connect()
    print("✓ Successfully connected to SGE cluster")

    # Check SGE version and configuration
    stdout, stderr = executor._execute_command("qconf -sconf")
    if "SGE" in stdout or "Grid Engine" in stdout:
        print("✓ SGE/Grid Engine detected")

    # List available queues
    stdout, stderr = executor._execute_command("qconf -sql")
    if stdout:
        queues = stdout.strip().split('\n')
        print(f"\nAvailable queues ({len(queues)}):")
        for queue in queues[:10]:  # Show first 10
            print(f"  {queue}")
        if len(queues) > 10:
            print(f"  ... and {len(queues) - 10} more")

    # List parallel environments
    stdout, stderr = executor._execute_command("qconf -spl")
    if stdout:
        pes = stdout.strip().split('\n')
        print(f"\nParallel environments ({len(pes)}):")
        for pe in pes:
            print(f"  {pe}")

    # Check queue status
    stdout, stderr = executor._execute_command("qstat -g c")
    if stdout:
        print("\nCluster queue summary:")
        lines = stdout.strip().split('\n')
        for line in lines[:15]:  # Show header and first few lines
            print(f"  {line}")

    # Check user's jobs
    username = config.username
    stdout, stderr = executor._execute_command(f"qstat -u {username}")
    if stdout and len(stdout.strip().split('\n')) > 2:
        print(f"\nYour current jobs:")
        lines = stdout.strip().split('\n')
        for line in lines:
            print(f"  {line}")
    else:
        print(f"\n✓ No jobs currently running for user {username}")

    # Check host information
    stdout, stderr = executor._execute_command("qhost | head -20")
    if stdout:
        print("\nHost information (sample):")
        lines = stdout.strip().split('\n')
        for line in lines:
            print(f"  {line}")

    executor.disconnect()
    print("\n✓ SGE cluster monitoring completed successfully")

except Exception as e:
    print(f"✗ Connection or monitoring failed: {e}")
    print("Please check your SGE cluster configuration")

Summary

This tutorial covered SGE cluster usage with Clustrix:

  1. SGE Configuration - Setting up Clustrix for SGE/Grid Engine clusters

  2. Mathematical Optimization - Genetic algorithms and complex optimization

  3. Engineering Simulation - Finite element analysis and structural design

  4. Multi-Objective Design - Engineering design optimization with task arrays

  5. Parallel Environments - SMP, MPI, OpenMP, and hybrid configurations

  6. Resource Management - Intelligent resource selection and queue management

  7. Job Monitoring - SGE cluster status and job management

Key SGE Features:

  • Parallel Environments: Use pe parameter for SMP, MPI, OpenMP configurations

  • Task Arrays: Efficient parameter sweeps with sge_array parameter

  • Queue Selection: Choose appropriate queues based on runtime requirements

  • Resource Specification: Flexible core, memory, and time allocation

  • Job Dependencies: Chain jobs with SGE dependency mechanisms

  • Advanced Scheduling: Priority, reservation, and resource policies

Best Practices:

  • Parallel Environment Selection: Choose PE based on application parallelization model

  • Resource Estimation: Use application profiling to estimate requirements accurately

  • Queue Strategy: Match job characteristics to appropriate queue policies

  • Array Jobs: Use task arrays for embarrassingly parallel workloads

  • Monitoring: Regular cluster status checks for optimal resource utilization

Next Steps:

For more information, visit the Clustrix Documentation.