SSH Remote Execution Tutorial

This tutorial demonstrates how to use Clustrix for simple SSH-based remote execution without a job scheduler. This setup works well for running individual functions on remote servers, workstations, or cloud instances that you can reach over SSH.

Prerequisites

  • SSH access to a remote server

  • SSH key-based authentication configured

  • Python installed on the remote server

  • Clustrix installed: pip install clustrix
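Before configuring Clustrix, it can help to confirm that key-based login works without any interactive prompt and that the expected Python interpreter is on the remote PATH. The sketch below is not part of Clustrix: `private_key_permissions_ok`, `build_ssh_check_command`, and `check_ssh_access` are hypothetical helper names. It relies only on standard OpenSSH client options (`-o BatchMode=yes` to fail instead of prompting, `-o ConnectTimeout`, `-i`, `-p`).

```python
import os
import stat
import subprocess


def private_key_permissions_ok(key_file):
    """Return True if the private key grants no permissions to group or others."""
    mode = os.stat(os.path.expanduser(key_file)).st_mode
    return (mode & (stat.S_IRWXG | stat.S_IRWXO)) == 0


def build_ssh_check_command(host, username, key_file, port=22, python_executable="python3"):
    """Build an ssh command that runs `<python_executable> --version` remotely.

    BatchMode=yes makes ssh exit with an error instead of asking for a
    password or key passphrase, so a misconfigured key fails fast.
    """
    return [
        "ssh",
        "-o", "BatchMode=yes",
        "-o", "ConnectTimeout=10",
        "-i", os.path.expanduser(key_file),
        "-p", str(port),
        f"{username}@{host}",
        f"{python_executable} --version",
    ]


def check_ssh_access(host, username, key_file, port=22, python_executable="python3"):
    """Run the check; return (succeeded, combined stdout/stderr text)."""
    cmd = build_ssh_check_command(host, username, key_file, port, python_executable)
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    return result.returncode == 0, (result.stdout + result.stderr).strip()
```

For example, `check_ssh_access("your-server.example.com", "your-username", "~/.ssh/id_rsa")` returns `True` as its first element only when the key is accepted and the interpreter runs; otherwise the second element carries the ssh error message.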

[ ]:
# Install Clustrix (uncomment if needed)
# !pip install clustrix

import clustrix
from clustrix import cluster, configure
import numpy as np

SSH Configuration

Configure Clustrix for SSH-based remote execution:

[ ]:
# Configure for SSH remote execution
configure(
    cluster_type="ssh",
    cluster_host="your-server.example.com",    # Replace with your server
    username="your-username",                  # Replace with your username
    key_file="~/.ssh/id_rsa",                 # Path to SSH private key
    port=22,                                   # SSH port (default 22)

    # Remote environment
    remote_work_dir="/home/your-username/clustrix",  # Remote working directory
    python_executable="python3",              # Python command on remote server

    # Execution settings
    cleanup_on_success=True,                   # Clean up remote files after success
    max_parallel_jobs=5,                       # Limit concurrent executions

    # Optional: Remote environment setup
    conda_env_name="myenv",                   # Activate conda environment
    # or
    # virtualenv_path="/path/to/venv",        # Activate virtual environment
)

print("SSH remote execution configured successfully!")
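Hard-coding hostnames, usernames, and key paths in a notebook makes them easy to commit by accident. One option is to read them from environment variables and pass the resulting dictionary to `configure(**...)`. In this sketch the `CLUSTRIX_SSH_*` variable names and the `ssh_config_from_env` helper are our own hypothetical conventions, not something Clustrix defines; the keyword arguments are only those already used in the cell above.

```python
import os


def ssh_config_from_env(environ=os.environ):
    """Build keyword arguments for configure() from environment variables.

    The CLUSTRIX_SSH_* names are hypothetical conventions for this sketch.
    """
    try:
        host = environ["CLUSTRIX_SSH_HOST"]
        username = environ["CLUSTRIX_SSH_USER"]
    except KeyError as missing:
        raise RuntimeError(f"Missing required environment variable: {missing}") from None

    return {
        "cluster_type": "ssh",
        "cluster_host": host,
        "username": username,
        # Expand "~" locally so the path does not depend on how the library treats it
        "key_file": os.path.expanduser(environ.get("CLUSTRIX_SSH_KEY", "~/.ssh/id_rsa")),
        "port": int(environ.get("CLUSTRIX_SSH_PORT", "22")),
        "remote_work_dir": environ.get("CLUSTRIX_REMOTE_WORK_DIR", f"/home/{username}/clustrix"),
        "python_executable": environ.get("CLUSTRIX_PYTHON", "python3"),
    }
```

With the variables exported in the shell that launched Jupyter, the configuration cell reduces to `configure(**ssh_config_from_env())`.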

Example 1: Basic Remote Computation

Execute a simple mathematical computation remotely:

[ ]:
@cluster
def basic_remote_computation(n=1000000):
    """
    Simple computation executed on remote server.
    """
    import math
    import time
    from datetime import datetime

    print(f"Starting computation on remote server at {datetime.now()}")
    print(f"Computing sum of squares for {n:,} numbers")

    start_time = time.time()

    # Compute sum of squares
    total = sum(i*i for i in range(n))

    # Compute some mathematical functions
    sqrt_total = math.sqrt(total)
    log_total = math.log(total)

    end_time = time.time()
    execution_time = end_time - start_time

    result = {
        'n': n,
        'sum_of_squares': total,
        'sqrt_sum': sqrt_total,
        'log_sum': log_total,
        'execution_time_seconds': execution_time,
        'completion_time': datetime.now().isoformat()
    }

    print(f"Computation completed in {execution_time:.2f} seconds")
    return result

# Execute on remote server
result = basic_remote_computation(500000)

print(f"\nREMOTE COMPUTATION COMPLETE")
print(f"Numbers processed: {result['n']:,}")
print(f"Sum of squares: {result['sum_of_squares']:,}")
print(f"Square root of sum: {result['sqrt_sum']:,.2f}")
print(f"Execution time: {result['execution_time_seconds']:.2f} seconds")
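Because `range(n)` covers 0 through n - 1, the sum of squares computed remotely has a closed form: (n - 1) * n * (2n - 1) / 6. That makes it easy to check the returned value locally without a second remote run. A small sketch (the helper names are hypothetical):

```python
def sum_of_squares_closed_form(n):
    """Sum of i*i for i in range(n), i.e. 0**2 + 1**2 + ... + (n - 1)**2."""
    # The product is always divisible by 6, so integer division is exact
    return (n - 1) * n * (2 * n - 1) // 6


def verify_sum_of_squares(result):
    """Compare the dictionary returned by basic_remote_computation with the closed form."""
    return result['sum_of_squares'] == sum_of_squares_closed_form(result['n'])
```

After the cell above, `verify_sum_of_squares(result)` should return `True`.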

Example 2: Remote Data Processing

Generate and analyze a synthetic time-series dataset on the remote server:

[ ]:
@cluster
def remote_data_processing(data_size=100000, output_format="json"):
    """
    Generate and process data on remote server.
    """
    import json
    import csv
    import math
    import os
    import tempfile
    import random
    import statistics
    from datetime import datetime, timedelta

    print(f"Processing {data_size:,} data points on remote server")

    # Generate synthetic time-series data
    def generate_synthetic_data(size):
        data = []
        base_date = datetime(2023, 1, 1)

        for i in range(size):
            timestamp = base_date + timedelta(hours=i)

            # Generate synthetic metrics
            # Daily (24-hour) sinusoidal pattern; sin comes from math (random has no sin function)
            base_value = 100 + 20 * math.sin(2 * math.pi * i / 24)
            noise = random.gauss(0, 5)
            value = max(0, base_value + noise)

            data_point = {
                'timestamp': timestamp.isoformat(),
                'value': round(value, 2),
                'category': random.choice(['A', 'B', 'C']),
                'status': random.choice(['active', 'inactive']) if random.random() > 0.1 else 'error'
            }
            data.append(data_point)

        return data

    # Generate the dataset
    print("Generating synthetic dataset...")
    dataset = generate_synthetic_data(data_size)

    # Process the data
    print("Processing data...")

    # Basic statistics
    values = [point['value'] for point in dataset]

    stats = {
        'count': len(values),
        'mean': statistics.mean(values),
        'median': statistics.median(values),
        'stdev': statistics.stdev(values) if len(values) > 1 else 0,
        'min': min(values),
        'max': max(values)
    }

    # Category analysis
    category_counts = {}
    status_counts = {}

    for point in dataset:
        category = point['category']
        status = point['status']

        category_counts[category] = category_counts.get(category, 0) + 1
        status_counts[status] = status_counts.get(status, 0) + 1

    # Time-based analysis
    hourly_averages = {}
    for point in dataset:
        hour = datetime.fromisoformat(point['timestamp']).hour
        if hour not in hourly_averages:
            hourly_averages[hour] = []
        hourly_averages[hour].append(point['value'])

    # Calculate hourly means
    hourly_means = {hour: statistics.mean(values) for hour, values in hourly_averages.items()}
    peak_hour = max(hourly_means.keys(), key=lambda h: hourly_means[h])
    trough_hour = min(hourly_means.keys(), key=lambda h: hourly_means[h])

    # Anomaly detection (simple threshold-based)
    threshold = stats['mean'] + 2 * stats['stdev']
    anomalies = [point for point in dataset if point['value'] > threshold]

    # Error analysis
    error_points = [point for point in dataset if point['status'] == 'error']
    error_rate = len(error_points) / len(dataset) * 100

    # Save processed data to a temporary file (newline='' as the csv module requires)
    with tempfile.NamedTemporaryFile(mode='w', suffix=f'.{output_format}', delete=False, newline='') as f:
        output_file = f.name

        if output_format == 'json':
            json.dump(dataset, f, indent=2)
        elif output_format == 'csv':
            if dataset:
                writer = csv.DictWriter(f, fieldnames=dataset[0].keys())
                writer.writeheader()
                writer.writerows(dataset)

    file_size = os.path.getsize(output_file)

    # Cleanup
    os.unlink(output_file)

    processing_results = {
        'data_info': {
            'total_points': len(dataset),
            'output_format': output_format,
            'file_size_bytes': file_size
        },
        'basic_statistics': stats,
        'category_distribution': category_counts,
        'status_distribution': status_counts,
        'temporal_analysis': {
            'peak_hour': peak_hour,
            'trough_hour': trough_hour,
            'peak_average': hourly_means[peak_hour],
            'trough_average': hourly_means[trough_hour],
            'daily_variation': hourly_means[peak_hour] - hourly_means[trough_hour]
        },
        'quality_metrics': {
            'anomalies_detected': len(anomalies),
            'anomaly_rate_percent': len(anomalies) / len(dataset) * 100,
            'error_rate_percent': error_rate,
            'data_completeness': (len(dataset) - len(error_points)) / len(dataset) * 100
        },
        'processing_metadata': {
            'processed_on': datetime.now().isoformat(),
            'processing_location': 'remote_server'
        }
    }

    return processing_results

# Process data on remote server
data_results = remote_data_processing(data_size=50000, output_format="json")

print(f"\nREMOTE DATA PROCESSING COMPLETE")
data_info = data_results['data_info']
print(f"Data points processed: {data_info['total_points']:,}")
print(f"Output format: {data_info['output_format']}")
print(f"Generated file size: {data_info['file_size_bytes']:,} bytes")

stats = data_results['basic_statistics']
print(f"\nStatistics:")
print(f"  Mean: {stats['mean']:.2f}")
print(f"  Median: {stats['median']:.2f}")
print(f"  Std Dev: {stats['stdev']:.2f}")
print(f"  Range: {stats['min']:.2f} - {stats['max']:.2f}")

temporal = data_results['temporal_analysis']
print(f"\nTemporal Analysis:")
print(f"  Peak hour: {temporal['peak_hour']}:00 (avg: {temporal['peak_average']:.2f})")
print(f"  Trough hour: {temporal['trough_hour']}:00 (avg: {temporal['trough_average']:.2f})")
print(f"  Daily variation: {temporal['daily_variation']:.2f}")

quality = data_results['quality_metrics']
print(f"\nData Quality:")
print(f"  Anomalies: {quality['anomalies_detected']} ({quality['anomaly_rate_percent']:.2f}%)")
print(f"  Error rate: {quality['error_rate_percent']:.2f}%")
print(f"  Data completeness: {quality['data_completeness']:.1f}%")
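The results above can be sanity-checked against how the data was generated. Each value is a noise-free daily curve, 100 + 20 * sin(2π * i / 24), plus Gaussian noise with standard deviation 5. Since the timestamps start at midnight and advance one hour per point, the hour of day equals i mod 24, so the curve peaks at 06:00 (value 120) and bottoms out at 18:00 (value 80). With tens of thousands of points the noise largely averages out, so the reported peak hour, trough hour, and daily variation (about 40) should land close to these values; likewise, the status is `'error'` for roughly 10% of points. The hypothetical helper below computes the noise-free profile for comparison.

```python
import math


def expected_daily_profile(amplitude=20.0, baseline=100.0, period_hours=24):
    """Noise-free value for each hour of the day, using the generator's formula."""
    return {
        hour: baseline + amplitude * math.sin(2 * math.pi * hour / period_hours)
        for hour in range(period_hours)
    }


profile = expected_daily_profile()
peak_hour = max(profile, key=profile.get)
trough_hour = min(profile, key=profile.get)

print(f"Expected peak hour: {peak_hour}:00 ({profile[peak_hour]:.1f})")
print(f"Expected trough hour: {trough_hour}:00 ({profile[trough_hour]:.1f})")
print(f"Expected daily variation: {profile[peak_hour] - profile[trough_hour]:.1f}")
```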

Example 3: Remote File System Operations

Analyze the file system of the remote server (directory structure, file sizes, and disk usage):

[ ]:
@cluster
def remote_filesystem_analysis(directory_path="/tmp", max_depth=3):
    """
    Analyze file system structure on remote server.
    """
    import os
    import stat
    import pwd
    import grp
    import time
    from datetime import datetime
    from collections import defaultdict

    print(f"Analyzing directory: {directory_path} (max depth: {max_depth})")

    def get_file_info(file_path):
        """Get detailed file information"""
        try:
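            # lstat does not follow symlinks, so the S_ISLNK check below can detect them
            # (symlinked directories are then reported as links and not recursed into)
            stat_info = os.lstat(file_path)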

            # Get owner and group names
            try:
                owner = pwd.getpwuid(stat_info.st_uid).pw_name
            except KeyError:
                owner = str(stat_info.st_uid)

            try:
                group = grp.getgrgid(stat_info.st_gid).gr_name
            except KeyError:
                group = str(stat_info.st_gid)

            return {
                'size': stat_info.st_size,
                'owner': owner,
                'group': group,
                'permissions': oct(stat_info.st_mode)[-3:],
                'modified_time': datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
                'is_file': stat.S_ISREG(stat_info.st_mode),
                'is_dir': stat.S_ISDIR(stat_info.st_mode),
                'is_symlink': stat.S_ISLNK(stat_info.st_mode)
            }
        except (OSError, PermissionError) as e:
            return {'error': str(e)}

    def analyze_directory(path, current_depth=0):
        """Recursively analyze directory structure"""
        if current_depth > max_depth:
            return {}

        analysis = {
            'path': path,
            'depth': current_depth,
            'contents': [],
            'stats': {
                'total_files': 0,
                'total_dirs': 0,
                'total_size': 0,
                'file_types': defaultdict(int),
                'largest_files': [],
                'permission_stats': defaultdict(int)
            }
        }

        try:
            entries = os.listdir(path)
            entries.sort()  # Sort for consistent output

            # Limit entries to prevent overwhelming output
            if len(entries) > 100:
                entries = entries[:100]
                analysis['truncated'] = True

            for entry in entries:
                entry_path = os.path.join(path, entry)
                file_info = get_file_info(entry_path)

                if 'error' in file_info:
                    continue

                entry_data = {
                    'name': entry,
                    'path': entry_path,
                    **file_info
                }

                analysis['contents'].append(entry_data)

                # Update statistics
                if file_info['is_file']:
                    analysis['stats']['total_files'] += 1
                    analysis['stats']['total_size'] += file_info['size']

                    # File extension analysis
                    _, ext = os.path.splitext(entry)
                    if ext:
                        analysis['stats']['file_types'][ext.lower()] += 1
                    else:
                        analysis['stats']['file_types']['no_extension'] += 1

                    # Track largest files
                    analysis['stats']['largest_files'].append({
                        'name': entry,
                        'size': file_info['size'],
                        'path': entry_path
                    })

                elif file_info['is_dir']:
                    analysis['stats']['total_dirs'] += 1

                    # Recurse into subdirectory
                    if current_depth < max_depth:
                        subdir_analysis = analyze_directory(entry_path, current_depth + 1)
                        if subdir_analysis:
                            # Aggregate subdirectory stats
                            analysis['stats']['total_files'] += subdir_analysis['stats']['total_files']
                            analysis['stats']['total_dirs'] += subdir_analysis['stats']['total_dirs']
                            analysis['stats']['total_size'] += subdir_analysis['stats']['total_size']

                # Permission statistics
                analysis['stats']['permission_stats'][file_info['permissions']] += 1

            # Sort largest files
            analysis['stats']['largest_files'].sort(key=lambda x: x['size'], reverse=True)
            analysis['stats']['largest_files'] = analysis['stats']['largest_files'][:10]  # Top 10

            # Convert defaultdict to regular dict for JSON serialization
            analysis['stats']['file_types'] = dict(analysis['stats']['file_types'])
            analysis['stats']['permission_stats'] = dict(analysis['stats']['permission_stats'])

        except PermissionError:
            analysis['error'] = f"Permission denied accessing {path}"
        except OSError as e:
            analysis['error'] = f"OS error accessing {path}: {str(e)}"

        return analysis

    # Get system information
    def get_system_info():
        """Get basic system information"""
        import platform
        import shutil

        # Disk usage for the analyzed directory
        try:
            disk_usage = shutil.disk_usage(directory_path)
            disk_info = {
                'total_bytes': disk_usage.total,
                'used_bytes': disk_usage.used,
                'free_bytes': disk_usage.free,
                'used_percent': (disk_usage.used / disk_usage.total) * 100
            }
        except OSError:
            disk_info = {'error': 'Could not get disk usage'}

        return {
            'hostname': platform.node(),
            'system': platform.system(),
            'release': platform.release(),
            'machine': platform.machine(),
            'python_version': platform.python_version(),
            'disk_usage': disk_info,
            'current_user': os.environ.get('USER', 'unknown'),
            'home_directory': os.environ.get('HOME', 'unknown'),
            'working_directory': os.getcwd()
        }

    # Perform the analysis
    print("Starting filesystem analysis...")
    start_time = time.time()

    directory_analysis = analyze_directory(directory_path)
    system_info = get_system_info()

    end_time = time.time()
    analysis_time = end_time - start_time

    # Summary statistics
    summary = {
        'analysis_parameters': {
            'target_directory': directory_path,
            'max_depth': max_depth,
            'analysis_time_seconds': analysis_time
        },
        'directory_summary': directory_analysis['stats'] if 'stats' in directory_analysis else {},
        'system_information': system_info,
        'analysis_metadata': {
            'completed_at': datetime.now().isoformat(),
            'analysis_location': 'remote_server'
        }
    }

    # Include sample of directory contents (first 20 items)
    if 'contents' in directory_analysis:
        summary['sample_contents'] = directory_analysis['contents'][:20]

    print(f"Filesystem analysis completed in {analysis_time:.2f} seconds")

    return summary

# Analyze remote filesystem
fs_results = remote_filesystem_analysis(directory_path="/usr/local", max_depth=2)

print(f"\nREMOTE FILESYSTEM ANALYSIS COMPLETE")
params = fs_results['analysis_parameters']
print(f"Directory analyzed: {params['target_directory']}")
print(f"Max depth: {params['max_depth']}")
print(f"Analysis time: {params['analysis_time_seconds']:.2f} seconds")

if 'directory_summary' in fs_results and fs_results['directory_summary']:
    summary = fs_results['directory_summary']
    print(f"\nDirectory Summary:")
    print(f"  Total files: {summary.get('total_files', 0):,}")
    print(f"  Total directories: {summary.get('total_dirs', 0):,}")
    print(f"  Total size: {summary.get('total_size', 0):,} bytes")

    if 'largest_files' in summary and summary['largest_files']:
        print(f"\nLargest files:")
        for i, file_info in enumerate(summary['largest_files'][:5], 1):
            print(f"  {i}. {file_info['name']}: {file_info['size']:,} bytes")

    if 'file_types' in summary and summary['file_types']:
        print(f"\nFile types:")
        sorted_types = sorted(summary['file_types'].items(), key=lambda x: x[1], reverse=True)
        for ext, count in sorted_types[:5]:
            print(f"  {ext}: {count} files")

sys_info = fs_results['system_information']
print(f"\nSystem Information:")
print(f"  Hostname: {sys_info['hostname']}")
print(f"  OS: {sys_info['system']} {sys_info['release']}")
print(f"  Architecture: {sys_info['machine']}")
print(f"  Python: {sys_info['python_version']}")
print(f"  Current user: {sys_info['current_user']}")

if 'disk_usage' in sys_info and 'error' not in sys_info['disk_usage']:
    disk = sys_info['disk_usage']
    print(f"\nDisk Usage:")
    print(f"  Total: {disk['total_bytes']:,} bytes")
    print(f"  Used: {disk['used_bytes']:,} bytes ({disk['used_percent']:.1f}%)")
    print(f"  Free: {disk['free_bytes']:,} bytes")
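The recursive analysis above reports raw byte counts and caps each directory listing at 100 entries. For a quick cross-check, or a more readable summary, a depth-limited walk with `os.walk` can be sketched as follows. It mirrors the depth rule of `analyze_directory` (contents are counted for directories at depth 0 through `max_depth`), does not apply the 100-entry cap, and skips symbolic links. `summarize_tree` and `format_bytes` are hypothetical helpers, not Clustrix functions; since they use only the standard library, they could also be called inside an `@cluster` function.

```python
import os
import stat


def format_bytes(num_bytes):
    """Render a byte count using binary (1024-based) units."""
    size = float(num_bytes)
    for unit in ("B", "KiB", "MiB", "GiB", "TiB"):
        if size < 1024 or unit == "TiB":
            return f"{int(size)} {unit}" if unit == "B" else f"{size:.1f} {unit}"
        size /= 1024


def summarize_tree(root, max_depth=3):
    """Count regular files, directories, and bytes under root, down to max_depth."""
    totals = {"total_files": 0, "total_dirs": 0, "total_size": 0}

    # os.walk silently skips directories it cannot list (onerror defaults to None)
    for dirpath, dirnames, filenames in os.walk(root):
        rel = os.path.relpath(dirpath, root)
        depth = 0 if rel == os.curdir else rel.count(os.sep) + 1

        # Ignore symlinked directories (os.walk lists them but does not follow them)
        dirnames[:] = [d for d in dirnames if not os.path.islink(os.path.join(dirpath, d))]
        totals["total_dirs"] += len(dirnames)

        for name in filenames:
            try:
                info = os.lstat(os.path.join(dirpath, name))
            except OSError:
                continue  # File vanished or is unreadable
            if stat.S_ISREG(info.st_mode):  # Regular files only; skips symlinks
                totals["total_files"] += 1
                totals["total_size"] += info.st_size

        if depth >= max_depth:
            dirnames[:] = []  # Count this level's subdirectories but do not descend
    return totals
```

For example, `format_bytes(summarize_tree("/usr/local", max_depth=2)["total_size"])` gives a human-readable total for the same directory analyzed above.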

Example 4: Remote Environment Testing

Test the remote Python environment and available packages:

[ ]:
@cluster
def test_remote_environment():
    """
    Test and report on the remote Python environment.
    """
    import sys
    import os
    import platform
    import subprocess
    import importlib
    from datetime import datetime

    print("Testing remote Python environment...")

    # Basic Python information
    python_info = {
        'version': sys.version,
        'version_info': {
            'major': sys.version_info.major,
            'minor': sys.version_info.minor,
            'micro': sys.version_info.micro
        },
        'executable': sys.executable,
        'platform': sys.platform,
        'path': sys.path[:5],  # First 5 path entries
    }

    # System information
    system_info = {
        'hostname': platform.node(),
        'system': platform.system(),
        'release': platform.release(),
        'version': platform.version(),
        'machine': platform.machine(),
        'processor': platform.processor()
    }

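    # Environment variables (selected)
    path_value = os.environ.get('PATH', 'not_set')
    env_vars = {
        'USER': os.environ.get('USER', 'not_set'),
        'HOME': os.environ.get('HOME', 'not_set'),
        # Truncate long PATH values; append '...' only when something was actually cut
        'PATH': path_value if len(path_value) <= 200 else path_value[:200] + '...',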
        'SHELL': os.environ.get('SHELL', 'not_set'),
        'LANG': os.environ.get('LANG', 'not_set'),
        'PWD': os.environ.get('PWD', 'not_set'),
        'PYTHONPATH': os.environ.get('PYTHONPATH', 'not_set')
    }

    # Test common packages
    common_packages = [
        'numpy', 'pandas', 'scipy', 'matplotlib', 'sklearn', 'requests',
        'flask', 'django', 'pytest', 'jupyter', 'ipython', 'click',
        'yaml', 'json', 'csv', 'sqlite3', 'pickle', 'datetime',
        'os', 'sys', 'subprocess', 'threading', 'multiprocessing'
    ]

    package_status = {}

    for package in common_packages:
        try:
            module = importlib.import_module(package)
            version = getattr(module, '__version__', 'unknown')
            package_status[package] = {
                'available': True,
                'version': version,
                'location': getattr(module, '__file__', 'built-in')
            }
        except ImportError:
            package_status[package] = {
                'available': False,
                'error': 'Not installed'
            }
        except Exception as e:
            package_status[package] = {
                'available': False,
                'error': str(e)
            }

    # Get pip list (if available)
    pip_packages = []
    try:
        result = subprocess.run(
            [sys.executable, '-m', 'pip', 'list', '--format=freeze'],
            capture_output=True,
            text=True,
            timeout=30
        )
        if result.returncode == 0:
            pip_packages = result.stdout.strip().split('\n')[:20]  # First 20 packages
    except Exception as e:
        pip_packages = [f"Error getting pip list: {str(e)}"]

    # Test basic functionality
    functionality_tests = {}

    # Test file I/O
    try:
        import tempfile
        with tempfile.NamedTemporaryFile(mode='w', delete=True) as f:
            f.write("test")
            f.flush()
        functionality_tests['file_io'] = {'status': 'ok', 'message': 'File I/O working'}
    except Exception as e:
        functionality_tests['file_io'] = {'status': 'error', 'message': str(e)}

    # Test networking
    try:
        import socket
        socket.gethostname()
        functionality_tests['networking'] = {'status': 'ok', 'message': 'Basic networking working'}
    except Exception as e:
        functionality_tests['networking'] = {'status': 'error', 'message': str(e)}

    # Test multiprocessing
    try:
        import multiprocessing
        cpu_count = multiprocessing.cpu_count()
        functionality_tests['multiprocessing'] = {
            'status': 'ok',
            'message': f'Multiprocessing available, {cpu_count} CPUs detected'
        }
    except Exception as e:
        functionality_tests['multiprocessing'] = {'status': 'error', 'message': str(e)}

    # Test numerical computing
    numerical_test = {'status': 'ok', 'tests': []}
    try:
        # Basic math
        import math
        result = math.sqrt(16)
        numerical_test['tests'].append(f"math.sqrt(16) = {result}")

        # NumPy if available
        if package_status['numpy']['available']:
            import numpy as np
            arr = np.array([1, 2, 3, 4, 5])
            mean_val = np.mean(arr)
            numerical_test['tests'].append(f"numpy.mean([1,2,3,4,5]) = {mean_val}")

        functionality_tests['numerical_computing'] = numerical_test
    except Exception as e:
        functionality_tests['numerical_computing'] = {'status': 'error', 'message': str(e)}

    # Performance test
    performance_test = {}
    try:
        import time
        start_time = time.time()

        # Simple computation benchmark
        total = sum(i*i for i in range(100000))

        end_time = time.time()
        computation_time = end_time - start_time

        performance_test = {
            'computation_time_seconds': computation_time,
            'result': total,
            'operations_per_second': 100000 / computation_time if computation_time > 0 else 0
        }
    except Exception as e:
        performance_test = {'error': str(e)}

    environment_report = {
        'test_metadata': {
            'test_timestamp': datetime.now().isoformat(),
            'test_location': 'remote_server'
        },
        'python_information': python_info,
        'system_information': system_info,
        'environment_variables': env_vars,
        'package_availability': {
            'total_tested': len(common_packages),
            'available': sum(1 for pkg in package_status.values() if pkg.get('available', False)),
            'unavailable': sum(1 for pkg in package_status.values() if not pkg.get('available', False)),
            'details': package_status
        },
        'installed_packages_sample': pip_packages,
        'functionality_tests': functionality_tests,
        'performance_benchmark': performance_test
    }

    print("Remote environment testing completed")

    return environment_report

# Test remote environment
env_results = test_remote_environment()

print(f"\nREMOTE ENVIRONMENT TEST COMPLETE")
python_info = env_results['python_information']
print(f"Python version: {python_info['version_info']['major']}.{python_info['version_info']['minor']}.{python_info['version_info']['micro']}")
print(f"Python executable: {python_info['executable']}")

system_info = env_results['system_information']
print(f"\nSystem: {system_info['system']} {system_info['release']}")
print(f"Hostname: {system_info['hostname']}")
print(f"Architecture: {system_info['machine']}")

pkg_info = env_results['package_availability']
print(f"\nPackage Availability:")
print(f"  Available: {pkg_info['available']}/{pkg_info['total_tested']}")
print(f"  Unavailable: {pkg_info['unavailable']}/{pkg_info['total_tested']}")

# Show some available packages
available_packages = [name for name, info in pkg_info['details'].items() if info.get('available', False)]
print(f"  Key packages available: {', '.join(available_packages[:10])}")

func_tests = env_results['functionality_tests']
print(f"\nFunctionality Tests:")
for test_name, test_result in func_tests.items():
    status = test_result.get('status', 'unknown')
    message = test_result.get('message', 'No message')
    print(f"  {test_name}: {status.upper()} - {message}")

if 'error' not in env_results['performance_benchmark']:
    perf = env_results['performance_benchmark']
    print(f"\nPerformance Benchmark:")
    print(f"  Computation time: {perf['computation_time_seconds']:.4f} seconds")
    print(f"  Operations per second: {perf['operations_per_second']:,.0f}")
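The environment test imports every candidate module and reads `__version__`, which runs each package's import-time code and reports `unknown` for modules that do not define that attribute. A lighter-weight sketch uses `importlib.util.find_spec` to test availability without importing, and `importlib.metadata.version` (Python 3.8+) to read the installed distribution's version. Distribution names can differ from import names (`sklearn` is installed as `scikit-learn`, `yaml` as `PyYAML`), so the hypothetical helper `check_package` accepts an optional distribution name.

```python
import importlib.util
from importlib import metadata


def check_package(module_name, distribution_name=None):
    """Report whether a top-level module is importable, without importing it."""
    if importlib.util.find_spec(module_name) is None:
        return {"available": False, "version": None}
    try:
        version = metadata.version(distribution_name or module_name)
    except metadata.PackageNotFoundError:
        # Standard-library modules (and unmapped name mismatches) have no distribution metadata
        version = "unknown"
    return {"available": True, "version": version}


# Import names mapped to the distribution names they are installed under (None = same/stdlib)
PACKAGES = {"numpy": None, "sklearn": "scikit-learn", "yaml": "PyYAML", "json": None}

for module, dist in PACKAGES.items():
    print(module, check_package(module, dist))
```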

SSH Configuration Management
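One of the practices listed below is keeping connection settings in an SSH config file. As a sketch, the hypothetical helper `to_ssh_config_block` turns a configuration dictionary like those in the cell below into a `Host` entry for `~/.ssh/config`, using the standard OpenSSH keywords `HostName`, `User`, `Port`, `IdentityFile`, and `IdentitiesOnly`. Whether Clustrix itself reads `~/.ssh/config` is not covered here, so treat the entry as a convenience for manual `ssh` and `scp` sessions and for debugging connections.

```python
def to_ssh_config_block(alias, config):
    """Render an OpenSSH ~/.ssh/config Host entry from a Clustrix-style config dict."""
    lines = [f"Host {alias}", f"    HostName {config['cluster_host']}"]
    if "username" in config:
        lines.append(f"    User {config['username']}")
    if "port" in config:
        lines.append(f"    Port {config['port']}")
    if "key_file" in config:
        lines.append(f"    IdentityFile {config['key_file']}")
        # Offer only the listed key rather than every key loaded in ssh-agent
        lines.append("    IdentitiesOnly yes")
    return "\n".join(lines) + "\n"


example = {
    "cluster_type": "ssh",
    "cluster_host": "secure-server.example.com",
    "username": "admin",
    "key_file": "~/.ssh/admin_key",
    "port": 2222,
}
print(to_ssh_config_block("secure-server", example))
```

After the printed entry is appended to `~/.ssh/config`, running `ssh secure-server` picks up the host, user, port, and key from it.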

Best practices for SSH configuration with Clustrix:

[ ]:
def ssh_configuration_examples():
    """
    Examples of different SSH configuration patterns.
    """

    configurations = {
        'basic_ssh': {
            'description': 'Basic SSH connection with key authentication',
            'config': {
                'cluster_type': 'ssh',
                'cluster_host': 'server.example.com',
                'username': 'user',
                'key_file': '~/.ssh/id_rsa',
                'port': 22
            },
            'use_case': 'Single remote server or workstation'
        },
        'custom_port': {
            'description': 'SSH connection with custom port',
            'config': {
                'cluster_type': 'ssh',
                'cluster_host': 'secure-server.example.com',
                'username': 'admin',
                'key_file': '~/.ssh/admin_key',
                'port': 2222
            },
            'use_case': 'Servers with non-standard SSH ports'
        },
        'password_auth': {
            'description': 'SSH with password authentication (less secure)',
            'config': {
                'cluster_type': 'ssh',
                'cluster_host': 'legacy-server.example.com',
                'username': 'olduser',
                'password': 'your-password'  # Not recommended for production
            },
            'use_case': 'Legacy systems without key-based auth'
        },
        'conda_environment': {
            'description': 'SSH with conda environment activation',
            'config': {
                'cluster_type': 'ssh',
                'cluster_host': 'ml-server.example.com',
                'username': 'researcher',
                'key_file': '~/.ssh/research_key',
                'conda_env_name': 'pytorch',
                'python_executable': 'python'
            },
            'use_case': 'Servers with conda environments for specific packages'
        },
        'virtual_environment': {
            'description': 'SSH with Python virtual environment',
            'config': {
                'cluster_type': 'ssh',
                'cluster_host': 'dev-server.example.com',
                'username': 'developer',
                'key_file': '~/.ssh/dev_key',
                'virtualenv_path': '/home/developer/venv/myproject',
                'python_executable': 'python3'
            },
            'use_case': 'Development servers with project-specific environments'
        },
        'cloud_instance': {
            'description': 'Cloud instance with specific configuration',
            'config': {
                'cluster_type': 'ssh',
                'cluster_host': 'ec2-12-34-56-78.compute-1.amazonaws.com',
                'username': 'ubuntu',  # Common for Ubuntu AMIs
                'key_file': '~/.ssh/aws-keypair.pem',
                'remote_work_dir': '/tmp/clustrix',
                'cleanup_on_success': True
            },
            'use_case': 'AWS, GCP, or Azure cloud instances'
        }
    }

    print("SSH Configuration Examples:")
    print("=" * 50)

    for config_name, config_info in configurations.items():
        print(f"\n{config_name.upper().replace('_', ' ')}:")
        print(f"  Description: {config_info['description']}")
        print(f"  Use case: {config_info['use_case']}")
        print(f"  Configuration:")
        for key, value in config_info['config'].items():
            print(f"    {key}: {value}")

    return configurations

def ssh_security_best_practices():
    """
    SSH security best practices for Clustrix.
    """

    practices = {
        'key_management': {
            'title': 'SSH Key Management',
            'practices': [
                'Use strong, unique SSH keys for each server/project',
                'Prefer Ed25519 keys: ssh-keygen -t ed25519 -f ~/.ssh/clustrix_key',
                'Use RSA 4096-bit keys if Ed25519 is not supported: ssh-keygen -t rsa -b 4096',
                'Protect private keys with strong passphrases',
                'Set proper permissions: chmod 600 ~/.ssh/private_key',
                'Regularly rotate keys (every 6-12 months)'
            ]
        },
        'connection_security': {
            'title': 'Connection Security',
            'practices': [
                'Always use key-based authentication, avoid passwords',
                'Disable SSH agent forwarding unless necessary',
                'Use SSH config files for consistent settings',
                'Enable SSH connection multiplexing for efficiency',
                'Set reasonable connection timeouts',
                'Use non-standard SSH ports when possible'
            ]
        },
        'server_hardening': {
            'title': 'Server-Side Security',
            'practices': [
                'Disable SSH password authentication',
                'Disable SSH root login',
                'Use fail2ban or similar intrusion prevention',
                'Regularly update SSH server software',
                'Monitor SSH access logs',
                'Use firewall rules to restrict SSH access'
            ]
        },
        'clustrix_specific': {
            'title': 'Clustrix-Specific Security',
            'practices': [
                'Use dedicated SSH keys for Clustrix operations',
                'Restrict remote work directories to user-specific paths',
                'Enable cleanup_on_success to remove temporary files',
                'Limit max_parallel_jobs to prevent resource exhaustion',
                'Monitor remote execution logs for anomalies',
                'Use isolated Python environments for execution'
            ]
        }
    }

    print("\nSSH Security Best Practices:")
    print("=" * 40)

    for category, info in practices.items():
        print(f"\n{info['title']}:")
        for i, practice in enumerate(info['practices'], 1):
            print(f"  {i}. {practice}")

    return practices

def ssh_troubleshooting_guide():
    """
    Common SSH issues and solutions.
    """

    issues = {
        'connection_refused': {
            'problem': 'Connection refused or timeout',
            'solutions': [
                'Check if SSH service is running on remote server',
                'Verify correct hostname/IP address',
                'Check if firewall is blocking SSH port',
                'Confirm SSH port number (default 22)',
                'Test connection with: ssh -v user@hostname'
            ]
        },
        'permission_denied': {
            'problem': 'Permission denied (publickey)',
            'solutions': [
                'Verify SSH key is added to remote ~/.ssh/authorized_keys',
                'Check permissions: 600 for the local private key; 700 for ~/.ssh and 600 for ~/.ssh/authorized_keys on the server',
                'Ensure correct username for the server',
                'Verify key file path in Clustrix configuration',
                'Test key with: ssh -i ~/.ssh/key_file user@hostname'
            ]
        },
        'host_key_verification': {
            'problem': 'Host key verification failed',
            'solutions': [
                'Verify the server host key fingerprint through a trusted channel before accepting it',
                'Add host to known_hosts: ssh-keyscan hostname >> ~/.ssh/known_hosts',
                'Remove a stale host key: ssh-keygen -R hostname',
                'Connect manually once to review and accept the host key',
                'Check whether the hostname/IP address has changed'
            ]
        },
        'python_not_found': {
            'problem': 'Python executable not found on remote server',
            'solutions': [
                'Specify correct python_executable in configuration',
                'Check if Python is installed: which python3',
                'Add Python to PATH on remote server',
                'Use full path: /usr/bin/python3',
                'Install Python if missing'
            ]
        },
        'environment_issues': {
            'problem': 'Python environment or package issues',
            'solutions': [
                'Verify conda/virtualenv paths are correct',
                'Check environment activation commands',
                'Install missing packages in remote environment',
                'Without a conda/virtual environment, use pip install --user to install packages without root access',
                'Test environment manually: ssh user@host "source /path/to/venv/bin/activate && python --version"'
            ]
        }
    }

    print("\nSSH Troubleshooting Guide:")
    print("=" * 30)

    for issue_name, issue_info in issues.items():
        print(f"\n{issue_info['problem']}:")
        for i, solution in enumerate(issue_info['solutions'], 1):
            print(f"  {i}. {solution}")

    return issues

# Display all SSH guidance
ssh_configs = ssh_configuration_examples()
security_practices = ssh_security_best_practices()
troubleshooting = ssh_troubleshooting_guide()
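The connection-security practices recommend SSH config files and connection multiplexing. The sketch below prints an OpenSSH `~/.ssh/config` entry that applies both, along with the agent-forwarding and timeout advice from the same list. The helper name `ssh_config_stanza`, the alias `clustrix-remote`, and the 10-minute `ControlPersist` value are illustrative choices, not Clustrix settings. These options configure the OpenSSH client (`ssh`, `scp`); whether Clustrix's own SSH connection reads `~/.ssh/config` is an assumption not confirmed here, so treat the entry mainly as an aid for manual testing.

```python
def ssh_config_stanza(alias, host, user, key_file, port=22, persist="10m"):
    """Return an OpenSSH ~/.ssh/config entry with connection multiplexing enabled."""
    options = [
        ("HostName", host),
        ("User", user),
        ("Port", port),
        ("IdentityFile", key_file),
        ("IdentitiesOnly", "yes"),              # offer only this key, not every agent key
        ("ForwardAgent", "no"),                 # keep agent forwarding off unless needed
        ("ConnectTimeout", 10),                 # give up on unreachable hosts after 10 s
        ("ServerAliveInterval", 60),            # keepalive after 60 s without server data
        ("ControlMaster", "auto"),              # first connection becomes a shared master
        ("ControlPath", "~/.ssh/cm-%r@%h:%p"),  # one socket per remote user/host/port
        ("ControlPersist", persist),            # close the master after this idle period
    ]
    lines = [f"Host {alias}"] + [f"    {key} {value}" for key, value in options]
    return "\n".join(lines) + "\n"


# Placeholder values, matching the configure() example earlier in the tutorial
print(ssh_config_stanza("clustrix-remote", "your-server.example.com",
                        "your-username", "~/.ssh/id_rsa"))
```

After appending the printed block to `~/.ssh/config`, `ssh clustrix-remote` opens one master connection that later `ssh` calls reuse until it has been idle for the `ControlPersist` period.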

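The troubleshooting guide lists key permissions as a cause of `Permission denied (publickey)`. OpenSSH refuses to use a private key that group or other users can access, so a local check only needs the low permission bits. This is a minimal sketch: `check_private_key_permissions` and `fix_private_key_permissions` are hypothetical helper names, and the check assumes a POSIX file system (Windows ACLs are not covered).

```python
import os
import stat


def check_private_key_permissions(key_path):
    """Return (ok, mode_string); ok is False if group or others have any access."""
    path = os.path.expanduser(key_path)
    mode = stat.S_IMODE(os.stat(path).st_mode)
    # OpenSSH rejects private keys whose mode has any of the 0o077 bits set
    return (mode & 0o077) == 0, oct(mode)


def fix_private_key_permissions(key_path):
    """Restrict the key file to owner read/write (chmod 600)."""
    os.chmod(os.path.expanduser(key_path), 0o600)


# Example: inspect the key used in the configuration, if it exists locally
key = os.path.expanduser("~/.ssh/id_rsa")
if os.path.exists(key):
    ok, mode = check_private_key_permissions(key)
    print(f"{key}: mode {mode} -> {'OK' if ok else 'too open, run chmod 600'}")
```

Mode 400 also passes this check, which matches OpenSSH's behaviour; 600 is simply the usual choice because it still lets the owner rewrite the key.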
SSH Connection Testing

[ ]:
def test_ssh_connection():
    """
    Test SSH connection and basic functionality.
    """
    from clustrix import ClusterExecutor

    try:
        # Get current configuration
        config = clustrix.get_config()

        if config.cluster_type != 'ssh':
            print("Current configuration is not for SSH. Please configure for SSH first.")
            return

        print("Testing SSH connection...")
        print(f"Host: {config.cluster_host}")
        print(f"Username: {config.username}")
        print(f"Port: {getattr(config, 'port', 22)}")

        # Create executor and test connection
        executor = ClusterExecutor(config)

        print("\n1. Testing SSH connection...")
        executor.connect()
        print("   ✓ SSH connection successful")

        print("\n2. Testing basic commands...")

        # Test hostname
        stdout, stderr = executor._execute_command("hostname")
        if stdout:
            print(f"   ✓ Remote hostname: {stdout.strip()}")

        # Test whoami
        stdout, stderr = executor._execute_command("whoami")
        if stdout:
            print(f"   ✓ Remote user: {stdout.strip()}")

        # Test pwd
        stdout, stderr = executor._execute_command("pwd")
        if stdout:
            print(f"   ✓ Remote working directory: {stdout.strip()}")

        print("\n3. Testing Python availability...")

        # Test Python
        python_cmd = getattr(config, 'python_executable', 'python3')
        stdout, stderr = executor._execute_command(f"{python_cmd} --version")
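        # Older Python versions print --version to stderr, so check both streams
        version = (stdout or "").strip() or (stderr or "").strip()
        if version.startswith("Python"):
            print(f"   ✓ Python version: {version}")
        else:
            print(f"   ✗ Could not run {python_cmd}: {version}")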

        # Test Python path
        stdout, stderr = executor._execute_command(f"which {python_cmd}")
        if stdout:
            print(f"   ✓ Python executable: {stdout.strip()}")

        print("\n4. Testing remote work directory...")

        # Test work directory creation
        work_dir = getattr(config, 'remote_work_dir', '/tmp/clustrix')
        stdout, stderr = executor._execute_command(f"mkdir -p {work_dir} && echo 'Directory created'")
        if "Directory created" in stdout:
            print(f"   ✓ Work directory accessible: {work_dir}")
        else:
            print(f"   ✗ Could not create work directory {work_dir}: {stderr.strip()}")

        # Test write permissions
        test_file = f"{work_dir}/test_file.txt"
        stdout, stderr = executor._execute_command(f"echo 'test' > {test_file} && cat {test_file} && rm {test_file}")
        if "test" in stdout:
            print("   ✓ Write permissions confirmed")
        else:
            print(f"   ✗ Cannot write to {work_dir}: {stderr.strip()}")

        print("\n5. Testing environment activation...")

        conda_env = getattr(config, 'conda_env_name', None)
        venv_path = getattr(config, 'virtualenv_path', None)

        if conda_env:
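            # 'conda activate' is a shell function that non-interactive SSH shells
            # often lack, so source conda.sh first (assumes 'conda' is on PATH)
            stdout, stderr = executor._execute_command(
                '. "$(conda info --base)/etc/profile.d/conda.sh" && '
                f"conda activate {conda_env} && echo 'Conda environment activated'"
            )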
            if "activated" in stdout:
                print(f"   ✓ Conda environment '{conda_env}' activated")
            else:
                print(f"   ⚠ Conda environment '{conda_env}' activation failed")

        elif venv_path:
            stdout, stderr = executor._execute_command(f"source {venv_path}/bin/activate && echo 'Virtual environment activated'")
            if "activated" in stdout:
                print(f"   ✓ Virtual environment '{venv_path}' activated")
            else:
                print(f"   ⚠ Virtual environment '{venv_path}' activation failed")

        else:
            print("   - No environment activation configured")

        executor.disconnect()
        print("\n✓ All SSH checks completed.")
        print("\nIf no ⚠ or ✗ lines appear above, your SSH configuration is working with Clustrix.")

    except Exception as e:
        print(f"\n✗ SSH connection test failed: {e}")
        print("\nTroubleshooting suggestions:")
        print("1. Check your SSH configuration")
        print("2. Verify SSH key authentication")
        print("3. Test manual SSH connection: ssh user@hostname")
        print("4. Check firewall and network connectivity")

# Test SSH connection
print("SSH Connection Test:")
print("=" * 25)
test_ssh_connection()
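Step 5 of the test runs `conda activate` through a non-interactive SSH command. `conda activate` is a shell function that `conda init` typically installs via `~/.bashrc`, which non-interactive shells often skip, so a common workaround is to source `conda.sh` explicitly first. The sketch below builds such a wrapped command. `remote_env_command` is a hypothetical helper; it assumes `conda` is on the remote non-interactive `PATH`, and it uses the POSIX `.` command instead of `source` so it also works if the remote shell is `sh` or `dash`.

```python
import shlex


def remote_env_command(command, conda_env=None, venv_path=None):
    """Prefix a remote shell command with conda or virtualenv activation."""
    if conda_env and venv_path:
        raise ValueError("set either conda_env or venv_path, not both")
    if conda_env:
        # Load the 'conda activate' shell function explicitly (assumes conda is on PATH)
        setup = '. "$(conda info --base)/etc/profile.d/conda.sh"'
        return f"{setup} && conda activate {shlex.quote(conda_env)} && {command}"
    if venv_path:
        venv_path = venv_path.rstrip("/")
        if venv_path.startswith("~/"):
            # Quoting would stop '~' from expanding, so spell out $HOME instead
            activate = '"$HOME"/' + shlex.quote(venv_path[2:] + "/bin/activate")
        else:
            activate = shlex.quote(venv_path + "/bin/activate")
        return f". {activate} && {command}"
    return command


print(remote_env_command("python --version", conda_env="myenv"))
```

For example, `remote_env_command("echo 'Conda environment activated'", conda_env=conda_env)` could replace the hand-written command string in the test cell; `shlex.quote` keeps environment names or paths containing spaces from splitting into separate shell words.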

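The test cell calls `executor._execute_command`, whose leading underscore marks it as a private `ClusterExecutor` method that may change between Clustrix releases. The same checks can also be run with the system OpenSSH client, independently of Clustrix. This is a minimal sketch assuming `ssh` is on the local `PATH`; `BatchMode=yes` disables password, passphrase, and host-key prompts, so key or `known_hosts` problems fail immediately instead of hanging. `build_ssh_command`, `run_remote`, and `PROBE` are hypothetical names.

```python
import os
import subprocess

# One remote command covering the checks in test_ssh_connection();
# '&&' stops at the first failure, so a missing python3 gives a non-zero exit code
PROBE = "hostname && whoami && pwd && command -v python3 && python3 --version 2>&1"


def build_ssh_command(host, user, remote_cmd, port=22, key_file=None, timeout=10):
    """Build an argv list for a non-interactive, key-only ssh invocation."""
    argv = [
        "ssh", "-p", str(port),
        "-o", "BatchMode=yes",              # no password or host-key prompts
        "-o", f"ConnectTimeout={timeout}",  # seconds to wait when connecting
    ]
    if key_file:
        argv += ["-i", os.path.expanduser(key_file)]
    argv += [f"{user}@{host}", remote_cmd]
    return argv


def run_remote(host, user, remote_cmd, **kwargs):
    """Run remote_cmd over ssh; return (exit_code, stdout, stderr)."""
    proc = subprocess.run(build_ssh_command(host, user, remote_cmd, **kwargs),
                          capture_output=True, text=True)
    return proc.returncode, proc.stdout.strip(), proc.stderr.strip()
```

For example, `run_remote("your-server.example.com", "your-username", PROBE, key_file="~/.ssh/id_rsa")` should return exit code 0 and the probe output when key authentication, the host key, and `python3` are all in order; a non-zero code with stderr such as `Permission denied (publickey)` or `Host key verification failed` points to the matching troubleshooting entry above.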
Summary

This tutorial covered SSH-based remote execution with Clustrix:

  1. SSH Configuration - Setting up Clustrix for direct SSH connections

  2. Basic Remote Computation - Simple mathematical operations on remote servers

  3. Data Processing - Generating and analyzing data remotely

  4. File System Operations - Remote directory analysis and system information

  5. Environment Testing - Validating remote Python environments

  6. Configuration Management - Best practices and security guidelines

  7. Connection Testing - Troubleshooting and verification tools

Key SSH Advantages:

  • Simplicity: No job scheduler complexity, direct execution

  • Flexibility: Works with any SSH-accessible server or workstation

  • Speed: Immediate execution without queue waiting times

  • Control: Direct access to remote file system and environment

  • Debugging: Easy to test and troubleshoot connection issues

  • Cost-Effective: Utilize existing servers without additional infrastructure

Best Practices:

  • Security: Always use SSH key authentication, never passwords in production

  • Key Management: Use unique, strong SSH keys with proper permissions

  • Environment Isolation: Use conda or virtual environments for package management

  • Resource Limits: Set max_parallel_jobs to prevent overwhelming the server

  • Cleanup: Enable cleanup_on_success to maintain clean remote directories

  • Monitoring: Regular checks of remote resource usage and access logs

Use Cases:

  • Development: Testing code on different environments

  • Workstations: Utilizing powerful desktop machines remotely

  • Cloud Instances: Running computations on cloud VMs

  • Legacy Systems: Accessing older servers without modern schedulers

  • Personal Computing: Home lab and personal server utilization

  • Prototyping: Quick testing before moving to larger cluster systems

When to Use SSH vs. Other Cluster Types:

Choose SSH when:

  • You work with single servers or small clusters

  • You need immediate execution without queuing

  • You are prototyping or doing development work

  • You work with cloud instances or workstations

Choose schedulers (SLURM/PBS/SGE) when:

  • You work with large HPC clusters

  • You need resource management and fair sharing

  • You run production workloads under resource constraints

  • You require advanced scheduling features

Choose Kubernetes when:

  • You need containerized execution environments

  • You require auto-scaling and fault tolerance

  • You work with microservices or cloud-native applications

  • You need orchestration across multiple nodes
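The decision guide above can be condensed into a small helper, for example to pick a `cluster_type` in a shared setup script. This is an illustrative restatement of the bullets, not Clustrix logic: `suggest_cluster_type` and its flags are hypothetical, and the returned strings assume Clustrix accepts `"kubernetes"` and `"slurm"` as `cluster_type` values (only `"ssh"` appears in this tutorial's configuration).

```python
def suggest_cluster_type(needs_containers=False, needs_autoscaling=False,
                         needs_multi_node_orchestration=False,
                         large_hpc_cluster=False, needs_fair_sharing=False,
                         needs_advanced_scheduling=False):
    """Return a suggested cluster_type following the SSH/scheduler/Kubernetes guidance."""
    # Kubernetes: containerized execution, auto-scaling, multi-node orchestration
    if needs_containers or needs_autoscaling or needs_multi_node_orchestration:
        return "kubernetes"
    # Batch schedulers: large HPC systems, fair sharing, advanced scheduling
    if large_hpc_cluster or needs_fair_sharing or needs_advanced_scheduling:
        return "slurm"  # assumed value; use the site's scheduler (PBS/SGE) instead if needed
    # Otherwise a single server, workstation, or cloud VM: direct SSH execution
    return "ssh"


print(suggest_cluster_type(large_hpc_cluster=True))
```

Container requirements are checked first here, which is an arbitrary ordering; a site that runs containers under SLURM would reorder the checks.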

Next Steps:

For more information, visit the Clustrix Documentation.