SGE (Sun Grid Engine) Tutorial
This tutorial demonstrates how to use Clustrix with SGE (Sun Grid Engine) clusters, including Open Grid Scheduler and other SGE-compatible systems.
Prerequisites
Access to an SGE cluster
SSH key configured for the cluster
Clustrix installed:
pip install clustrix
# Install Clustrix (uncomment if needed)
# !pip install clustrix
import clustrix
from clustrix import cluster, configure
import numpy as np
SGE Configuration
Configure Clustrix for your SGE cluster:
# Configure for SGE cluster
configure(
    cluster_type="sge",
    cluster_host="sge-cluster.org",  # Replace with your cluster
    username="your-username",  # Replace with your username
    key_file="~/.ssh/id_rsa",  # SSH key path
    # SGE resource defaults
    default_cores=4,
    default_memory="8GB",
    default_time="02:00:00",
    default_queue="all.q",  # Common SGE queue name
    # SGE-specific settings
    remote_work_dir="/home/your-username/clustrix",
    # Environment modules
    module_loads=["python/3.9"],
    # Job management
    cleanup_on_success=True,
    max_parallel_jobs=30
)
print("SGE cluster configured successfully!")
Example 1: Mathematical Optimization
SGE clusters are often used for optimization problems:
@cluster(
cores=8,
memory="16GB",
time="01:30:00",
queue="all.q",
pe="smp 8" # SGE parallel environment
)
def genetic_algorithm_optimization(problem_size=1000, generations=500):
"""
Genetic Algorithm optimization on SGE cluster.
"""
import numpy as np
import random
from functools import partial
def rastrigin_function(x):
"""Rastrigin function - a multimodal optimization benchmark"""
A = 10
n = len(x)
return A * n + sum(xi**2 - A * np.cos(2 * np.pi * xi) for xi in x)
def rosenbrock_function(x):
"""Rosenbrock function - another optimization benchmark"""
return sum(100 * (x[i+1] - x[i]**2)**2 + (1 - x[i])**2 for i in range(len(x)-1))
def sphere_function(x):
"""Simple sphere function"""
return sum(xi**2 for xi in x)
# Choose objective function
objective_functions = {
'rastrigin': rastrigin_function,
'rosenbrock': rosenbrock_function,
'sphere': sphere_function
}
objective_name = 'rastrigin' # Can be parameterized
objective_func = objective_functions[objective_name]
# Problem dimensions
dimensions = min(50, problem_size // 20) # Scale dimensions with problem size
bounds = (-5.12, 5.12) if objective_name == 'rastrigin' else (-2.0, 2.0)
print(f"Optimizing {objective_name} function in {dimensions} dimensions")
print(f"Population size: {problem_size}, Generations: {generations}")
class Individual:
def __init__(self, genes=None):
if genes is None:
self.genes = np.random.uniform(bounds[0], bounds[1], dimensions)
else:
self.genes = genes.copy()
self.fitness = None
def evaluate(self):
if self.fitness is None:
self.fitness = objective_func(self.genes)
return self.fitness
def mutate(self, mutation_rate=0.1, mutation_strength=0.1):
if random.random() < mutation_rate:
# Add Gaussian noise
mutation = np.random.normal(0, mutation_strength, dimensions)
self.genes = np.clip(self.genes + mutation, bounds[0], bounds[1])
self.fitness = None # Reset fitness
def crossover(self, other):
# Uniform crossover
mask = np.random.random(dimensions) < 0.5
child1_genes = np.where(mask, self.genes, other.genes)
child2_genes = np.where(mask, other.genes, self.genes)
return Individual(child1_genes), Individual(child2_genes)
# Initialize population
population = [Individual() for _ in range(problem_size)]
# Evaluate initial population
for individual in population:
individual.evaluate()
# Evolution statistics
best_fitness_history = []
average_fitness_history = []
diversity_history = []
# Main evolution loop
for generation in range(generations):
if generation % max(1, generations // 10) == 0:  # max(1, ...) avoids a modulo by zero when generations < 10
print(f"Generation {generation}/{generations}")
# Selection (tournament selection)
def tournament_selection(pop, tournament_size=3):
tournament = random.sample(pop, tournament_size)
return min(tournament, key=lambda ind: ind.evaluate())
# Create new population
new_population = []
# Elitism - keep best 10%
elite_size = max(1, problem_size // 10)
elite = sorted(population, key=lambda ind: ind.evaluate())[:elite_size]
new_population.extend([Individual(ind.genes) for ind in elite])
# Generate offspring
while len(new_population) < problem_size:
parent1 = tournament_selection(population)
parent2 = tournament_selection(population)
if random.random() < 0.8: # Crossover probability
child1, child2 = parent1.crossover(parent2)
else:
child1, child2 = Individual(parent1.genes), Individual(parent2.genes)
# Adaptive mutation rate
mutation_rate = 0.1 * (1 + generation / generations)
child1.mutate(mutation_rate=mutation_rate)
child2.mutate(mutation_rate=mutation_rate)
new_population.extend([child1, child2])
# Trim to exact population size
new_population = new_population[:problem_size]
population = new_population
# Evaluate new population
for individual in population:
individual.evaluate()
# Statistics
fitnesses = [ind.fitness for ind in population]
best_fitness = min(fitnesses)
average_fitness = np.mean(fitnesses)
# Population diversity (average pairwise distance)
if generation % 10 == 0: # Calculate diversity every 10 generations
sample_size = min(100, problem_size)
sample_pop = random.sample(population, sample_size)
distances = []
for i in range(len(sample_pop)):
for j in range(i+1, len(sample_pop)):
dist = np.linalg.norm(sample_pop[i].genes - sample_pop[j].genes)
distances.append(dist)
diversity = np.mean(distances) if distances else 0
diversity_history.append(diversity)
best_fitness_history.append(best_fitness)
average_fitness_history.append(average_fitness)
# Final results
best_individual = min(population, key=lambda ind: ind.evaluate())
return {
'objective_function': objective_name,
'dimensions': dimensions,
'population_size': problem_size,
'generations': generations,
'best_fitness': best_individual.fitness,
'best_solution': best_individual.genes.tolist(),
'convergence_history': {
'best_fitness': best_fitness_history[::10], # Every 10th generation
'average_fitness': average_fitness_history[::10],
'diversity': diversity_history
},
'final_population_stats': {
'best_fitness': min(fitnesses),
'worst_fitness': max(fitnesses),
'average_fitness': np.mean(fitnesses),
'fitness_std': np.std(fitnesses)
}
}
# Run genetic algorithm optimization
ga_results = genetic_algorithm_optimization(problem_size=500, generations=200)
print(f"\nGENETIC ALGORITHM OPTIMIZATION COMPLETE")
print(f"Function: {ga_results['objective_function']}")
print(f"Dimensions: {ga_results['dimensions']}")
print(f"Best fitness: {ga_results['best_fitness']:.6f}")
print(f"Population size: {ga_results['population_size']}")
print(f"Generations: {ga_results['generations']}")
final_stats = ga_results['final_population_stats']
print(f"\nFinal population statistics:")
print(f" Best: {final_stats['best_fitness']:.6f}")
print(f" Average: {final_stats['average_fitness']:.6f}")
print(f" Worst: {final_stats['worst_fitness']:.6f}")
print(f" Std Dev: {final_stats['fitness_std']:.6f}")
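The benchmark functions in this example loop over coordinates in pure Python, which becomes the bottleneck when the whole population is evaluated every generation. As a rough sketch of an alternative, the Rastrigin function can be evaluated for all individuals at once with NumPy. The helper name `rastrigin_vectorized` and the array layout (one individual per row) are assumptions made for illustration and are not part of Clustrix or of the example above.

```python
import numpy as np

def rastrigin_vectorized(population, A=10.0):
    """Evaluate the Rastrigin function for every row of an (n_individuals, n_dims) array."""
    population = np.atleast_2d(np.asarray(population, dtype=float))
    n_dims = population.shape[1]
    # Same formula as rastrigin_function, summed along the gene axis
    return A * n_dims + np.sum(population**2 - A * np.cos(2 * np.pi * population), axis=1)

rng = np.random.default_rng(0)
pop = rng.uniform(-5.12, 5.12, size=(500, 25))   # 500 individuals, 25 dimensions
fitness = rastrigin_vectorized(pop)
print(fitness.shape)                              # one fitness value per individual
print(rastrigin_vectorized(np.zeros(25))[0])      # value at the global minimum (the origin)
```

Evaluating at the origin is a quick sanity check, because the Rastrigin function has its global minimum of zero there.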
Example 2: Engineering Simulation
Finite element analysis is commonly run on SGE clusters:
@cluster(
cores=12,
memory="32GB",
time="04:00:00",
queue="all.q",
pe="mpi 12" # MPI parallel environment
)
def finite_element_stress_analysis(mesh_density="medium", material="steel", load_cases=5):
"""
Simplified finite element stress analysis simulation.
"""
import numpy as np
from scipy.sparse import csr_matrix
from scipy.sparse.linalg import spsolve
import math
# Material properties
materials = {
'steel': {'E': 200e9, 'nu': 0.3, 'yield_strength': 250e6, 'density': 7850},
'aluminum': {'E': 70e9, 'nu': 0.33, 'yield_strength': 276e6, 'density': 2700},
'titanium': {'E': 114e9, 'nu': 0.32, 'yield_strength': 880e6, 'density': 4500},
'concrete': {'E': 30e9, 'nu': 0.2, 'yield_strength': 30e6, 'density': 2400}
}
mat_props = materials.get(material, materials['steel'])
E = mat_props['E'] # Young's modulus
nu = mat_props['nu'] # Poisson's ratio
yield_strength = mat_props['yield_strength']
density = mat_props['density']
print(f"FEA Analysis - Material: {material}, Mesh: {mesh_density}, Load cases: {load_cases}")
# Mesh generation parameters
mesh_sizes = {
'coarse': {'nx': 20, 'ny': 20, 'nz': 10},
'medium': {'nx': 40, 'ny': 40, 'nz': 20},
'fine': {'nx': 80, 'ny': 80, 'nz': 40}
}
mesh_params = mesh_sizes.get(mesh_density, mesh_sizes['medium'])
nx, ny, nz = mesh_params['nx'], mesh_params['ny'], mesh_params['nz']
# Geometry (simple beam)
length, width, height = 2.0, 0.2, 0.1 # meters
# Generate mesh
def generate_3d_mesh(nx, ny, nz, length, width, height):
"""Generate 3D hexahedral mesh"""
nodes = []
elements = []
# Generate nodes
for k in range(nz + 1):
for j in range(ny + 1):
for i in range(nx + 1):
x = i * length / nx
y = j * width / ny
z = k * height / nz
nodes.append([x, y, z])
# Generate elements (hexahedral)
for k in range(nz):
for j in range(ny):
for i in range(nx):
# Node indices for hexahedral element
n1 = k * (nx + 1) * (ny + 1) + j * (nx + 1) + i
n2 = n1 + 1
n3 = n1 + (nx + 1) + 1
n4 = n1 + (nx + 1)
n5 = n1 + (nx + 1) * (ny + 1)
n6 = n5 + 1
n7 = n5 + (nx + 1) + 1
n8 = n5 + (nx + 1)
elements.append([n1, n2, n3, n4, n5, n6, n7, n8])
return np.array(nodes), np.array(elements)
nodes, elements = generate_3d_mesh(nx, ny, nz, length, width, height)
n_nodes = len(nodes)
n_elements = len(elements)
n_dof = n_nodes * 3 # 3 DOF per node (x, y, z displacements)
print(f"Mesh generated: {n_nodes:,} nodes, {n_elements:,} elements, {n_dof:,} DOF")
# Material matrix (isotropic elasticity)
def material_matrix_3d(E, nu):
"""3D elasticity matrix"""
factor = E / ((1 + nu) * (1 - 2 * nu))
D = np.zeros((6, 6))
# Diagonal terms
D[0, 0] = D[1, 1] = D[2, 2] = factor * (1 - nu)
D[3, 3] = D[4, 4] = D[5, 5] = factor * (1 - 2 * nu) / 2
# Off-diagonal terms
D[0, 1] = D[0, 2] = D[1, 0] = D[1, 2] = D[2, 0] = D[2, 1] = factor * nu
return D
D_matrix = material_matrix_3d(E, nu)
# Simplified stiffness matrix assembly
def assemble_stiffness_matrix(nodes, elements, D_matrix):
"""Assemble global stiffness matrix (simplified)"""
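# Assemble into a sparse LIL matrix: a dense (n_dof x n_dof) array would need roughly 90 GB for the medium mesh
from scipy.sparse import lil_matrix
K_global = lil_matrix((n_dof, n_dof))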
for elem_idx, element in enumerate(elements[:min(1000, len(elements))]): # Limit for demo
if elem_idx % 200 == 0:
print(f" Assembling element {elem_idx:,}/{len(elements):,}")
# Element nodes
elem_nodes = nodes[element]
# Simplified element stiffness (using average properties)
volume = length * width * height / n_elements
k_elem = volume * np.eye(24) * E / (length**2) # Simplified
# Assembly
for i, node_i in enumerate(element):
for j, node_j in enumerate(element):
for di in range(3):
for dj in range(3):
row = node_i * 3 + di
col = node_j * 3 + dj
if row < n_dof and col < n_dof:
K_global[row, col] += k_elem[i*3+di, j*3+dj]
return csr_matrix(K_global)
print("Assembling stiffness matrix...")
K = assemble_stiffness_matrix(nodes, elements, D_matrix)
# Load case analysis
load_case_results = []
for case in range(load_cases):
print(f"\nAnalyzing load case {case + 1}/{load_cases}...")
# Define load case
F = np.zeros(n_dof)
if case == 0: # Point load at free end
# Find nodes at free end (x = length)
free_end_nodes = np.where(np.abs(nodes[:, 0] - length) < 1e-6)[0]
if len(free_end_nodes) > 0:
center_node = free_end_nodes[len(free_end_nodes)//2]
F[center_node * 3 + 2] = -1000 # 1kN downward
elif case == 1: # Distributed load
# Apply distributed load to top surface
top_nodes = np.where(np.abs(nodes[:, 2] - height) < 1e-6)[0]
load_per_node = -100 # N per node
for node in top_nodes:
F[node * 3 + 2] = load_per_node
elif case == 2: # Torsional load
# Apply moments at free end
free_end_nodes = np.where(np.abs(nodes[:, 0] - length) < 1e-6)[0]
for node in free_end_nodes:
y, z = nodes[node, 1], nodes[node, 2]
# Simplified torsion as equivalent forces
F[node * 3 + 1] = 500 * (z - height/2) # Simplified
F[node * 3 + 2] = -500 * (y - width/2)
elif case == 3: # Thermal expansion
# Simplified thermal load (equivalent forces)
alpha = 12e-6 # Thermal expansion coefficient
delta_T = 100 # Temperature change (K)
thermal_strain = alpha * delta_T
# Apply as equivalent forces (simplified)
F += np.random.normal(0, E * thermal_strain / 1000, n_dof)
else: # Dynamic/random load
# Random distributed forces
np.random.seed(case * 123)
F = np.random.normal(0, 50, n_dof)
# Boundary conditions (fixed end)
fixed_nodes = np.where(np.abs(nodes[:, 0]) < 1e-6)[0]
fixed_dofs = []
for node in fixed_nodes:
fixed_dofs.extend([node * 3, node * 3 + 1, node * 3 + 2])
# Apply boundary conditions
K_reduced = K.copy()
F_reduced = F.copy()
# Zero out fixed DOFs
for dof in fixed_dofs:
if dof < n_dof:
K_reduced[dof, :] = 0
K_reduced[:, dof] = 0
K_reduced[dof, dof] = 1
F_reduced[dof] = 0
# Solve for displacements
print(" Solving linear system...")
try:
displacements = spsolve(K_reduced, F_reduced)
except Exception:  # avoid a bare except, which would also swallow KeyboardInterrupt
# Fallback for singular matrices
displacements = np.zeros(n_dof)
print(" Warning: Singular matrix, using zero displacements")
# Calculate stresses (simplified)
max_displacement = np.max(np.abs(displacements))
displacement_magnitude = np.sqrt(
displacements[::3]**2 + displacements[1::3]**2 + displacements[2::3]**2
)
# Simplified stress calculation
max_stress = E * max_displacement / length # Rough estimate
# Safety factor
safety_factor = yield_strength / max_stress if max_stress > 0 else float('inf')
case_result = {
'case_id': case,
'load_type': ['point_load', 'distributed', 'torsion', 'thermal', 'dynamic'][min(case, 4)],  # cases 4 and above all use the random "dynamic" load
'max_displacement_m': max_displacement,
'max_stress_Pa': max_stress,
'safety_factor': min(safety_factor, 1000), # Cap at 1000
'total_force_N': np.sum(np.abs(F)),
'displacement_distribution': {
'mean': np.mean(displacement_magnitude),
'std': np.std(displacement_magnitude),
'max': np.max(displacement_magnitude)
}
}
load_case_results.append(case_result)
print(f" Max displacement: {max_displacement:.2e} m")
print(f" Max stress: {max_stress:.2e} Pa")
print(f" Safety factor: {safety_factor:.2f}")
# Summary analysis
max_displacement_overall = max(case['max_displacement_m'] for case in load_case_results)
max_stress_overall = max(case['max_stress_Pa'] for case in load_case_results)
min_safety_factor = min(case['safety_factor'] for case in load_case_results)
analysis_results = {
'model_info': {
'material': material,
'mesh_density': mesh_density,
'nodes': n_nodes,
'elements': n_elements,
'dof': n_dof,
'geometry': {'length': length, 'width': width, 'height': height}
},
'material_properties': mat_props,
'load_cases': load_case_results,
'summary': {
'max_displacement_m': max_displacement_overall,
'max_stress_Pa': max_stress_overall,
'min_safety_factor': min_safety_factor,
'critical_load_case': min(load_case_results, key=lambda x: x['safety_factor'])['load_type'],
'passes_safety_check': min_safety_factor > 2.0
}
}
return analysis_results
# Run FEA stress analysis
fea_results = finite_element_stress_analysis(
mesh_density="medium",
material="steel",
load_cases=3
)
print(f"\nFINITE ELEMENT ANALYSIS COMPLETE")
model_info = fea_results['model_info']
print(f"Material: {model_info['material']}")
print(f"Mesh: {model_info['nodes']:,} nodes, {model_info['elements']:,} elements")
print(f"DOF: {model_info['dof']:,}")
summary = fea_results['summary']
print(f"\nSummary Results:")
print(f" Max displacement: {summary['max_displacement_m']:.2e} m")
print(f" Max stress: {summary['max_stress_Pa']:.2e} Pa")
print(f" Min safety factor: {summary['min_safety_factor']:.2f}")
print(f" Critical load case: {summary['critical_load_case']}")
print(f" Passes safety check: {summary['passes_safety_check']}")
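Because the stiffness assembly above is deliberately simplified, it helps to compare the reported numbers against a closed-form estimate. The sketch below applies Euler-Bernoulli beam theory to the first load case (a 1 kN point load at the free end of the 2.0 m x 0.2 m x 0.1 m steel cantilever). The function names are hypothetical helpers introduced here for illustration, and the check ignores shear deformation.

```python
def cantilever_tip_deflection(load_n, length_m, width_m, height_m, youngs_modulus_pa):
    """Tip deflection of a cantilever with an end point load: P L^3 / (3 E I)."""
    # Second moment of area of a rectangular section bending through its height
    second_moment = width_m * height_m**3 / 12.0
    return load_n * length_m**3 / (3.0 * youngs_modulus_pa * second_moment)

def cantilever_root_stress(load_n, length_m, width_m, height_m):
    """Maximum bending stress at the fixed end: M c / I with M = P L and c = h / 2."""
    second_moment = width_m * height_m**3 / 12.0
    root_moment = load_n * length_m
    return root_moment * (height_m / 2.0) / second_moment

tip = cantilever_tip_deflection(1000.0, 2.0, 0.2, 0.1, 200e9)
stress = cantilever_root_stress(1000.0, 2.0, 0.2, 0.1)
print(f"Analytical tip deflection: {tip:.3e} m")
print(f"Analytical root stress:    {stress:.3e} Pa")
```

For these inputs the estimate is about 0.8 mm of deflection and 6 MPa of bending stress. A large gap between these values and the simulation output points to the simplifications in the assembly step rather than to the beam itself.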
Example 3: Multi-Objective Engineering Design
Use SGE task arrays for design optimization:
@cluster(
cores=6,
memory="24GB",
time="02:00:00",
queue="all.q",
sge_array="1-25" # SGE task array
)
def multi_objective_design_optimization(design_problem="beam_design"):
"""
Multi-objective design optimization using SGE task arrays.
Each task evaluates different design parameters.
"""
import os
import numpy as np
import random
from math import pi, sqrt
# Get SGE task array index
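raw_task_id = os.environ.get('SGE_TASK_ID', '1')
task_id = int(raw_task_id) if raw_task_id.isdigit() else 1  # SGE sets SGE_TASK_ID to "undefined" for non-array jobs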
print(f"Design optimization task {task_id}")
def beam_design_objectives(width, height, length, material_density=7850):
"""Calculate beam design objectives"""
# Geometry constraints
area = width * height
moment_of_inertia = width * height**3 / 12
volume = area * length
mass = volume * material_density
# Structural performance
E = 200e9 # Young's modulus (Pa)
max_load = 10000 # Maximum load (N)
# Deflection calculation (simply supported beam, point load at midspan): P L^3 / (48 E I)
max_deflection = (max_load * length**3) / (48 * E * moment_of_inertia)
# Stress calculation
max_moment = max_load * length / 4 # For simply supported beam
max_stress = max_moment * (height / 2) / moment_of_inertia
# Objectives to minimize
objectives = {
'mass': mass, # Minimize weight
'deflection': max_deflection, # Minimize deflection
'stress': max_stress, # Minimize stress
'cost': mass * 2.5 + area * 10 # Material + manufacturing cost
}
# Constraints
constraints = {
'deflection_limit': max_deflection < length / 250, # L/250 deflection limit
'stress_limit': max_stress < 250e6, # Yield stress limit
'aspect_ratio': height / width < 5, # Practical aspect ratio
'minimum_thickness': width > 0.01 and height > 0.01 # Minimum thickness
}
return objectives, constraints
def truss_design_objectives(member_areas, topology, material_density=2700):
"""Calculate truss design objectives"""
# Simplified truss analysis
n_members = len(member_areas)
total_length = sum(topology) # Simplified total length
total_volume = sum(area * length for area, length in zip(member_areas, topology))
total_mass = total_volume * material_density
# Simplified stiffness calculation
E = 70e9 # Aluminum Young's modulus
avg_stiffness = E * sum(member_areas) / n_members
# Simplified stress analysis
applied_load = 5000 # N
avg_stress = applied_load / sum(member_areas)
objectives = {
'mass': total_mass,
'compliance': 1 / avg_stiffness, # Inverse of stiffness
'max_stress': avg_stress,
'cost': total_mass * 3.0 + n_members * 50 # Material + connection cost
}
constraints = {
'stress_limit': avg_stress < 276e6, # Aluminum yield
'buckling_check': all(area > 1e-4 for area in member_areas), # Min area
'geometric_feasibility': len(member_areas) >= 3 # Minimum members
}
return objectives, constraints
# Set up design space for this task
np.random.seed(task_id * 42) # Reproducible but different per task
if design_problem == "beam_design":
# Generate design variables for beam
width = np.random.uniform(0.05, 0.5) # 5cm to 50cm
height = np.random.uniform(0.1, 1.0) # 10cm to 100cm
length = np.random.uniform(2.0, 10.0) # 2m to 10m
objectives, constraints = beam_design_objectives(width, height, length)
design_vars = {'width': width, 'height': height, 'length': length}
elif design_problem == "truss_design":
# Generate design variables for truss
n_members = int(np.random.randint(5, 16))  # 5 to 15 members, drawn from the seeded NumPy generator so each task is reproducible
member_areas = np.random.uniform(1e-4, 1e-2, n_members) # 1cm² to 100cm²
topology = np.random.uniform(0.5, 3.0, n_members) # Member lengths
objectives, constraints = truss_design_objectives(member_areas, topology)
design_vars = {
'n_members': n_members,
'member_areas': member_areas.tolist(),
'topology': topology.tolist()
}
else:
raise ValueError(f"Unknown design problem: {design_problem}")
# Check constraint feasibility
feasible = all(constraints.values())
n_violated_constraints = sum(1 for satisfied in constraints.values() if not satisfied)
# Calculate Pareto performance metrics
def normalize_objectives(objectives):
"""Normalize objectives for multi-objective comparison"""
# Reference values for normalization (approximate)
if design_problem == "beam_design":
ref_values = {
'mass': 1000, # kg
'deflection': 0.01, # m
'stress': 100e6, # Pa
'cost': 5000 # currency units
}
else: # truss_design
ref_values = {
'mass': 500, # kg
'compliance': 1e-9, # 1/N
'max_stress': 100e6, # Pa
'cost': 3000 # currency units
}
normalized = {}
for obj, value in objectives.items():
if obj in ref_values:
normalized[obj] = value / ref_values[obj]
else:
normalized[obj] = value
return normalized
normalized_objectives = normalize_objectives(objectives)
# Calculate aggregate performance metrics
weighted_sum = sum(normalized_objectives.values()) # Equal weights
max_objective = max(normalized_objectives.values())
# Performance score (lower is better)
if feasible:
performance_score = weighted_sum
else:
# Penalty for infeasible designs
performance_score = weighted_sum * (1 + 10 * n_violated_constraints)
# Compile results
design_result = {
'task_id': task_id,
'design_problem': design_problem,
'design_variables': design_vars,
'objectives': objectives,
'normalized_objectives': normalized_objectives,
'constraints': constraints,
'feasible': feasible,
'constraints_violated': n_violated_constraints,
'performance_metrics': {
'weighted_sum': weighted_sum,
'max_objective': max_objective,
'performance_score': performance_score
},
'design_quality': {
'excellent': performance_score < 2.0 and feasible,
'good': performance_score < 4.0 and feasible,
'acceptable': performance_score < 8.0 and feasible,
'poor': not feasible or performance_score >= 8.0
}
}
return design_result
# Run design optimization (this would be one task of the SGE array)
design_result = multi_objective_design_optimization("beam_design")
print(f"\nDESIGN OPTIMIZATION - Task {design_result['task_id']}")
print(f"Problem: {design_result['design_problem']}")
print(f"Feasible: {design_result['feasible']}")
if design_result['design_problem'] == 'beam_design':
    # Named design_vars rather than vars to avoid shadowing the built-in vars()
    design_vars = design_result['design_variables']
    print(f"\nDesign Variables:")
    print(f" Width: {design_vars['width']:.3f} m")
    print(f" Height: {design_vars['height']:.3f} m")
    print(f" Length: {design_vars['length']:.3f} m")
print(f"\nObjectives:")
for obj, value in design_result['objectives'].items():
    if 'stress' in obj or 'deflection' in obj:
        print(f" {obj}: {value:.2e}")
    else:
        print(f" {obj}: {value:.2f}")
perf = design_result['performance_metrics']
print(f"\nPerformance Score: {perf['performance_score']:.2f}")
quality = design_result['design_quality']
for level, is_level in quality.items():
    if is_level:
        print(f"Design Quality: {level.upper()}")
        break
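Each array task scores a single design, so the multi-objective comparison only happens once the per-task results are gathered. How the results are collected is not shown in this tutorial. The sketch below assumes they are already available as a list of dictionaries shaped like `design_result`, and filters the feasible designs down to the non-dominated (Pareto) set. The helper names and the sample values are made up for illustration.

```python
def dominates(a, b):
    """True if objective dict `a` is no worse than `b` on every objective and strictly better on one."""
    return all(a[k] <= b[k] for k in a) and any(a[k] < b[k] for k in a)

def pareto_front(results):
    """Return the feasible results that no other feasible result dominates (all objectives minimized)."""
    feasible = [r for r in results if r["feasible"]]
    return [
        r for r in feasible
        if not any(dominates(other["objectives"], r["objectives"])
                   for other in feasible if other is not r)
    ]

# Illustrative results from four array tasks (values are invented)
results = [
    {"task_id": 1, "feasible": True,  "objectives": {"mass": 100.0, "deflection": 0.004}},
    {"task_id": 2, "feasible": True,  "objectives": {"mass": 150.0, "deflection": 0.002}},
    {"task_id": 3, "feasible": True,  "objectives": {"mass": 160.0, "deflection": 0.005}},
    {"task_id": 4, "feasible": False, "objectives": {"mass": 50.0,  "deflection": 0.001}},
]
front = pareto_front(results)
print([r["task_id"] for r in front])  # prints [1, 2]
```

Task 3 is heavier and deflects more than task 1, so it is dominated. Task 4 is excluded because it violates a constraint. Unlike the equal-weight sum used for `performance_score`, this filter keeps every trade-off that is not strictly worse than another design.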
SGE Parallel Environments and Resource Management
def configure_sge_parallel_environments():
"""
Examples of different SGE parallel environment configurations.
"""
# Common SGE parallel environments
pe_configs = {
'smp': {
'description': 'Symmetric Multi-Processing (shared memory)',
'use_case': 'Multi-threaded applications on single node',
'example_cores': [2, 4, 8, 16, 32],
'clustrix_config': {
'cores': 8,
'pe': 'smp 8',
'memory': '32GB'
}
},
'mpi': {
'description': 'Message Passing Interface (distributed memory)',
'use_case': 'Distributed parallel applications across nodes',
'example_cores': [8, 16, 32, 64, 128],
'clustrix_config': {
'cores': 32,
'pe': 'mpi 32',
'memory': '128GB'
}
},
'openmp': {
'description': 'OpenMP parallel environment',
'use_case': 'OpenMP applications with thread parallelism',
'example_cores': [4, 8, 12, 16],
'clustrix_config': {
'cores': 12,
'pe': 'openmp 12',
'memory': '48GB'
}
},
'hybrid': {
'description': 'Hybrid MPI+OpenMP',
'use_case': 'Applications using both MPI and OpenMP',
'example_cores': [16, 32, 64],
'clustrix_config': {
'cores': 32,
'pe': 'hybrid 32',
'memory': '128GB'
}
}
}
print("SGE Parallel Environment Configurations:")
print("=" * 60)
for pe_name, config in pe_configs.items():
print(f"\n{pe_name.upper()}:")
print(f" Description: {config['description']}")
print(f" Use case: {config['use_case']}")
print(f" Common core counts: {config['example_cores']}")
print(f" Clustrix configuration:")
for key, value in config['clustrix_config'].items():
print(f" {key}: {value}")
return pe_configs
# SGE resource selection helper
def select_sge_resources(application_type, problem_size, parallelization="smp"):
"""
Select appropriate SGE resources based on application characteristics.
"""
# Base resource requirements by application type
app_requirements = {
'optimization': {'base_cores': 8, 'memory_per_core': 4, 'time_factor': 1.5},
'simulation': {'base_cores': 16, 'memory_per_core': 6, 'time_factor': 2.0},
'ml_training': {'base_cores': 4, 'memory_per_core': 8, 'time_factor': 1.0},
'data_analysis': {'base_cores': 6, 'memory_per_core': 4, 'time_factor': 0.8},
'engineering': {'base_cores': 12, 'memory_per_core': 5, 'time_factor': 1.8}
}
if application_type not in app_requirements:
application_type = 'simulation' # Default
req = app_requirements[application_type]
# Scale resources based on problem size
size_multipliers = {
'small': 0.5,
'medium': 1.0,
'large': 2.0,
'xlarge': 4.0
}
multiplier = size_multipliers.get(problem_size, 1.0)
cores = max(1, int(req['base_cores'] * multiplier))
memory_gb = max(4, int(cores * req['memory_per_core']))
# Time estimation (hours)
base_time = 2.0 # hours
time_hours = max(0.5, base_time * req['time_factor'] * multiplier)
# Format time as HH:MM:SS
hours = int(time_hours)
minutes = int((time_hours - hours) * 60)
time_str = f"{hours:02d}:{minutes:02d}:00"
# Queue selection
if time_hours <= 1:
queue = "short.q"
elif time_hours <= 8:
queue = "all.q"
else:
queue = "long.q"
sge_config = {
'cores': cores,
'memory': f"{memory_gb}GB",
'time': time_str,
'queue': queue,
'pe': f"{parallelization} {cores}"
}
return sge_config
# Display PE configurations
pe_configs = configure_sge_parallel_environments()
# Example resource selections
print("\n\nSGE Resource Selection Examples:")
print("=" * 60)
examples = [
('optimization', 'medium', 'smp'),
('simulation', 'large', 'mpi'),
('ml_training', 'small', 'openmp'),
('engineering', 'xlarge', 'hybrid')
]
for app_type, size, parallel in examples:
config = select_sge_resources(app_type, size, parallel)
print(f"\n{app_type.upper()} ({size}, {parallel}):")
for key, value in config.items():
print(f" {key}: {value}")
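To see what a resource dictionary like the one returned by `select_sge_resources` means at the scheduler level, it can help to translate it into `qsub` options by hand. The sketch below is an illustrative mapping only, not how Clustrix generates its job scripts. It assumes the site enforces wall time through `h_rt` and memory through a per-slot `h_vmem` limit, which is a common but site-specific setup. The script name `job.sh` is a placeholder.

```python
import math

def to_qsub_args(resources, array_range=None):
    """Translate a resource dict into qsub arguments (illustrative, site-dependent mapping)."""
    pe_name, slots = resources["pe"].split()
    slots = int(slots)
    memory = resources["memory"]
    memory_gb = int(memory[:-2]) if memory.upper().endswith("GB") else int(memory)
    # Assumption: h_vmem is requested per slot, so divide the total memory by the slot count
    per_slot_gb = max(1, math.ceil(memory_gb / slots))
    args = [
        "-q", resources["queue"],
        "-pe", pe_name, str(slots),
        "-l", f"h_rt={resources['time']}",
        "-l", f"h_vmem={per_slot_gb}G",
    ]
    if array_range:
        args += ["-t", array_range]  # task array, e.g. "1-25"
    return args

resources = {"cores": 8, "memory": "32GB", "time": "03:00:00", "queue": "all.q", "pe": "smp 8"}
print(" ".join(["qsub"] + to_qsub_args(resources, array_range="1-25") + ["job.sh"]))
```

Running `qconf -spl` and `qconf -sc` on the cluster shows which parallel environments and resource names are actually defined before relying on any of these flags.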
SGE Job Monitoring and Management
from clustrix import ClusterExecutor
# Connect to SGE cluster and check status
config = clustrix.get_config()
executor = ClusterExecutor(config)
try:
    executor.connect()
    print("✓ Successfully connected to SGE cluster")
    # Check SGE version and configuration
    stdout, stderr = executor._execute_command("qconf -sconf")
    if "SGE" in stdout or "Grid Engine" in stdout:
        print("✓ SGE/Grid Engine detected")
    # List available queues
    stdout, stderr = executor._execute_command("qconf -sql")
    if stdout:
        queues = stdout.strip().split('\n')
        print(f"\nAvailable queues ({len(queues)}):")
        for queue in queues[:10]:  # Show first 10
            print(f" {queue}")
        if len(queues) > 10:
            print(f" ... and {len(queues) - 10} more")
    # List parallel environments
    stdout, stderr = executor._execute_command("qconf -spl")
    if stdout:
        pes = stdout.strip().split('\n')
        print(f"\nParallel environments ({len(pes)}):")
        for pe in pes:
            print(f" {pe}")
    # Check queue status
    stdout, stderr = executor._execute_command("qstat -g c")
    if stdout:
        print("\nCluster queue summary:")
        lines = stdout.strip().split('\n')
        for line in lines[:15]:  # Show header and first few lines
            print(f" {line}")
    # Check user's jobs
    username = config.username
    stdout, stderr = executor._execute_command(f"qstat -u {username}")
    if stdout and len(stdout.strip().split('\n')) > 2:
        print(f"\nYour current jobs:")
        lines = stdout.strip().split('\n')
        for line in lines:
            print(f" {line}")
    else:
        print(f"\n✓ No jobs currently running for user {username}")
    # Check host information
    stdout, stderr = executor._execute_command("qhost | head -20")
    if stdout:
        print("\nHost information (sample):")
        lines = stdout.strip().split('\n')
        for line in lines:
            print(f" {line}")
    executor.disconnect()
    print("\n✓ SGE cluster monitoring completed successfully")
except Exception as e:
    print(f"✗ Connection or monitoring failed: {e}")
    print("Please check your SGE cluster configuration")
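The monitoring cell prints the raw `qstat` text. If the job states are needed programmatically, the output can be split into fields. The sketch below parses a sample of the default `qstat` table. The sample text is invented for illustration and the exact column layout can differ between Grid Engine versions, so treat the parser as a starting point. For anything more robust, `qstat -xml` avoids whitespace parsing altogether.

```python
from collections import Counter

# Invented sample in the style of default `qstat` output
SAMPLE_QSTAT = """\
job-ID  prior   name       user         state submit/start at     queue                          slots ja-task-ID
-----------------------------------------------------------------------------------------------------------------
   1234 0.55500 ga_opt     alice        r     01/15/2024 10:12:33 all.q@node01                       8
   1235 0.00000 design     alice        qw    01/15/2024 10:15:02                                    6 1-25:1
   1236 0.55500 fea        alice        r     01/15/2024 10:20:41 all.q@node02                      12
"""

def parse_qstat(text):
    """Extract job ID, name, user, and state from default qstat output."""
    jobs = []
    for line in text.strip().splitlines()[2:]:  # skip the header and the dashed separator
        fields = line.split()
        if len(fields) < 5:
            continue  # ignore blank or truncated lines
        jobs.append({
            "job_id": fields[0],
            "name": fields[2],
            "user": fields[3],
            "state": fields[4],   # e.g. r = running, qw = queued and waiting
        })
    return jobs

jobs = parse_qstat(SAMPLE_QSTAT)
state_counts = Counter(job["state"] for job in jobs)
print(dict(state_counts))
```

Only the first five columns are read, because the queue column is empty for pending jobs and shifts the later fields.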
Summary
This tutorial covered SGE cluster usage with Clustrix:
SGE Configuration - Setting up Clustrix for SGE/Grid Engine clusters
Mathematical Optimization - Genetic algorithms and complex optimization
Engineering Simulation - Finite element analysis and structural design
Multi-Objective Design - Engineering design optimization with task arrays
Parallel Environments - SMP, MPI, OpenMP, and hybrid configurations
Resource Management - Intelligent resource selection and queue management
Job Monitoring - SGE cluster status and job management
Key SGE Features:
Parallel Environments: Use the pe parameter for SMP, MPI, and OpenMP configurations
Task Arrays: Efficient parameter sweeps with the sge_array parameter
Queue Selection: Choose appropriate queues based on runtime requirements
Resource Specification: Flexible core, memory, and time allocation
Job Dependencies: Chain jobs with SGE dependency mechanisms
Advanced Scheduling: Priority, reservation, and resource policies
Best Practices:
Parallel Environment Selection: Choose PE based on application parallelization model
Resource Estimation: Use application profiling to estimate requirements accurately
Queue Strategy: Match job characteristics to appropriate queue policies
Array Jobs: Use task arrays for embarrassingly parallel workloads
Monitoring: Regular cluster status checks for optimal resource utilization
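The array-job practice above can be sketched as a mapping from the task index to one point of a parameter grid, so that a sweep is systematic instead of randomly sampled. The grid values and the helper names `current_task_id` and `params_for_task` are assumptions made for illustration. The only scheduler behavior relied on is that SGE exports `SGE_TASK_ID` (starting at the first index of the `-t` range) and sets it to `undefined` for non-array jobs.

```python
import itertools
import os

# Hypothetical 5 x 5 grid of beam cross-sections: 25 combinations for tasks 1-25
widths = [0.05, 0.1, 0.2, 0.3, 0.5]
heights = [0.1, 0.25, 0.5, 0.75, 1.0]
grid = list(itertools.product(widths, heights))

def current_task_id(environ=os.environ):
    """Read SGE_TASK_ID, falling back to 1 when unset or 'undefined' (non-array job)."""
    raw = environ.get("SGE_TASK_ID", "1")
    return int(raw) if raw.isdigit() else 1

def params_for_task(task_id, grid):
    """Map a 1-based SGE task index to one grid point."""
    if not 1 <= task_id <= len(grid):
        raise IndexError(f"task_id {task_id} is outside 1..{len(grid)}")
    return grid[task_id - 1]

width, height = params_for_task(current_task_id(), grid)
print(f"width={width} m, height={height} m")
```

Keeping the grid length equal to the size of the array range means every task gets exactly one combination and none is evaluated twice.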
Next Steps:
Try SLURM Tutorial for SLURM-specific features
Explore PBS Tutorial for PBS/Torque clusters
Check Kubernetes Tutorial for containerized computing
Review SSH Tutorial for simple remote execution
For more information, visit the Clustrix Documentation.