SSH Remote Execution Tutorial¶
This tutorial demonstrates how to use Clustrix for simple SSH-based remote execution without a job scheduler. This approach works well for running functions on remote servers, workstations, or cloud instances.
Prerequisites¶
SSH access to a remote server
SSH key-based authentication configured
Python installed on the remote server
Clustrix installed:
pip install clustrix
[ ]:
# Install Clustrix (uncomment if needed)
# !pip install clustrix
import clustrix
from clustrix import cluster, configure
import numpy as np
SSH Configuration¶
Configure Clustrix for SSH-based remote execution:
[ ]:
# Configure for SSH remote execution
configure(
    cluster_type="ssh",
    cluster_host="your-server.example.com",  # Replace with your server
    username="your-username",                # Replace with your username
    key_file="~/.ssh/id_rsa",                # Path to SSH private key
    port=22,                                 # SSH port (default 22)

    # Remote environment
    remote_work_dir="/home/your-username/clustrix",  # Remote working directory
    python_executable="python3",                     # Python command on remote server

    # Execution settings
    cleanup_on_success=True,  # Clean up remote files after success
    max_parallel_jobs=5,      # Limit concurrent executions

    # Optional: remote environment setup (use one or the other)
    conda_env_name="myenv",  # Activate conda environment
    # or
    # virtualenv_path="/path/to/venv",  # Activate virtual environment
)
print("SSH remote execution configured successfully!")
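To keep host names and key paths out of a shared notebook, the settings above could be read from environment variables instead. The following is a minimal sketch, not part of Clustrix: the variable names (CLUSTRIX_SSH_HOST and so on) and the helper ssh_settings_from_env are hypothetical, and the keyword names simply mirror the configure() call above.

```python
import os

# Hypothetical environment variable names; Clustrix does not define these.
ENV_TO_SETTING = {
    "CLUSTRIX_SSH_HOST": "cluster_host",
    "CLUSTRIX_SSH_USER": "username",
    "CLUSTRIX_SSH_KEY": "key_file",
}


def ssh_settings_from_env(env=None):
    """Build keyword arguments for configure() from environment variables."""
    env = os.environ if env is None else env
    settings = {"cluster_type": "ssh"}
    missing = []
    for var, key in ENV_TO_SETTING.items():
        value = env.get(var)
        if value:
            settings[key] = value
        else:
            missing.append(var)
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    # Expand "~" so the key path does not depend on later shell expansion
    settings["key_file"] = os.path.expanduser(settings["key_file"])
    return settings


# Usage (assumes Clustrix is installed and the variables are set):
# configure(**ssh_settings_from_env())
```

Failing early on a missing variable keeps a half-filled configuration from reaching the SSH layer, where the resulting error would be harder to read.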
Example 1: Basic Remote Computation¶
Execute a simple mathematical computation remotely:
[ ]:
@cluster
def basic_remote_computation(n=1000000):
    """
    Simple computation executed on remote server.
    """
    import math
    import time
    from datetime import datetime

    print(f"Starting computation on remote server at {datetime.now()}")
    print(f"Computing sum of squares for {n:,} numbers")
    start_time = time.time()

    # Compute sum of squares
    total = sum(i*i for i in range(n))

    # Compute some mathematical functions
    sqrt_total = math.sqrt(total)
    log_total = math.log(total)

    end_time = time.time()
    execution_time = end_time - start_time

    result = {
        'n': n,
        'sum_of_squares': total,
        'sqrt_sum': sqrt_total,
        'log_sum': log_total,
        'execution_time_seconds': execution_time,
        'completion_time': datetime.now().isoformat()
    }

    print(f"Computation completed in {execution_time:.2f} seconds")
    return result

# Execute on remote server
result = basic_remote_computation(500000)
print(f"\nREMOTE COMPUTATION COMPLETE")
print(f"Numbers processed: {result['n']:,}")
print(f"Sum of squares: {result['sum_of_squares']:,}")
print(f"Square root of sum: {result['sqrt_sum']:,.2f}")
print(f"Execution time: {result['execution_time_seconds']:.2f} seconds")
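One way to sanity-check the value returned by the remote function is to compare it with the closed-form identity for the sum of squares, 0² + 1² + ... + (n-1)² = (n-1)·n·(2n-1)/6. The helper below is an illustrative sketch that runs locally; its name is not part of the tutorial's code.

```python
def sum_of_squares_closed_form(n):
    """Return 0^2 + 1^2 + ... + (n-1)^2 using the closed-form identity."""
    # The product of (n-1), n and (2n-1) is always divisible by 6,
    # so integer division is exact here.
    return (n - 1) * n * (2 * n - 1) // 6


# Matches the generator expression used in the remote function
assert sum_of_squares_closed_form(4) == 0 + 1 + 4 + 9

# Usage with the remote result (assumes `result` from the cell above):
# assert result['sum_of_squares'] == sum_of_squares_closed_form(result['n'])
```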
Example 2: Remote Data Processing¶
Process data files on a remote server:
[ ]:
@cluster
def remote_data_processing(data_size=100000, output_format="json"):
    """
    Generate and process data on remote server.
    """
    import json
    import csv
    import math
    import os
    import tempfile
    import random
    import statistics
    from datetime import datetime, timedelta

    print(f"Processing {data_size:,} data points on remote server")

    # Generate synthetic time-series data
    def generate_synthetic_data(size):
        data = []
        base_date = datetime(2023, 1, 1)
        for i in range(size):
            timestamp = base_date + timedelta(hours=i)
            # Generate synthetic metrics with a daily (24-hour) sine pattern
            base_value = 100 + 20 * math.sin(2 * math.pi * i / 24)
            noise = random.gauss(0, 5)
            value = max(0, base_value + noise)
            data_point = {
                'timestamp': timestamp.isoformat(),
                'value': round(value, 2),
                'category': random.choice(['A', 'B', 'C']),
                'status': random.choice(['active', 'inactive']) if random.random() > 0.1 else 'error'
            }
            data.append(data_point)
        return data

    # Generate the dataset
    print("Generating synthetic dataset...")
    dataset = generate_synthetic_data(data_size)

    # Process the data
    print("Processing data...")

    # Basic statistics
    values = [point['value'] for point in dataset]
    stats = {
        'count': len(values),
        'mean': statistics.mean(values),
        'median': statistics.median(values),
        'stdev': statistics.stdev(values) if len(values) > 1 else 0,
        'min': min(values),
        'max': max(values)
    }

    # Category analysis
    category_counts = {}
    status_counts = {}
    for point in dataset:
        category = point['category']
        status = point['status']
        category_counts[category] = category_counts.get(category, 0) + 1
        status_counts[status] = status_counts.get(status, 0) + 1

    # Time-based analysis: group values by hour of day
    hourly_values = {}
    for point in dataset:
        hour = datetime.fromisoformat(point['timestamp']).hour
        if hour not in hourly_values:
            hourly_values[hour] = []
        hourly_values[hour].append(point['value'])

    # Calculate hourly means
    hourly_means = {hour: statistics.mean(vals) for hour, vals in hourly_values.items()}
    peak_hour = max(hourly_means.keys(), key=lambda h: hourly_means[h])
    trough_hour = min(hourly_means.keys(), key=lambda h: hourly_means[h])

    # Anomaly detection (simple threshold-based)
    threshold = stats['mean'] + 2 * stats['stdev']
    anomalies = [point for point in dataset if point['value'] > threshold]

    # Error analysis
    error_points = [point for point in dataset if point['status'] == 'error']
    error_rate = len(error_points) / len(dataset) * 100

    # Save processed data to temporary file
    # (newline='' avoids blank rows when the csv module writes on some platforms)
    with tempfile.NamedTemporaryFile(mode='w', suffix=f'.{output_format}', delete=False, newline='') as f:
        output_file = f.name
        if output_format == 'json':
            json.dump(dataset, f, indent=2)
        elif output_format == 'csv':
            if dataset:
                writer = csv.DictWriter(f, fieldnames=dataset[0].keys())
                writer.writeheader()
                writer.writerows(dataset)

    # Measure the file only after it has been closed and flushed
    file_size = os.path.getsize(output_file)

    # Cleanup
    os.unlink(output_file)

    processing_results = {
        'data_info': {
            'total_points': len(dataset),
            'output_format': output_format,
            'file_size_bytes': file_size
        },
        'basic_statistics': stats,
        'category_distribution': category_counts,
        'status_distribution': status_counts,
        'temporal_analysis': {
            'peak_hour': peak_hour,
            'trough_hour': trough_hour,
            'peak_average': hourly_means[peak_hour],
            'trough_average': hourly_means[trough_hour],
            'daily_variation': hourly_means[peak_hour] - hourly_means[trough_hour]
        },
        'quality_metrics': {
            'anomalies_detected': len(anomalies),
            'anomaly_rate_percent': len(anomalies) / len(dataset) * 100,
            'error_rate_percent': error_rate,
            'data_completeness': (len(dataset) - len(error_points)) / len(dataset) * 100
        },
        'processing_metadata': {
            'processed_on': datetime.now().isoformat(),
            'processing_location': 'remote_server'
        }
    }
    return processing_results

# Process data on remote server
data_results = remote_data_processing(data_size=50000, output_format="json")
print(f"\nREMOTE DATA PROCESSING COMPLETE")
data_info = data_results['data_info']
print(f"Data points processed: {data_info['total_points']:,}")
print(f"Output format: {data_info['output_format']}")
print(f"Generated file size: {data_info['file_size_bytes']:,} bytes")
stats = data_results['basic_statistics']
print(f"\nStatistics:")
print(f" Mean: {stats['mean']:.2f}")
print(f" Median: {stats['median']:.2f}")
print(f" Std Dev: {stats['stdev']:.2f}")
print(f" Range: {stats['min']:.2f} - {stats['max']:.2f}")
temporal = data_results['temporal_analysis']
print(f"\nTemporal Analysis:")
print(f" Peak hour: {temporal['peak_hour']}:00 (avg: {temporal['peak_average']:.2f})")
print(f" Trough hour: {temporal['trough_hour']}:00 (avg: {temporal['trough_average']:.2f})")
print(f" Daily variation: {temporal['daily_variation']:.2f}")
quality = data_results['quality_metrics']
print(f"\nData Quality:")
print(f" Anomalies: {quality['anomalies_detected']} ({quality['anomaly_rate_percent']:.2f}%)")
print(f" Error rate: {quality['error_rate_percent']:.2f}%")
print(f" Data completeness: {quality['data_completeness']:.1f}%")
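The anomaly rule used in this example (flag any value above the mean plus two sample standard deviations) can be tried locally on a small list before running it remotely. This is a minimal sketch; find_anomalies and its num_stdevs parameter are illustrative names, not part of the tutorial's function.

```python
import statistics


def find_anomalies(values, num_stdevs=2.0):
    """Return values above mean + num_stdevs * sample standard deviation."""
    if len(values) < 2:
        # statistics.stdev needs at least two data points
        return []
    threshold = statistics.mean(values) + num_stdevs * statistics.stdev(values)
    return [v for v in values if v > threshold]


sample = [10] * 9 + [100]
print(find_anomalies(sample))  # prints [100]
```

Note that the rule is one-sided: unusually low values are never flagged. A two-sided variant would also compare against the mean minus the same margin.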
Example 3: Remote File System Operations¶
Perform file system operations on the remote server:
[ ]:
@cluster
def remote_filesystem_analysis(directory_path="/tmp", max_depth=3):
    """
    Analyze file system structure on remote server.
    """
    import os
    import stat
    import pwd
    import grp
    import time
    from datetime import datetime
    from collections import defaultdict

    print(f"Analyzing directory: {directory_path} (max depth: {max_depth})")

    def get_file_info(file_path):
        """Get detailed file information"""
        try:
            stat_info = os.stat(file_path)

            # Get owner and group names
            try:
                owner = pwd.getpwuid(stat_info.st_uid).pw_name
            except KeyError:
                owner = str(stat_info.st_uid)
            try:
                group = grp.getgrgid(stat_info.st_gid).gr_name
            except KeyError:
                group = str(stat_info.st_gid)

            return {
                'size': stat_info.st_size,
                'owner': owner,
                'group': group,
                'permissions': oct(stat_info.st_mode)[-3:],
                'modified_time': datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
                'is_file': stat.S_ISREG(stat_info.st_mode),
                'is_dir': stat.S_ISDIR(stat_info.st_mode),
                # os.stat follows symlinks, so S_ISLNK on its result is always
                # False; check the path itself instead
                'is_symlink': os.path.islink(file_path)
            }
        except OSError as e:  # includes PermissionError
            return {'error': str(e)}

    def analyze_directory(path, current_depth=0):
        """Recursively analyze directory structure"""
        if current_depth > max_depth:
            return {}

        analysis = {
            'path': path,
            'depth': current_depth,
            'contents': [],
            'stats': {
                'total_files': 0,
                'total_dirs': 0,
                'total_size': 0,
                'file_types': defaultdict(int),
                'largest_files': [],
                'permission_stats': defaultdict(int)
            }
        }

        try:
            entries = os.listdir(path)
            entries.sort()  # Sort for consistent output

            # Limit entries to prevent overwhelming output
            if len(entries) > 100:
                entries = entries[:100]
                analysis['truncated'] = True

            for entry in entries:
                entry_path = os.path.join(path, entry)
                file_info = get_file_info(entry_path)
                if 'error' in file_info:
                    continue

                entry_data = {
                    'name': entry,
                    'path': entry_path,
                    **file_info
                }
                analysis['contents'].append(entry_data)

                # Update statistics
                if file_info['is_file']:
                    analysis['stats']['total_files'] += 1
                    analysis['stats']['total_size'] += file_info['size']

                    # File extension analysis
                    _, ext = os.path.splitext(entry)
                    if ext:
                        analysis['stats']['file_types'][ext.lower()] += 1
                    else:
                        analysis['stats']['file_types']['no_extension'] += 1

                    # Track largest files
                    analysis['stats']['largest_files'].append({
                        'name': entry,
                        'size': file_info['size'],
                        'path': entry_path
                    })
                elif file_info['is_dir']:
                    analysis['stats']['total_dirs'] += 1

                    # Recurse into subdirectory
                    if current_depth < max_depth:
                        subdir_analysis = analyze_directory(entry_path, current_depth + 1)
                        if subdir_analysis:
                            # Aggregate subdirectory stats
                            analysis['stats']['total_files'] += subdir_analysis['stats']['total_files']
                            analysis['stats']['total_dirs'] += subdir_analysis['stats']['total_dirs']
                            analysis['stats']['total_size'] += subdir_analysis['stats']['total_size']

                # Permission statistics
                analysis['stats']['permission_stats'][file_info['permissions']] += 1

            # Sort largest files
            analysis['stats']['largest_files'].sort(key=lambda x: x['size'], reverse=True)
            analysis['stats']['largest_files'] = analysis['stats']['largest_files'][:10]  # Top 10

            # Convert defaultdict to regular dict for JSON serialization
            analysis['stats']['file_types'] = dict(analysis['stats']['file_types'])
            analysis['stats']['permission_stats'] = dict(analysis['stats']['permission_stats'])
        except PermissionError:
            analysis['error'] = f"Permission denied accessing {path}"
        except OSError as e:
            analysis['error'] = f"OS error accessing {path}: {str(e)}"

        return analysis

    # Get system information
    def get_system_info():
        """Get basic system information"""
        import platform
        import shutil

        # Disk usage for the analyzed directory
        try:
            disk_usage = shutil.disk_usage(directory_path)
            disk_info = {
                'total_bytes': disk_usage.total,
                'used_bytes': disk_usage.used,
                'free_bytes': disk_usage.free,
                'used_percent': (disk_usage.used / disk_usage.total) * 100
            }
        except OSError:
            disk_info = {'error': 'Could not get disk usage'}

        return {
            'hostname': platform.node(),
            'system': platform.system(),
            'release': platform.release(),
            'machine': platform.machine(),
            'python_version': platform.python_version(),
            'disk_usage': disk_info,
            'current_user': os.environ.get('USER', 'unknown'),
            'home_directory': os.environ.get('HOME', 'unknown'),
            'working_directory': os.getcwd()
        }

    # Perform the analysis
    print("Starting filesystem analysis...")
    start_time = time.time()
    directory_analysis = analyze_directory(directory_path)
    system_info = get_system_info()
    end_time = time.time()
    analysis_time = end_time - start_time

    # Summary statistics
    summary = {
        'analysis_parameters': {
            'target_directory': directory_path,
            'max_depth': max_depth,
            'analysis_time_seconds': analysis_time
        },
        'directory_summary': directory_analysis['stats'] if 'stats' in directory_analysis else {},
        'system_information': system_info,
        'analysis_metadata': {
            'completed_at': datetime.now().isoformat(),
            'analysis_location': 'remote_server'
        }
    }

    # Include sample of directory contents (first 20 items)
    if 'contents' in directory_analysis:
        summary['sample_contents'] = directory_analysis['contents'][:20]

    print(f"Filesystem analysis completed in {analysis_time:.2f} seconds")
    return summary

# Analyze remote filesystem
fs_results = remote_filesystem_analysis(directory_path="/usr/local", max_depth=2)
print(f"\nREMOTE FILESYSTEM ANALYSIS COMPLETE")
params = fs_results['analysis_parameters']
print(f"Directory analyzed: {params['target_directory']}")
print(f"Max depth: {params['max_depth']}")
print(f"Analysis time: {params['analysis_time_seconds']:.2f} seconds")
if 'directory_summary' in fs_results and fs_results['directory_summary']:
    summary = fs_results['directory_summary']
    print(f"\nDirectory Summary:")
    print(f" Total files: {summary.get('total_files', 0):,}")
    print(f" Total directories: {summary.get('total_dirs', 0):,}")
    print(f" Total size: {summary.get('total_size', 0):,} bytes")

    if 'largest_files' in summary and summary['largest_files']:
        print(f"\nLargest files:")
        for i, file_info in enumerate(summary['largest_files'][:5], 1):
            print(f" {i}. {file_info['name']}: {file_info['size']:,} bytes")

    if 'file_types' in summary and summary['file_types']:
        print(f"\nFile types:")
        sorted_types = sorted(summary['file_types'].items(), key=lambda x: x[1], reverse=True)
        for ext, count in sorted_types[:5]:
            print(f" {ext}: {count} files")

sys_info = fs_results['system_information']
print(f"\nSystem Information:")
print(f" Hostname: {sys_info['hostname']}")
print(f" OS: {sys_info['system']} {sys_info['release']}")
print(f" Architecture: {sys_info['machine']}")
print(f" Python: {sys_info['python_version']}")
print(f" Current user: {sys_info['current_user']}")
if 'disk_usage' in sys_info and 'error' not in sys_info['disk_usage']:
    disk = sys_info['disk_usage']
    print(f"\nDisk Usage:")
    print(f" Total: {disk['total_bytes']:,} bytes")
    print(f" Used: {disk['used_bytes']:,} bytes ({disk['used_percent']:.1f}%)")
    print(f" Free: {disk['free_bytes']:,} bytes")
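The report above prints raw byte counts, which become hard to read for disk totals. A small formatting helper, sketched below, could be applied to those values on the local side; format_bytes is an illustrative name and uses binary units (1 KiB = 1024 bytes) as an assumption about the preferred convention.

```python
def format_bytes(num_bytes):
    """Format a byte count with binary units (KiB, MiB, ...)."""
    units = ["B", "KiB", "MiB", "GiB", "TiB"]
    size = float(num_bytes)
    for unit in units:
        # Stop at the first unit that keeps the number below 1024,
        # or at the largest unit available
        if size < 1024 or unit == units[-1]:
            if unit == "B":
                return f"{int(size)} B"
            return f"{size:.1f} {unit}"
        size /= 1024


print(format_bytes(512))        # prints 512 B
print(format_bytes(1536))       # prints 1.5 KiB
print(format_bytes(1024 ** 3))  # prints 1.0 GiB
```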
Example 4: Remote Environment Testing¶
Test the remote Python environment and available packages:
[ ]:
@cluster
def test_remote_environment():
    """
    Test and report on the remote Python environment.
    """
    import sys
    import os
    import platform
    import subprocess
    import importlib
    from datetime import datetime

    print("Testing remote Python environment...")

    # Basic Python information
    python_info = {
        'version': sys.version,
        'version_info': {
            'major': sys.version_info.major,
            'minor': sys.version_info.minor,
            'micro': sys.version_info.micro
        },
        'executable': sys.executable,
        'platform': sys.platform,
        'path': sys.path[:5],  # First 5 path entries
    }

    # System information
    system_info = {
        'hostname': platform.node(),
        'system': platform.system(),
        'release': platform.release(),
        'version': platform.version(),
        'machine': platform.machine(),
        'processor': platform.processor()
    }

    # Environment variables (selected)
    env_vars = {
        'USER': os.environ.get('USER', 'not_set'),
        'HOME': os.environ.get('HOME', 'not_set'),
        'PATH': os.environ.get('PATH', 'not_set')[:200] + '...',  # Truncate PATH
        'SHELL': os.environ.get('SHELL', 'not_set'),
        'LANG': os.environ.get('LANG', 'not_set'),
        'PWD': os.environ.get('PWD', 'not_set'),
        'PYTHONPATH': os.environ.get('PYTHONPATH', 'not_set')
    }

    # Test common packages (names are import names, e.g. 'IPython' and
    # 'sklearn', which can differ from the pip package name)
    common_packages = [
        'numpy', 'pandas', 'scipy', 'matplotlib', 'sklearn', 'requests',
        'flask', 'django', 'pytest', 'jupyter', 'IPython', 'click',
        'yaml', 'json', 'csv', 'sqlite3', 'pickle', 'datetime',
        'os', 'sys', 'subprocess', 'threading', 'multiprocessing'
    ]

    package_status = {}
    for package in common_packages:
        try:
            module = importlib.import_module(package)
            version = getattr(module, '__version__', 'unknown')
            package_status[package] = {
                'available': True,
                'version': version,
                'location': getattr(module, '__file__', 'built-in')
            }
        except ImportError:
            package_status[package] = {
                'available': False,
                'error': 'Not installed'
            }
        except Exception as e:
            package_status[package] = {
                'available': False,
                'error': str(e)
            }

    # Get pip list (if available)
    pip_packages = []
    try:
        result = subprocess.run(
            [sys.executable, '-m', 'pip', 'list', '--format=freeze'],
            capture_output=True,
            text=True,
            timeout=30
        )
        if result.returncode == 0:
            pip_packages = result.stdout.strip().split('\n')[:20]  # First 20 packages
    except Exception as e:
        pip_packages = [f"Error getting pip list: {str(e)}"]

    # Test basic functionality
    functionality_tests = {}

    # Test file I/O
    try:
        import tempfile
        with tempfile.NamedTemporaryFile(mode='w', delete=True) as f:
            f.write("test")
            f.flush()
        functionality_tests['file_io'] = {'status': 'ok', 'message': 'File I/O working'}
    except Exception as e:
        functionality_tests['file_io'] = {'status': 'error', 'message': str(e)}

    # Test networking
    try:
        import socket
        socket.gethostname()
        functionality_tests['networking'] = {'status': 'ok', 'message': 'Basic networking working'}
    except Exception as e:
        functionality_tests['networking'] = {'status': 'error', 'message': str(e)}

    # Test multiprocessing
    try:
        import multiprocessing
        cpu_count = multiprocessing.cpu_count()
        functionality_tests['multiprocessing'] = {
            'status': 'ok',
            'message': f'Multiprocessing available, {cpu_count} CPUs detected'
        }
    except Exception as e:
        functionality_tests['multiprocessing'] = {'status': 'error', 'message': str(e)}

    # Test numerical computing
    numerical_test = {'status': 'ok', 'tests': []}
    try:
        # Basic math
        import math
        result = math.sqrt(16)
        numerical_test['tests'].append(f"math.sqrt(16) = {result}")

        # NumPy if available
        if package_status['numpy']['available']:
            import numpy as np
            arr = np.array([1, 2, 3, 4, 5])
            mean_val = np.mean(arr)
            numerical_test['tests'].append(f"numpy.mean([1,2,3,4,5]) = {mean_val}")

        functionality_tests['numerical_computing'] = numerical_test
    except Exception as e:
        functionality_tests['numerical_computing'] = {'status': 'error', 'message': str(e)}

    # Performance test
    performance_test = {}
    try:
        import time
        start_time = time.time()
        # Simple computation benchmark
        total = sum(i*i for i in range(100000))
        end_time = time.time()
        computation_time = end_time - start_time
        performance_test = {
            'computation_time_seconds': computation_time,
            'result': total,
            'operations_per_second': 100000 / computation_time if computation_time > 0 else 0
        }
    except Exception as e:
        performance_test = {'error': str(e)}

    environment_report = {
        'test_metadata': {
            'test_timestamp': datetime.now().isoformat(),
            'test_location': 'remote_server'
        },
        'python_information': python_info,
        'system_information': system_info,
        'environment_variables': env_vars,
        'package_availability': {
            'total_tested': len(common_packages),
            'available': sum(1 for pkg in package_status.values() if pkg.get('available', False)),
            'unavailable': sum(1 for pkg in package_status.values() if not pkg.get('available', False)),
            'details': package_status
        },
        'installed_packages_sample': pip_packages,
        'functionality_tests': functionality_tests,
        'performance_benchmark': performance_test
    }

    print("Remote environment testing completed")
    return environment_report

# Test remote environment
env_results = test_remote_environment()
print(f"\nREMOTE ENVIRONMENT TEST COMPLETE")
python_info = env_results['python_information']
print(f"Python version: {python_info['version_info']['major']}.{python_info['version_info']['minor']}.{python_info['version_info']['micro']}")
print(f"Python executable: {python_info['executable']}")
system_info = env_results['system_information']
print(f"\nSystem: {system_info['system']} {system_info['release']}")
print(f"Hostname: {system_info['hostname']}")
print(f"Architecture: {system_info['machine']}")
pkg_info = env_results['package_availability']
print(f"\nPackage Availability:")
print(f" Available: {pkg_info['available']}/{pkg_info['total_tested']}")
print(f" Unavailable: {pkg_info['unavailable']}/{pkg_info['total_tested']}")
# Show some available packages
available_packages = [name for name, info in pkg_info['details'].items() if info.get('available', False)]
print(f" Key packages available: {', '.join(available_packages[:10])}")
func_tests = env_results['functionality_tests']
print(f"\nFunctionality Tests:")
for test_name, test_result in func_tests.items():
    status = test_result.get('status', 'unknown')
    message = test_result.get('message', 'No message')
    print(f" {test_name}: {status.upper()} - {message}")

if 'error' not in env_results['performance_benchmark']:
    perf = env_results['performance_benchmark']
    print(f"\nPerformance Benchmark:")
    print(f" Computation time: {perf['computation_time_seconds']:.4f} seconds")
    print(f" Operations per second: {perf['operations_per_second']:,.0f}")
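The version information returned by the environment test can be compared against the local interpreter. The sketch below assumes that matching major and minor versions is a useful compatibility signal when functions are shipped between machines; whether a mismatch actually causes problems for a given Clustrix setup is not confirmed here. The helper name same_python_minor is illustrative.

```python
import sys


def same_python_minor(remote_version_info, local_version_info=None):
    """Return True if remote and local Python agree on major.minor.

    remote_version_info is a dict with 'major' and 'minor' keys, in the
    shape reported under 'version_info' by the environment test above.
    """
    local = sys.version_info if local_version_info is None else local_version_info
    remote = (remote_version_info["major"], remote_version_info["minor"])
    return remote == (local[0], local[1])


# Usage with the remote report (assumes `env_results` from the cell above):
# remote = env_results['python_information']['version_info']
# if not same_python_minor(remote):
#     print("Warning: local and remote Python versions differ")
```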
SSH Configuration Management¶
Best practices for SSH configuration with Clustrix:
[ ]:
def ssh_configuration_examples():
    """
    Examples of different SSH configuration patterns.
    """
    configurations = {
        'basic_ssh': {
            'description': 'Basic SSH connection with key authentication',
            'config': {
                'cluster_type': 'ssh',
                'cluster_host': 'server.example.com',
                'username': 'user',
                'key_file': '~/.ssh/id_rsa',
                'port': 22
            },
            'use_case': 'Single remote server or workstation'
        },
        'custom_port': {
            'description': 'SSH connection with custom port',
            'config': {
                'cluster_type': 'ssh',
                'cluster_host': 'secure-server.example.com',
                'username': 'admin',
                'key_file': '~/.ssh/admin_key',
                'port': 2222
            },
            'use_case': 'Servers with non-standard SSH ports'
        },
        'password_auth': {
            'description': 'SSH with password authentication (less secure)',
            'config': {
                'cluster_type': 'ssh',
                'cluster_host': 'legacy-server.example.com',
                'username': 'olduser',
                'password': 'your-password'  # Not recommended for production
            },
            'use_case': 'Legacy systems without key-based auth'
        },
        'conda_environment': {
            'description': 'SSH with conda environment activation',
            'config': {
                'cluster_type': 'ssh',
                'cluster_host': 'ml-server.example.com',
                'username': 'researcher',
                'key_file': '~/.ssh/research_key',
                'conda_env_name': 'pytorch',
                'python_executable': 'python'
            },
            'use_case': 'Servers with conda environments for specific packages'
        },
        'virtual_environment': {
            'description': 'SSH with Python virtual environment',
            'config': {
                'cluster_type': 'ssh',
                'cluster_host': 'dev-server.example.com',
                'username': 'developer',
                'key_file': '~/.ssh/dev_key',
                'virtualenv_path': '/home/developer/venv/myproject',
                'python_executable': 'python3'
            },
            'use_case': 'Development servers with project-specific environments'
        },
        'cloud_instance': {
            'description': 'Cloud instance with specific configuration',
            'config': {
                'cluster_type': 'ssh',
                'cluster_host': 'ec2-12-34-56-78.compute-1.amazonaws.com',
                'username': 'ubuntu',  # Common for Ubuntu AMIs
                'key_file': '~/.ssh/aws-keypair.pem',
                'remote_work_dir': '/tmp/clustrix',
                'cleanup_on_success': True
            },
            'use_case': 'AWS, GCP, or Azure cloud instances'
        }
    }

    print("SSH Configuration Examples:")
    print("=" * 50)
    for config_name, config_info in configurations.items():
        print(f"\n{config_name.upper().replace('_', ' ')}:")
        print(f" Description: {config_info['description']}")
        print(f" Use case: {config_info['use_case']}")
        print(f" Configuration:")
        for key, value in config_info['config'].items():
            print(f" {key}: {value}")
    return configurations

def ssh_security_best_practices():
    """
    SSH security best practices for Clustrix.
    """
    practices = {
        'key_management': {
            'title': 'SSH Key Management',
            'practices': [
                'Use strong, unique SSH keys for each server/project',
                'Prefer Ed25519 keys: ssh-keygen -t ed25519 -f ~/.ssh/clustrix_key',
                'Use RSA 4096-bit keys if Ed25519 not supported: ssh-keygen -t rsa -b 4096',
                'Protect private keys with strong passphrases',
                'Set proper permissions: chmod 600 ~/.ssh/private_key',
                'Regularly rotate keys (every 6-12 months)'
            ]
        },
        'connection_security': {
            'title': 'Connection Security',
            'practices': [
                'Always use key-based authentication, avoid passwords',
                'Disable SSH agent forwarding unless necessary',
                'Use SSH config files for consistent settings',
                'Enable SSH connection multiplexing for efficiency',
                'Set reasonable connection timeouts',
                'Use non-standard SSH ports when possible'
            ]
        },
        'server_hardening': {
            'title': 'Server-Side Security',
            'practices': [
                'Disable SSH password authentication',
                'Disable SSH root login',
                'Use fail2ban or similar intrusion prevention',
                'Regularly update SSH server software',
                'Monitor SSH access logs',
                'Use firewall rules to restrict SSH access'
            ]
        },
        'clustrix_specific': {
            'title': 'Clustrix-Specific Security',
            'practices': [
                'Use dedicated SSH keys for Clustrix operations',
                'Restrict remote work directories to user-specific paths',
                'Enable cleanup_on_success to remove temporary files',
                'Limit max_parallel_jobs to prevent resource exhaustion',
                'Monitor remote execution logs for anomalies',
                'Use isolated Python environments for execution'
            ]
        }
    }

    print("\nSSH Security Best Practices:")
    print("=" * 40)
    for category, info in practices.items():
        print(f"\n{info['title']}:")
        for i, practice in enumerate(info['practices'], 1):
            print(f" {i}. {practice}")
    return practices

def ssh_troubleshooting_guide():
    """
    Common SSH issues and solutions.
    """
    issues = {
        'connection_refused': {
            'problem': 'Connection refused or timeout',
            'solutions': [
                'Check if SSH service is running on remote server',
                'Verify correct hostname/IP address',
                'Check if firewall is blocking SSH port',
                'Confirm SSH port number (default 22)',
                'Test connection with: ssh -v user@hostname'
            ]
        },
        'permission_denied': {
            'problem': 'Permission denied (publickey)',
            'solutions': [
                'Verify SSH key is added to remote ~/.ssh/authorized_keys',
                'Check SSH key permissions (600 for private key)',
                'Ensure correct username for the server',
                'Verify key file path in Clustrix configuration',
                'Test key with: ssh -i ~/.ssh/key_file user@hostname'
            ]
        },
        'host_key_verification': {
            'problem': 'Host key verification failed',
            'solutions': [
                'Add host to known_hosts: ssh-keyscan hostname >> ~/.ssh/known_hosts',
                'Remove old host key: ssh-keygen -R hostname',
                'Connect manually first to accept host key',
                'Check if hostname/IP changed',
                'Verify server authenticity before accepting'
            ]
        },
        'python_not_found': {
            'problem': 'Python executable not found on remote server',
            'solutions': [
                'Specify correct python_executable in configuration',
                'Check if Python is installed: which python3',
                'Add Python to PATH on remote server',
                'Use full path: /usr/bin/python3',
                'Install Python if missing'
            ]
        },
        'environment_issues': {
            'problem': 'Python environment or package issues',
            'solutions': [
                'Verify conda/virtualenv paths are correct',
                'Check environment activation commands',
                'Install missing packages in remote environment',
                'Use pip install --user for user-level packages',
                'Test environment manually: ssh user@host "source env/bin/activate && python"'
            ]
        }
    }

    print("\nSSH Troubleshooting Guide:")
    print("=" * 30)
    for issue_name, issue_info in issues.items():
        print(f"\n{issue_info['problem']}:")
        for i, solution in enumerate(issue_info['solutions'], 1):
            print(f" {i}. {solution}")
    return issues

# Display all SSH guidance
ssh_configs = ssh_configuration_examples()
security_practices = ssh_security_best_practices()
troubleshooting = ssh_troubleshooting_guide()
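The "chmod 600" advice from the key management list can be checked programmatically before connecting. The following sketch assumes a POSIX system on the local side; check_private_key is an illustrative helper and not part of Clustrix.

```python
import os
import stat


def check_private_key(path):
    """Return a list of problems found with a private key file (empty if none)."""
    path = os.path.expanduser(path)
    if not os.path.isfile(path):
        return [f"{path} does not exist or is not a regular file"]

    problems = []
    # Keep only the permission bits of the file mode
    mode = stat.S_IMODE(os.stat(path).st_mode)
    # Any group or other permission bit means the key is not private
    if mode & (stat.S_IRWXG | stat.S_IRWXO):
        problems.append(
            f"{path} is accessible by group or others (mode {oct(mode)}); run: chmod 600 {path}"
        )
    return problems


# Usage (the path below is the placeholder from the configuration cell):
# for problem in check_private_key("~/.ssh/id_rsa"):
#     print(problem)
```

OpenSSH clients typically refuse private keys that are readable by other users, so a check like this can explain a "Permission denied (publickey)" error before any connection is attempted.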
SSH Connection Testing¶
[ ]:
def test_ssh_connection():
    """
    Test SSH connection and basic functionality.
    """
    from clustrix import ClusterExecutor

    try:
        # Get current configuration
        config = clustrix.get_config()
        if config.cluster_type != 'ssh':
            print("Current configuration is not for SSH. Please configure for SSH first.")
            return

        print("Testing SSH connection...")
        print(f"Host: {config.cluster_host}")
        print(f"Username: {config.username}")
        print(f"Port: {getattr(config, 'port', 22)}")

        # Create executor and test connection
        executor = ClusterExecutor(config)

        print("\n1. Testing SSH connection...")
        executor.connect()
        print(" ✓ SSH connection successful")

        print("\n2. Testing basic commands...")
        # Test hostname
        stdout, stderr = executor._execute_command("hostname")
        if stdout:
            print(f" ✓ Remote hostname: {stdout.strip()}")
        # Test whoami
        stdout, stderr = executor._execute_command("whoami")
        if stdout:
            print(f" ✓ Remote user: {stdout.strip()}")
        # Test pwd
        stdout, stderr = executor._execute_command("pwd")
        if stdout:
            print(f" ✓ Remote working directory: {stdout.strip()}")

        print("\n3. Testing Python availability...")
        # Test Python (some older interpreters print the version to stderr)
        python_cmd = getattr(config, 'python_executable', 'python3')
        stdout, stderr = executor._execute_command(f"{python_cmd} --version")
        if stdout:
            print(f" ✓ Python version: {stdout.strip()}")
        elif stderr:
            print(f" ✓ Python version: {stderr.strip()}")
        # Test Python path
        stdout, stderr = executor._execute_command(f"which {python_cmd}")
        if stdout:
            print(f" ✓ Python executable: {stdout.strip()}")

        print("\n4. Testing remote work directory...")
        # Test work directory creation
        work_dir = getattr(config, 'remote_work_dir', '/tmp/clustrix')
        stdout, stderr = executor._execute_command(f"mkdir -p {work_dir} && echo 'Directory created'")
        if "Directory created" in stdout:
            print(f" ✓ Work directory accessible: {work_dir}")
        # Test write permissions
        test_file = f"{work_dir}/test_file.txt"
        stdout, stderr = executor._execute_command(f"echo 'test' > {test_file} && cat {test_file} && rm {test_file}")
        if "test" in stdout:
            print(f" ✓ Write permissions confirmed")

        print("\n5. Testing environment activation...")
        conda_env = getattr(config, 'conda_env_name', None)
        venv_path = getattr(config, 'virtualenv_path', None)
        if conda_env:
            stdout, stderr = executor._execute_command(f"conda activate {conda_env} && echo 'Conda environment activated'")
            if "activated" in stdout:
                print(f" ✓ Conda environment '{conda_env}' activated")
            else:
                print(f" ⚠ Conda environment '{conda_env}' activation failed")
        elif venv_path:
            stdout, stderr = executor._execute_command(f"source {venv_path}/bin/activate && echo 'Virtual environment activated'")
            if "activated" in stdout:
                print(f" ✓ Virtual environment '{venv_path}' activated")
            else:
                print(f" ⚠ Virtual environment '{venv_path}' activation failed")
        else:
            print(" - No environment activation configured")

        executor.disconnect()
        print("\n✓ All SSH tests completed successfully!")
        print("\nYour SSH configuration is working correctly with Clustrix.")
    except Exception as e:
        print(f"\n✗ SSH connection test failed: {e}")
        print("\nTroubleshooting suggestions:")
        print("1. Check your SSH configuration")
        print("2. Verify SSH key authentication")
        print("3. Test manual SSH connection: ssh user@hostname")
        print("4. Check firewall and network connectivity")

# Test SSH connection
print("SSH Connection Test:")
print("=" * 25)
test_ssh_connection()
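When the connection test fails at the first step, it can help to separate network problems from authentication problems. The sketch below only checks whether a TCP connection to the SSH port can be opened, using the standard library; it does not authenticate or speak the SSH protocol. The helper name ssh_port_reachable is illustrative.

```python
import socket


def ssh_port_reachable(host, port=22, timeout=5.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host name and connects;
        # the context manager closes the socket again immediately
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures
        return False


# Usage (the host below is the placeholder from the configuration cell):
# if not ssh_port_reachable("your-server.example.com", 22):
#     print("Port 22 is not reachable: check the host name, firewall, or SSH service")
```

If the port is reachable but the Clustrix test still fails, the cause is more likely key authentication or the remote environment than the network.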
Summary¶
This tutorial covered SSH-based remote execution with Clustrix:
SSH Configuration - Setting up Clustrix for direct SSH connections
Basic Remote Computation - Simple mathematical operations on remote servers
Data Processing - Generating and analyzing data remotely
File System Operations - Remote directory analysis and system information
Environment Testing - Validating remote Python environments
Configuration Management - Best practices and security guidelines
Connection Testing - Troubleshooting and verification tools
Key SSH Advantages:¶
Simplicity: No job scheduler complexity, direct execution
Flexibility: Works with any SSH-accessible server or workstation
Speed: Immediate execution without queue waiting times
Control: Direct access to remote file system and environment
Debugging: Easy to test and troubleshoot connection issues
Cost-Effective: Utilize existing servers without additional infrastructure
Best Practices:¶
Security: Use SSH key authentication; never use passwords in production
Key Management: Use unique, strong SSH keys with proper permissions
Environment Isolation: Use conda or virtual environments for package management
Resource Limits: Set max_parallel_jobs to prevent overwhelming the server
Cleanup: Enable cleanup_on_success to maintain clean remote directories
Monitoring: Regularly check remote resource usage and access logs
Use Cases:¶
Development: Testing code on different environments
Workstations: Utilizing powerful desktop machines remotely
Cloud Instances: Running computations on cloud VMs
Legacy Systems: Accessing older servers without modern schedulers
Personal Computing: Home lab and personal server utilization
Prototyping: Quick testing before moving to larger cluster systems
When to Use SSH vs. Other Cluster Types:¶
Choose SSH when:
You work with single servers or small clusters
You need immediate execution without queuing
You are prototyping or doing development work
You work with cloud instances or workstations
Choose schedulers (SLURM/PBS/SGE) when:
You work with large HPC clusters
You need resource management and fair sharing
You run production workloads with resource constraints
You require advanced scheduling features
Choose Kubernetes when:
You need containerized execution environments
You require auto-scaling and fault tolerance
You work with microservices or cloud-native applications
You need orchestration across multiple nodes
Next Steps:¶
Compare with SLURM Tutorial for HPC cluster computing
Explore Kubernetes Tutorial for containerized execution
Review SSH Setup Guide for detailed security configuration
Check the API Documentation for advanced decorator options
For more information, visit the Clustrix Documentation.