{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# SSH Remote Execution Tutorial\n", "\n", "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/ContextLab/clustrix/blob/master/docs/notebooks/ssh_tutorial.ipynb)\n", "\n", "This tutorial demonstrates how to use Clustrix for simple SSH-based remote execution without a job scheduler. This is perfect for executing functions on remote servers, workstations, or cloud instances.\n", "\n", "## Prerequisites\n", "\n", "- SSH access to a remote server\n", "- SSH key-based authentication configured\n", "- Python installed on the remote server\n", "- Clustrix installed: `pip install clustrix`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Install Clustrix (uncomment if needed)\n", "# !pip install clustrix\n", "\n", "import clustrix\n", "from clustrix import cluster, configure\n", "import numpy as np" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## SSH Configuration\n", "\n", "Configure Clustrix for SSH-based remote execution:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Configure for SSH remote execution\n", "configure(\n", " cluster_type=\"ssh\",\n", " cluster_host=\"your-server.example.com\", # Replace with your server\n", " username=\"your-username\", # Replace with your username\n", " key_file=\"~/.ssh/id_rsa\", # Path to SSH private key\n", " port=22, # SSH port (default 22)\n", " \n", " # Remote environment\n", " remote_work_dir=\"/home/your-username/clustrix\", # Remote working directory\n", " python_executable=\"python3\", # Python command on remote server\n", " \n", " # Execution settings\n", " cleanup_on_success=True, # Clean up remote files after success\n", " max_parallel_jobs=5, # Limit concurrent executions\n", " \n", " # Optional: Remote environment setup\n", " conda_env_name=\"myenv\", # Activate conda 
environment\n", "    # or\n", "    # virtualenv_path=\"/path/to/venv\",  # Activate virtual environment\n", ")\n", "\n", "print(\"SSH remote execution configured successfully!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example 1: Basic Remote Computation\n", "\n", "Execute a simple mathematical computation remotely:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@cluster\n", "def basic_remote_computation(n=1000000):\n", "    \"\"\"\n", "    Simple computation executed on remote server.\n", "    \"\"\"\n", "    import math\n", "    import time\n", "    from datetime import datetime\n", "\n", "    print(f\"Starting computation on remote server at {datetime.now()}\")\n", "    print(f\"Computing sum of squares for {n:,} numbers\")\n", "\n", "    start_time = time.time()\n", "\n", "    # Compute sum of squares\n", "    total = sum(i*i for i in range(n))\n", "\n", "    # Compute some mathematical functions\n", "    sqrt_total = math.sqrt(total)\n", "    log_total = math.log(total)\n", "\n", "    end_time = time.time()\n", "    execution_time = end_time - start_time\n", "\n", "    result = {\n", "        'n': n,\n", "        'sum_of_squares': total,\n", "        'sqrt_sum': sqrt_total,\n", "        'log_sum': log_total,\n", "        'execution_time_seconds': execution_time,\n", "        'completion_time': datetime.now().isoformat()\n", "    }\n", "\n", "    print(f\"Computation completed in {execution_time:.2f} seconds\")\n", "    return result\n", "\n", "# Execute on remote server\n", "result = basic_remote_computation(500000)\n", "\n", "print(f\"\\nREMOTE COMPUTATION COMPLETE\")\n", "print(f\"Numbers processed: {result['n']:,}\")\n", "print(f\"Sum of squares: {result['sum_of_squares']:,}\")\n", "print(f\"Square root of sum: {result['sqrt_sum']:,.2f}\")\n", "print(f\"Execution time: {result['execution_time_seconds']:.2f} seconds\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example 2: Remote Data Processing\n", "\n", "Generate and process a synthetic dataset on a remote server:" ] }, { 
"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@cluster\n", "def remote_data_processing(data_size=100000, output_format=\"json\"):\n", " \"\"\"\n", " Generate and process data on remote server.\n", " \"\"\"\n", " import json\n", " import csv\n", " import os\n", " import tempfile\n", " import random\n", " import statistics\n", " from datetime import datetime, timedelta\n", " \n", " print(f\"Processing {data_size:,} data points on remote server\")\n", " \n", " # Generate synthetic time-series data\n", " def generate_synthetic_data(size):\n", " data = []\n", " base_date = datetime(2023, 1, 1)\n", " \n", " for i in range(size):\n", " timestamp = base_date + timedelta(hours=i)\n", " \n", " # Generate synthetic metrics\n", " base_value = 100 + 20 * random.sin(2 * 3.14159 * i / 24) # Daily pattern\n", " noise = random.gauss(0, 5)\n", " value = max(0, base_value + noise)\n", " \n", " data_point = {\n", " 'timestamp': timestamp.isoformat(),\n", " 'value': round(value, 2),\n", " 'category': random.choice(['A', 'B', 'C']),\n", " 'status': random.choice(['active', 'inactive']) if random.random() > 0.1 else 'error'\n", " }\n", " data.append(data_point)\n", " \n", " return data\n", " \n", " # Generate the dataset\n", " print(\"Generating synthetic dataset...\")\n", " dataset = generate_synthetic_data(data_size)\n", " \n", " # Process the data\n", " print(\"Processing data...\")\n", " \n", " # Basic statistics\n", " values = [point['value'] for point in dataset]\n", " \n", " stats = {\n", " 'count': len(values),\n", " 'mean': statistics.mean(values),\n", " 'median': statistics.median(values),\n", " 'stdev': statistics.stdev(values) if len(values) > 1 else 0,\n", " 'min': min(values),\n", " 'max': max(values)\n", " }\n", " \n", " # Category analysis\n", " category_counts = {}\n", " status_counts = {}\n", " \n", " for point in dataset:\n", " category = point['category']\n", " status = point['status']\n", " \n", " category_counts[category] 
= category_counts.get(category, 0) + 1\n", "        status_counts[status] = status_counts.get(status, 0) + 1\n", "\n", "    # Time-based analysis\n", "    hourly_averages = {}\n", "    for point in dataset:\n", "        hour = datetime.fromisoformat(point['timestamp']).hour\n", "        if hour not in hourly_averages:\n", "            hourly_averages[hour] = []\n", "        hourly_averages[hour].append(point['value'])\n", "\n", "    # Calculate hourly means\n", "    hourly_means = {hour: statistics.mean(hour_values) for hour, hour_values in hourly_averages.items()}\n", "    peak_hour = max(hourly_means.keys(), key=lambda h: hourly_means[h])\n", "    trough_hour = min(hourly_means.keys(), key=lambda h: hourly_means[h])\n", "\n", "    # Anomaly detection (simple threshold-based)\n", "    threshold = stats['mean'] + 2 * stats['stdev']\n", "    anomalies = [point for point in dataset if point['value'] > threshold]\n", "\n", "    # Error analysis\n", "    error_points = [point for point in dataset if point['status'] == 'error']\n", "    error_rate = len(error_points) / len(dataset) * 100\n", "\n", "    # Save processed data to temporary file\n", "    with tempfile.NamedTemporaryFile(mode='w', suffix=f'.{output_format}', delete=False) as f:\n", "        output_file = f.name\n", "\n", "        if output_format == 'json':\n", "            json.dump(dataset, f, indent=2)\n", "        elif output_format == 'csv':\n", "            if dataset:\n", "                writer = csv.DictWriter(f, fieldnames=dataset[0].keys())\n", "                writer.writeheader()\n", "                writer.writerows(dataset)\n", "\n", "    # Measure the file after the with block closes so buffered data is flushed\n", "    file_size = os.path.getsize(output_file)\n", "\n", "    # Cleanup\n", "    os.unlink(output_file)\n", "\n", "    processing_results = {\n", "        'data_info': {\n", "            'total_points': len(dataset),\n", "            'output_format': output_format,\n", "            'file_size_bytes': file_size\n", "        },\n", "        'basic_statistics': stats,\n", "        'category_distribution': category_counts,\n", "        'status_distribution': status_counts,\n", "        'temporal_analysis': {\n", "            'peak_hour': peak_hour,\n", "            'trough_hour': trough_hour,\n", "            'peak_average': hourly_means[peak_hour],\n", "            
'trough_average': hourly_means[trough_hour],\n", "            'daily_variation': hourly_means[peak_hour] - hourly_means[trough_hour]\n", "        },\n", "        'quality_metrics': {\n", "            'anomalies_detected': len(anomalies),\n", "            'anomaly_rate_percent': len(anomalies) / len(dataset) * 100,\n", "            'error_rate_percent': error_rate,\n", "            'data_completeness': (len(dataset) - len(error_points)) / len(dataset) * 100\n", "        },\n", "        'processing_metadata': {\n", "            'processed_on': datetime.now().isoformat(),\n", "            'processing_location': 'remote_server'\n", "        }\n", "    }\n", "\n", "    return processing_results\n", "\n", "# Process data on remote server\n", "data_results = remote_data_processing(data_size=50000, output_format=\"json\")\n", "\n", "print(f\"\\nREMOTE DATA PROCESSING COMPLETE\")\n", "data_info = data_results['data_info']\n", "print(f\"Data points processed: {data_info['total_points']:,}\")\n", "print(f\"Output format: {data_info['output_format']}\")\n", "print(f\"Generated file size: {data_info['file_size_bytes']:,} bytes\")\n", "\n", "stats = data_results['basic_statistics']\n", "print(f\"\\nStatistics:\")\n", "print(f\"  Mean: {stats['mean']:.2f}\")\n", "print(f\"  Median: {stats['median']:.2f}\")\n", "print(f\"  Std Dev: {stats['stdev']:.2f}\")\n", "print(f\"  Range: {stats['min']:.2f} - {stats['max']:.2f}\")\n", "\n", "temporal = data_results['temporal_analysis']\n", "print(f\"\\nTemporal Analysis:\")\n", "print(f\"  Peak hour: {temporal['peak_hour']}:00 (avg: {temporal['peak_average']:.2f})\")\n", "print(f\"  Trough hour: {temporal['trough_hour']}:00 (avg: {temporal['trough_average']:.2f})\")\n", "print(f\"  Daily variation: {temporal['daily_variation']:.2f}\")\n", "\n", "quality = data_results['quality_metrics']\n", "print(f\"\\nData Quality:\")\n", "print(f\"  Anomalies: {quality['anomalies_detected']} ({quality['anomaly_rate_percent']:.2f}%)\")\n", "print(f\"  Error rate: {quality['error_rate_percent']:.2f}%\")\n", "print(f\"  Data completeness: {quality['data_completeness']:.1f}%\")" ] }, 
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Example 3: Remote File System Operations\n", "\n", "Perform file system operations on the remote server:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@cluster\n", "def remote_filesystem_analysis(directory_path=\"/tmp\", max_depth=3):\n", " \"\"\"\n", " Analyze file system structure on remote server.\n", " \"\"\"\n", " import os\n", " import stat\n", " import pwd\n", " import grp\n", " import time\n", " from datetime import datetime\n", " from collections import defaultdict\n", " \n", " print(f\"Analyzing directory: {directory_path} (max depth: {max_depth})\")\n", " \n", " def get_file_info(file_path):\n", " \"\"\"Get detailed file information\"\"\"\n", " try:\n", " stat_info = os.stat(file_path)\n", " \n", " # Get owner and group names\n", " try:\n", " owner = pwd.getpwuid(stat_info.st_uid).pw_name\n", " except KeyError:\n", " owner = str(stat_info.st_uid)\n", " \n", " try:\n", " group = grp.getgrgid(stat_info.st_gid).gr_name\n", " except KeyError:\n", " group = str(stat_info.st_gid)\n", " \n", " return {\n", " 'size': stat_info.st_size,\n", " 'owner': owner,\n", " 'group': group,\n", " 'permissions': oct(stat_info.st_mode)[-3:],\n", " 'modified_time': datetime.fromtimestamp(stat_info.st_mtime).isoformat(),\n", " 'is_file': stat.S_ISREG(stat_info.st_mode),\n", " 'is_dir': stat.S_ISDIR(stat_info.st_mode),\n", " 'is_symlink': stat.S_ISLNK(stat_info.st_mode)\n", " }\n", " except (OSError, PermissionError) as e:\n", " return {'error': str(e)}\n", " \n", " def analyze_directory(path, current_depth=0):\n", " \"\"\"Recursively analyze directory structure\"\"\"\n", " if current_depth > max_depth:\n", " return {}\n", " \n", " analysis = {\n", " 'path': path,\n", " 'depth': current_depth,\n", " 'contents': [],\n", " 'stats': {\n", " 'total_files': 0,\n", " 'total_dirs': 0,\n", " 'total_size': 0,\n", " 'file_types': defaultdict(int),\n", " 'largest_files': [],\n", " 
'permission_stats': defaultdict(int)\n", "            }\n", "        }\n", "\n", "        try:\n", "            entries = os.listdir(path)\n", "            entries.sort()  # Sort for consistent output\n", "\n", "            # Limit entries to prevent overwhelming output\n", "            if len(entries) > 100:\n", "                entries = entries[:100]\n", "                analysis['truncated'] = True\n", "\n", "            for entry in entries:\n", "                entry_path = os.path.join(path, entry)\n", "                file_info = get_file_info(entry_path)\n", "\n", "                if 'error' in file_info:\n", "                    continue\n", "\n", "                entry_data = {\n", "                    'name': entry,\n", "                    'path': entry_path,\n", "                    **file_info\n", "                }\n", "\n", "                analysis['contents'].append(entry_data)\n", "\n", "                # Update statistics\n", "                if file_info['is_file']:\n", "                    analysis['stats']['total_files'] += 1\n", "                    analysis['stats']['total_size'] += file_info['size']\n", "\n", "                    # File extension analysis\n", "                    _, ext = os.path.splitext(entry)\n", "                    if ext:\n", "                        analysis['stats']['file_types'][ext.lower()] += 1\n", "                    else:\n", "                        analysis['stats']['file_types']['no_extension'] += 1\n", "\n", "                    # Track largest files\n", "                    analysis['stats']['largest_files'].append({\n", "                        'name': entry,\n", "                        'size': file_info['size'],\n", "                        'path': entry_path\n", "                    })\n", "\n", "                elif file_info['is_dir']:\n", "                    analysis['stats']['total_dirs'] += 1\n", "\n", "                    # Recurse into subdirectory\n", "                    if current_depth < max_depth:\n", "                        subdir_analysis = analyze_directory(entry_path, current_depth + 1)\n", "                        if subdir_analysis:\n", "                            # Aggregate subdirectory stats\n", "                            analysis['stats']['total_files'] += subdir_analysis['stats']['total_files']\n", "                            analysis['stats']['total_dirs'] += subdir_analysis['stats']['total_dirs']\n", "                            analysis['stats']['total_size'] += subdir_analysis['stats']['total_size']\n", "\n", "                # Permission statistics\n", "                analysis['stats']['permission_stats'][file_info['permissions']] += 1\n", "\n", "            # Sort largest files\n", "            analysis['stats']['largest_files'].sort(key=lambda x: x['size'], reverse=True)\n", "            
analysis['stats']['largest_files'] = analysis['stats']['largest_files'][:10]  # Top 10\n", "\n", "            # Convert defaultdict to regular dict for JSON serialization\n", "            analysis['stats']['file_types'] = dict(analysis['stats']['file_types'])\n", "            analysis['stats']['permission_stats'] = dict(analysis['stats']['permission_stats'])\n", "\n", "        except PermissionError:\n", "            analysis['error'] = f\"Permission denied accessing {path}\"\n", "        except OSError as e:\n", "            analysis['error'] = f\"OS error accessing {path}: {str(e)}\"\n", "\n", "        return analysis\n", "\n", "    # Get system information\n", "    def get_system_info():\n", "        \"\"\"Get basic system information\"\"\"\n", "        import platform\n", "        import shutil\n", "\n", "        # Disk usage for the analyzed directory\n", "        try:\n", "            disk_usage = shutil.disk_usage(directory_path)\n", "            disk_info = {\n", "                'total_bytes': disk_usage.total,\n", "                'used_bytes': disk_usage.used,\n", "                'free_bytes': disk_usage.free,\n", "                'used_percent': (disk_usage.used / disk_usage.total) * 100\n", "            }\n", "        except OSError:\n", "            disk_info = {'error': 'Could not get disk usage'}\n", "\n", "        return {\n", "            'hostname': platform.node(),\n", "            'system': platform.system(),\n", "            'release': platform.release(),\n", "            'machine': platform.machine(),\n", "            'python_version': platform.python_version(),\n", "            'disk_usage': disk_info,\n", "            'current_user': os.environ.get('USER', 'unknown'),\n", "            'home_directory': os.environ.get('HOME', 'unknown'),\n", "            'working_directory': os.getcwd()\n", "        }\n", "\n", "    # Perform the analysis\n", "    print(\"Starting filesystem analysis...\")\n", "    start_time = time.time()\n", "\n", "    directory_analysis = analyze_directory(directory_path)\n", "    system_info = get_system_info()\n", "\n", "    end_time = time.time()\n", "    analysis_time = end_time - start_time\n", "\n", "    # Summary statistics\n", "    summary = {\n", "        'analysis_parameters': {\n", "            'target_directory': directory_path,\n", "            'max_depth': max_depth,\n", "            
'analysis_time_seconds': analysis_time\n", "        },\n", "        'directory_summary': directory_analysis['stats'] if 'stats' in directory_analysis else {},\n", "        'system_information': system_info,\n", "        'analysis_metadata': {\n", "            'completed_at': datetime.now().isoformat(),\n", "            'analysis_location': 'remote_server'\n", "        }\n", "    }\n", "\n", "    # Include sample of directory contents (first 20 items)\n", "    if 'contents' in directory_analysis:\n", "        summary['sample_contents'] = directory_analysis['contents'][:20]\n", "\n", "    print(f\"Filesystem analysis completed in {analysis_time:.2f} seconds\")\n", "\n", "    return summary\n", "\n", "# Analyze remote filesystem\n", "fs_results = remote_filesystem_analysis(directory_path=\"/usr/local\", max_depth=2)\n", "\n", "print(f\"\\nREMOTE FILESYSTEM ANALYSIS COMPLETE\")\n", "params = fs_results['analysis_parameters']\n", "print(f\"Directory analyzed: {params['target_directory']}\")\n", "print(f\"Max depth: {params['max_depth']}\")\n", "print(f\"Analysis time: {params['analysis_time_seconds']:.2f} seconds\")\n", "\n", "if 'directory_summary' in fs_results and fs_results['directory_summary']:\n", "    summary = fs_results['directory_summary']\n", "    print(f\"\\nDirectory Summary:\")\n", "    print(f\"  Total files: {summary.get('total_files', 0):,}\")\n", "    print(f\"  Total directories: {summary.get('total_dirs', 0):,}\")\n", "    print(f\"  Total size: {summary.get('total_size', 0):,} bytes\")\n", "\n", "    if 'largest_files' in summary and summary['largest_files']:\n", "        print(f\"\\nLargest files:\")\n", "        for i, file_info in enumerate(summary['largest_files'][:5], 1):\n", "            print(f\"    {i}. 
{file_info['name']}: {file_info['size']:,} bytes\")\n", "\n", "    if 'file_types' in summary and summary['file_types']:\n", "        print(f\"\\nFile types:\")\n", "        sorted_types = sorted(summary['file_types'].items(), key=lambda x: x[1], reverse=True)\n", "        for ext, count in sorted_types[:5]:\n", "            print(f\"    {ext}: {count} files\")\n", "\n", "sys_info = fs_results['system_information']\n", "print(f\"\\nSystem Information:\")\n", "print(f\"  Hostname: {sys_info['hostname']}\")\n", "print(f\"  OS: {sys_info['system']} {sys_info['release']}\")\n", "print(f\"  Architecture: {sys_info['machine']}\")\n", "print(f\"  Python: {sys_info['python_version']}\")\n", "print(f\"  Current user: {sys_info['current_user']}\")\n", "\n", "if 'disk_usage' in sys_info and 'error' not in sys_info['disk_usage']:\n", "    disk = sys_info['disk_usage']\n", "    print(f\"\\nDisk Usage:\")\n", "    print(f\"  Total: {disk['total_bytes']:,} bytes\")\n", "    print(f\"  Used: {disk['used_bytes']:,} bytes ({disk['used_percent']:.1f}%)\")\n", "    print(f\"  Free: {disk['free_bytes']:,} bytes\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example 4: Remote Environment Testing\n", "\n", "Test the remote Python environment and available packages:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@cluster\n", "def test_remote_environment():\n", "    \"\"\"\n", "    Test and report on the remote Python environment.\n", "    \"\"\"\n", "    import sys\n", "    import os\n", "    import platform\n", "    import subprocess\n", "    import importlib\n", "    from datetime import datetime\n", "\n", "    print(\"Testing remote Python environment...\")\n", "\n", "    # Basic Python information\n", "    python_info = {\n", "        'version': sys.version,\n", "        'version_info': {\n", "            'major': sys.version_info.major,\n", "            'minor': sys.version_info.minor,\n", "            'micro': sys.version_info.micro\n", "        },\n", "        'executable': sys.executable,\n", "        'platform': sys.platform,\n", "        'path': sys.path[:5],  # First 5 path 
entries\n", "    }\n", "\n", "    # System information\n", "    system_info = {\n", "        'hostname': platform.node(),\n", "        'system': platform.system(),\n", "        'release': platform.release(),\n", "        'version': platform.version(),\n", "        'machine': platform.machine(),\n", "        'processor': platform.processor()\n", "    }\n", "\n", "    # Environment variables (selected)\n", "    env_vars = {\n", "        'USER': os.environ.get('USER', 'not_set'),\n", "        'HOME': os.environ.get('HOME', 'not_set'),\n", "        'PATH': os.environ.get('PATH', 'not_set')[:200] + '...',  # Truncate PATH\n", "        'SHELL': os.environ.get('SHELL', 'not_set'),\n", "        'LANG': os.environ.get('LANG', 'not_set'),\n", "        'PWD': os.environ.get('PWD', 'not_set'),\n", "        'PYTHONPATH': os.environ.get('PYTHONPATH', 'not_set')\n", "    }\n", "\n", "    # Test common packages (entries are import names; IPython imports as 'IPython')\n", "    common_packages = [\n", "        'numpy', 'pandas', 'scipy', 'matplotlib', 'sklearn', 'requests',\n", "        'flask', 'django', 'pytest', 'jupyter', 'IPython', 'click',\n", "        'yaml', 'json', 'csv', 'sqlite3', 'pickle', 'datetime',\n", "        'os', 'sys', 'subprocess', 'threading', 'multiprocessing'\n", "    ]\n", "\n", "    package_status = {}\n", "\n", "    for package in common_packages:\n", "        try:\n", "            module = importlib.import_module(package)\n", "            version = getattr(module, '__version__', 'unknown')\n", "            package_status[package] = {\n", "                'available': True,\n", "                'version': version,\n", "                'location': getattr(module, '__file__', 'built-in')\n", "            }\n", "        except ImportError:\n", "            package_status[package] = {\n", "                'available': False,\n", "                'error': 'Not installed'\n", "            }\n", "        except Exception as e:\n", "            package_status[package] = {\n", "                'available': False,\n", "                'error': str(e)\n", "            }\n", "\n", "    # Get pip list (if available)\n", "    pip_packages = []\n", "    try:\n", "        result = subprocess.run(\n", "            [sys.executable, '-m', 'pip', 'list', '--format=freeze'],\n", "            capture_output=True,\n", "            text=True,\n", "            timeout=30\n", "        )\n", "        if result.returncode == 0:\n", "            pip_packages = 
result.stdout.strip().split('\\n')[:20]  # First 20 packages\n", "    except Exception as e:\n", "        pip_packages = [f\"Error getting pip list: {str(e)}\"]\n", "\n", "    # Test basic functionality\n", "    functionality_tests = {}\n", "\n", "    # Test file I/O\n", "    try:\n", "        import tempfile\n", "        with tempfile.NamedTemporaryFile(mode='w', delete=True) as f:\n", "            f.write(\"test\")\n", "            f.flush()\n", "        functionality_tests['file_io'] = {'status': 'ok', 'message': 'File I/O working'}\n", "    except Exception as e:\n", "        functionality_tests['file_io'] = {'status': 'error', 'message': str(e)}\n", "\n", "    # Test networking\n", "    try:\n", "        import socket\n", "        socket.gethostname()\n", "        functionality_tests['networking'] = {'status': 'ok', 'message': 'Basic networking working'}\n", "    except Exception as e:\n", "        functionality_tests['networking'] = {'status': 'error', 'message': str(e)}\n", "\n", "    # Test multiprocessing\n", "    try:\n", "        import multiprocessing\n", "        cpu_count = multiprocessing.cpu_count()\n", "        functionality_tests['multiprocessing'] = {\n", "            'status': 'ok',\n", "            'message': f'Multiprocessing available, {cpu_count} CPUs detected'\n", "        }\n", "    except Exception as e:\n", "        functionality_tests['multiprocessing'] = {'status': 'error', 'message': str(e)}\n", "\n", "    # Test numerical computing\n", "    numerical_test = {'status': 'ok', 'tests': []}\n", "    try:\n", "        # Basic math\n", "        import math\n", "        result = math.sqrt(16)\n", "        numerical_test['tests'].append(f\"math.sqrt(16) = {result}\")\n", "\n", "        # NumPy if available\n", "        if package_status['numpy']['available']:\n", "            import numpy as np\n", "            arr = np.array([1, 2, 3, 4, 5])\n", "            mean_val = np.mean(arr)\n", "            numerical_test['tests'].append(f\"numpy.mean([1,2,3,4,5]) = {mean_val}\")\n", "\n", "        # Summarize the checks so the report has a message to print\n", "        numerical_test['message'] = '; '.join(numerical_test['tests'])\n", "        functionality_tests['numerical_computing'] = numerical_test\n", "    except Exception as e:\n", "        functionality_tests['numerical_computing'] = {'status': 'error', 'message': str(e)}\n", "\n", "    # Performance test\n", "    
performance_test = {}\n", "    try:\n", "        import time\n", "        start_time = time.time()\n", "\n", "        # Simple computation benchmark\n", "        total = sum(i*i for i in range(100000))\n", "\n", "        end_time = time.time()\n", "        computation_time = end_time - start_time\n", "\n", "        performance_test = {\n", "            'computation_time_seconds': computation_time,\n", "            'result': total,\n", "            'operations_per_second': 100000 / computation_time if computation_time > 0 else 0\n", "        }\n", "    except Exception as e:\n", "        performance_test = {'error': str(e)}\n", "\n", "    environment_report = {\n", "        'test_metadata': {\n", "            'test_timestamp': datetime.now().isoformat(),\n", "            'test_location': 'remote_server'\n", "        },\n", "        'python_information': python_info,\n", "        'system_information': system_info,\n", "        'environment_variables': env_vars,\n", "        'package_availability': {\n", "            'total_tested': len(common_packages),\n", "            'available': sum(1 for pkg in package_status.values() if pkg.get('available', False)),\n", "            'unavailable': sum(1 for pkg in package_status.values() if not pkg.get('available', False)),\n", "            'details': package_status\n", "        },\n", "        'installed_packages_sample': pip_packages,\n", "        'functionality_tests': functionality_tests,\n", "        'performance_benchmark': performance_test\n", "    }\n", "\n", "    print(\"Remote environment testing completed\")\n", "\n", "    return environment_report\n", "\n", "# Test remote environment\n", "env_results = test_remote_environment()\n", "\n", "print(f\"\\nREMOTE ENVIRONMENT TEST COMPLETE\")\n", "python_info = env_results['python_information']\n", "print(f\"Python version: {python_info['version_info']['major']}.{python_info['version_info']['minor']}.{python_info['version_info']['micro']}\")\n", "print(f\"Python executable: {python_info['executable']}\")\n", "\n", "system_info = env_results['system_information']\n", "print(f\"\\nSystem: {system_info['system']} {system_info['release']}\")\n", "print(f\"Hostname: {system_info['hostname']}\")\n", 
"print(f\"Architecture: {system_info['machine']}\")\n", "\n", "pkg_info = env_results['package_availability']\n", "print(f\"\\nPackage Availability:\")\n", "print(f\" Available: {pkg_info['available']}/{pkg_info['total_tested']}\")\n", "print(f\" Unavailable: {pkg_info['unavailable']}/{pkg_info['total_tested']}\")\n", "\n", "# Show some available packages\n", "available_packages = [name for name, info in pkg_info['details'].items() if info.get('available', False)]\n", "print(f\" Key packages available: {', '.join(available_packages[:10])}\")\n", "\n", "func_tests = env_results['functionality_tests']\n", "print(f\"\\nFunctionality Tests:\")\n", "for test_name, test_result in func_tests.items():\n", " status = test_result.get('status', 'unknown')\n", " message = test_result.get('message', 'No message')\n", " print(f\" {test_name}: {status.upper()} - {message}\")\n", "\n", "if 'error' not in env_results['performance_benchmark']:\n", " perf = env_results['performance_benchmark']\n", " print(f\"\\nPerformance Benchmark:\")\n", " print(f\" Computation time: {perf['computation_time_seconds']:.4f} seconds\")\n", " print(f\" Operations per second: {perf['operations_per_second']:,.0f}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## SSH Configuration Management\n", "\n", "Best practices for SSH configuration with Clustrix:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def ssh_configuration_examples():\n", " \"\"\"\n", " Examples of different SSH configuration patterns.\n", " \"\"\"\n", " \n", " configurations = {\n", " 'basic_ssh': {\n", " 'description': 'Basic SSH connection with key authentication',\n", " 'config': {\n", " 'cluster_type': 'ssh',\n", " 'cluster_host': 'server.example.com',\n", " 'username': 'user',\n", " 'key_file': '~/.ssh/id_rsa',\n", " 'port': 22\n", " },\n", " 'use_case': 'Single remote server or workstation'\n", " },\n", " 'custom_port': {\n", " 'description': 'SSH connection with 
custom port',\n", "            'config': {\n", "                'cluster_type': 'ssh',\n", "                'cluster_host': 'secure-server.example.com',\n", "                'username': 'admin',\n", "                'key_file': '~/.ssh/admin_key',\n", "                'port': 2222\n", "            },\n", "            'use_case': 'Servers with non-standard SSH ports'\n", "        },\n", "        'password_auth': {\n", "            'description': 'SSH with password authentication (less secure)',\n", "            'config': {\n", "                'cluster_type': 'ssh',\n", "                'cluster_host': 'legacy-server.example.com',\n", "                'username': 'olduser',\n", "                'password': 'your-password'  # Not recommended for production\n", "            },\n", "            'use_case': 'Legacy systems without key-based auth'\n", "        },\n", "        'conda_environment': {\n", "            'description': 'SSH with conda environment activation',\n", "            'config': {\n", "                'cluster_type': 'ssh',\n", "                'cluster_host': 'ml-server.example.com',\n", "                'username': 'researcher',\n", "                'key_file': '~/.ssh/research_key',\n", "                'conda_env_name': 'pytorch',\n", "                'python_executable': 'python'\n", "            },\n", "            'use_case': 'Servers with conda environments for specific packages'\n", "        },\n", "        'virtual_environment': {\n", "            'description': 'SSH with Python virtual environment',\n", "            'config': {\n", "                'cluster_type': 'ssh',\n", "                'cluster_host': 'dev-server.example.com',\n", "                'username': 'developer',\n", "                'key_file': '~/.ssh/dev_key',\n", "                'virtualenv_path': '/home/developer/venv/myproject',\n", "                'python_executable': 'python3'\n", "            },\n", "            'use_case': 'Development servers with project-specific environments'\n", "        },\n", "        'cloud_instance': {\n", "            'description': 'Cloud instance with specific configuration',\n", "            'config': {\n", "                'cluster_type': 'ssh',\n", "                'cluster_host': 'ec2-12-34-56-78.compute-1.amazonaws.com',\n", "                'username': 'ubuntu',  # Common for Ubuntu AMIs\n", "                'key_file': '~/.ssh/aws-keypair.pem',\n", "                'remote_work_dir': '/tmp/clustrix',\n", "                'cleanup_on_success': True\n", "            },\n", "            'use_case': 'AWS, GCP, or Azure cloud instances'\n", "        }\n", "    }\n", "\n", "    print(\"SSH Configuration 
Examples:\")\n", " print(\"=\" * 50)\n", " \n", " for config_name, config_info in configurations.items():\n", " print(f\"\\n{config_name.upper().replace('_', ' ')}:\")\n", " print(f\" Description: {config_info['description']}\")\n", " print(f\" Use case: {config_info['use_case']}\")\n", " print(f\" Configuration:\")\n", " for key, value in config_info['config'].items():\n", " print(f\" {key}: {value}\")\n", " \n", " return configurations\n", "\n", "def ssh_security_best_practices():\n", " \"\"\"\n", " SSH security best practices for Clustrix.\n", " \"\"\"\n", " \n", " practices = {\n", " 'key_management': {\n", " 'title': 'SSH Key Management',\n", " 'practices': [\n", " 'Use strong, unique SSH keys for each server/project',\n", " 'Prefer Ed25519 keys: ssh-keygen -t ed25519 -f ~/.ssh/clustrix_key',\n", " 'Use RSA 4096-bit keys if Ed25519 not supported: ssh-keygen -t rsa -b 4096',\n", " 'Protect private keys with strong passphrases',\n", " 'Set proper permissions: chmod 600 ~/.ssh/private_key',\n", " 'Regularly rotate keys (every 6-12 months)'\n", " ]\n", " },\n", " 'connection_security': {\n", " 'title': 'Connection Security',\n", " 'practices': [\n", " 'Always use key-based authentication, avoid passwords',\n", " 'Disable SSH agent forwarding unless necessary',\n", " 'Use SSH config files for consistent settings',\n", " 'Enable SSH connection multiplexing for efficiency',\n", " 'Set reasonable connection timeouts',\n", " 'Use non-standard SSH ports when possible'\n", " ]\n", " },\n", " 'server_hardening': {\n", " 'title': 'Server-Side Security',\n", " 'practices': [\n", " 'Disable SSH password authentication',\n", " 'Disable SSH root login',\n", " 'Use fail2ban or similar intrusion prevention',\n", " 'Regularly update SSH server software',\n", " 'Monitor SSH access logs',\n", " 'Use firewall rules to restrict SSH access'\n", " ]\n", " },\n", " 'clustrix_specific': {\n", " 'title': 'Clustrix-Specific Security',\n", " 'practices': [\n", " 'Use dedicated SSH keys for 
Clustrix operations',\n", "                'Restrict remote work directories to user-specific paths',\n", "                'Enable cleanup_on_success to remove temporary files',\n", "                'Limit max_parallel_jobs to prevent resource exhaustion',\n", "                'Monitor remote execution logs for anomalies',\n", "                'Use isolated Python environments for execution'\n", "            ]\n", "        }\n", "    }\n", "\n", "    print(\"\\nSSH Security Best Practices:\")\n", "    print(\"=\" * 40)\n", "\n", "    for category, info in practices.items():\n", "        print(f\"\\n{info['title']}:\")\n", "        for i, practice in enumerate(info['practices'], 1):\n", "            print(f\"  {i}. {practice}\")\n", "\n", "    return practices\n", "\n", "def ssh_troubleshooting_guide():\n", "    \"\"\"\n", "    Common SSH issues and solutions.\n", "    \"\"\"\n", "\n", "    issues = {\n", "        'connection_refused': {\n", "            'problem': 'Connection refused or timeout',\n", "            'solutions': [\n", "                'Check if SSH service is running on remote server',\n", "                'Verify correct hostname/IP address',\n", "                'Check if firewall is blocking SSH port',\n", "                'Confirm SSH port number (default 22)',\n", "                'Test connection with: ssh -v user@hostname'\n", "            ]\n", "        },\n", "        'permission_denied': {\n", "            'problem': 'Permission denied (publickey)',\n", "            'solutions': [\n", "                'Verify SSH key is added to remote ~/.ssh/authorized_keys',\n", "                'Check SSH key permissions (600 for private key)',\n", "                'Ensure correct username for the server',\n", "                'Verify key file path in Clustrix configuration',\n", "                'Test key with: ssh -i ~/.ssh/key_file user@hostname'\n", "            ]\n", "        },\n", "        'host_key_verification': {\n", "            'problem': 'Host key verification failed',\n", "            'solutions': [\n", "                'Add host to known_hosts: ssh-keyscan hostname >> ~/.ssh/known_hosts',\n", "                'Remove old host key: ssh-keygen -R hostname',\n", "                'Connect manually first to accept host key',\n", "                'Check if hostname/IP changed',\n", "                'Verify server authenticity before accepting'\n", "            ]\n", "        },\n", "        'python_not_found': {\n", "            'problem': 'Python executable not 
found on remote server',\n", " 'solutions': [\n", " 'Specify correct python_executable in configuration',\n", " 'Check if Python is installed: which python3',\n", " 'Add Python to PATH on remote server',\n", " 'Use full path: /usr/bin/python3',\n", " 'Install Python if missing'\n", " ]\n", " },\n", " 'environment_issues': {\n", " 'problem': 'Python environment or package issues',\n", " 'solutions': [\n", " 'Verify conda/virtualenv paths are correct',\n", " 'Check environment activation commands',\n", " 'Install missing packages in remote environment',\n", " 'Use pip install --user for user-level packages',\n", " 'Test environment manually: ssh user@host \"source env/bin/activate && python\"'\n", " ]\n", " }\n", " }\n", " \n", " print(\"\\nSSH Troubleshooting Guide:\")\n", " print(\"=\" * 30)\n", " \n", " for issue_name, issue_info in issues.items():\n", " print(f\"\\n{issue_info['problem']}:\")\n", " for i, solution in enumerate(issue_info['solutions'], 1):\n", " print(f\" {i}. {solution}\")\n", " \n", " return issues\n", "\n", "# Display all SSH guidance\n", "ssh_configs = ssh_configuration_examples()\n", "security_practices = ssh_security_best_practices()\n", "troubleshooting = ssh_troubleshooting_guide()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## SSH Connection Testing" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def test_ssh_connection():\n", " \"\"\"\n", " Test SSH connection and basic functionality.\n", " \"\"\"\n", " from clustrix import ClusterExecutor\n", " \n", " try:\n", " # Get current configuration\n", " config = clustrix.get_config()\n", " \n", " if config.cluster_type != 'ssh':\n", " print(\"Current configuration is not for SSH. 
Please configure for SSH first.\")\n", " return\n", " \n", " print(\"Testing SSH connection...\")\n", " print(f\"Host: {config.cluster_host}\")\n", " print(f\"Username: {config.username}\")\n", " print(f\"Port: {getattr(config, 'port', 22)}\")\n", " \n", " # Create executor and test connection\n", " executor = ClusterExecutor(config)\n", " \n", " print(\"\\n1. Testing SSH connection...\")\n", " executor.connect()\n", " print(\" ✓ SSH connection successful\")\n", " \n", " print(\"\\n2. Testing basic commands...\")\n", " \n", " # Test hostname\n", " stdout, stderr = executor._execute_command(\"hostname\")\n", " if stdout:\n", " print(f\" ✓ Remote hostname: {stdout.strip()}\")\n", " \n", " # Test whoami\n", " stdout, stderr = executor._execute_command(\"whoami\")\n", " if stdout:\n", " print(f\" ✓ Remote user: {stdout.strip()}\")\n", " \n", " # Test pwd\n", " stdout, stderr = executor._execute_command(\"pwd\")\n", " if stdout:\n", " print(f\" ✓ Remote working directory: {stdout.strip()}\")\n", " \n", " print(\"\\n3. Testing Python availability...\")\n", " \n", " # Test Python\n", " python_cmd = getattr(config, 'python_executable', 'python3')\n", " stdout, stderr = executor._execute_command(f\"{python_cmd} --version\")\n", " if stdout:\n", " print(f\" ✓ Python version: {stdout.strip()}\")\n", " elif stderr:\n", " print(f\" ✓ Python version: {stderr.strip()}\")\n", " \n", " # Test Python path\n", " stdout, stderr = executor._execute_command(f\"which {python_cmd}\")\n", " if stdout:\n", " print(f\" ✓ Python executable: {stdout.strip()}\")\n", " \n", " print(\"\\n4. 
Testing remote work directory...\")\n", " \n", " # Test work directory creation\n", " work_dir = getattr(config, 'remote_work_dir', '/tmp/clustrix')\n", " stdout, stderr = executor._execute_command(f\"mkdir -p {work_dir} && echo 'Directory created'\")\n", " if \"Directory created\" in stdout:\n", " print(f\" ✓ Work directory accessible: {work_dir}\")\n", " \n", " # Test write permissions\n", " test_file = f\"{work_dir}/test_file.txt\"\n", " stdout, stderr = executor._execute_command(f\"echo 'test' > {test_file} && cat {test_file} && rm {test_file}\")\n", " if \"test\" in stdout:\n", " print(\" ✓ Write permissions confirmed\")\n", " \n", " print(\"\\n5. Testing environment activation...\")\n", " \n", " conda_env = getattr(config, 'conda_env_name', None)\n", " venv_path = getattr(config, 'virtualenv_path', None)\n", " \n", " if conda_env:\n", " stdout, stderr = executor._execute_command(f\"conda activate {conda_env} && echo 'Conda environment activated'\")\n", " if \"activated\" in stdout:\n", " print(f\" ✓ Conda environment '{conda_env}' activated\")\n", " else:\n", " print(f\" ⚠ Conda environment '{conda_env}' activation failed\")\n", " \n", " elif venv_path:\n", " stdout, stderr = executor._execute_command(f\"source {venv_path}/bin/activate && echo 'Virtual environment activated'\")\n", " if \"activated\" in stdout:\n", " print(f\" ✓ Virtual environment '{venv_path}' activated\")\n", " else:\n", " print(f\" ⚠ Virtual environment '{venv_path}' activation failed\")\n", " \n", " else:\n", " print(\" - No environment activation configured\")\n", " \n", " executor.disconnect()\n", " # Individual checks above only print on success, so do not claim that every check passed\n", " print(\"\\n✓ SSH test run completed.\")\n", " print(\"\\nReview any ⚠ warnings or missing ✓ lines above before relying on this configuration.\")\n", " \n", " except Exception as e:\n", " print(f\"\\n✗ SSH connection test failed: {e}\")\n", " print(\"\\nTroubleshooting suggestions:\")\n", " print(\"1. Check your SSH configuration\")\n", " print(\"2. Verify SSH key authentication\")\n", " print(\"3. 
Test manual SSH connection: ssh user@hostname\")\n", " print(\"4. Check firewall and network connectivity\")\n", "\n", "# Test SSH connection\n", "print(\"SSH Connection Test:\")\n", "print(\"=\" * 25)\n", "test_ssh_connection()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Summary\n", "\n", "This tutorial covered SSH-based remote execution with Clustrix:\n", "\n", "1. **SSH Configuration** - Setting up Clustrix for direct SSH connections\n", "2. **Basic Remote Computation** - Simple mathematical operations on remote servers\n", "3. **Data Processing** - Generating and analyzing data remotely\n", "4. **File System Operations** - Remote directory analysis and system information\n", "5. **Environment Testing** - Validating remote Python environments\n", "6. **Configuration Management** - Best practices and security guidelines\n", "7. **Connection Testing** - Troubleshooting and verification tools\n", "\n", "### Key SSH Advantages:\n", "\n", "- **Simplicity**: No job scheduler complexity, direct execution\n", "- **Flexibility**: Works with any SSH-accessible server or workstation\n", "- **Speed**: Immediate execution without queue waiting times\n", "- **Control**: Direct access to remote file system and environment\n", "- **Debugging**: Easy to test and troubleshoot connection issues\n", "- **Cost-Effective**: Use existing servers without additional infrastructure\n", "\n", "### Best Practices:\n", "\n", "- **Security**: Always use SSH key authentication, never passwords in production\n", "- **Key Management**: Use unique, strong SSH keys with restrictive permissions (600 for the private key)\n", "- **Environment Isolation**: Use conda or virtual environments for package management\n", "- **Resource Limits**: Set `max_parallel_jobs` to prevent overwhelming the server\n", "- **Cleanup**: Enable `cleanup_on_success` to maintain clean remote directories\n", "- **Monitoring**: Regularly check remote resource usage and access logs\n", "\n", "### Use Cases:\n", "\n", "- 
**Development**: Testing code on different environments\n", "- **Workstations**: Using powerful desktop machines remotely\n", "- **Cloud Instances**: Running computations on cloud VMs\n", "- **Legacy Systems**: Accessing older servers without modern schedulers\n", "- **Personal Computing**: Running jobs on home lab and personal servers\n", "- **Prototyping**: Quick testing before moving to larger cluster systems\n", "\n", "### When to Use SSH vs. Other Cluster Types:\n", "\n", "**Choose SSH when**:\n", "- You work with single servers or small clusters\n", "- You need immediate execution without queuing\n", "- You are prototyping or doing development work\n", "- You work with cloud instances or workstations\n", "\n", "**Choose schedulers (SLURM/PBS/SGE) when**:\n", "- You work with large HPC clusters\n", "- You need resource management and fair sharing\n", "- You run production workloads with resource constraints\n", "- You require advanced scheduling features\n", "\n", "**Choose Kubernetes when**:\n", "- You need containerized execution environments\n", "- You require auto-scaling and fault tolerance\n", "- You work with microservices or cloud-native applications\n", "- You need orchestration across multiple nodes\n", "\n", "### Next Steps:\n", "\n", "- Compare with the [SLURM Tutorial](slurm_tutorial.ipynb) for HPC cluster computing\n", "- Explore the [Kubernetes Tutorial](kubernetes_tutorial.ipynb) for containerized execution\n", "- Review the [SSH Setup Guide](../ssh_setup.rst) for detailed security configuration\n", "- Check the [API Documentation](../api/decorator.rst) for advanced decorator options\n", "\n", "For more information, visit the [Clustrix Documentation](https://clustrix.readthedocs.io)." 
] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.0" } }, "nbformat": 4, "nbformat_minor": 4 }