{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Complete Clustrix API Demonstration\n", "\n", "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/ContextLab/clustrix/blob/master/docs/notebooks/complete_api_demo.ipynb)\n", "\n", "This notebook provides a comprehensive demonstration of all Clustrix user-facing functions and features. It serves as both a tutorial and a reference for the complete API.\n", "\n", "## Table of Contents\n", "\n", "1. [Installation and Setup](#installation-and-setup)\n", "2. [Configuration Functions](#configuration-functions)\n", "3. [Cluster Decorator](#cluster-decorator)\n", "4. [Local Execution](#local-execution)\n", "5. [Remote Cluster Execution](#remote-cluster-execution)\n", "6. [Advanced Features](#advanced-features)\n", "7. [Monitoring and Debugging](#monitoring-and-debugging)\n", "8. [Error Handling](#error-handling)\n", "9. [Best Practices](#best-practices)" ], "id": "cell-0" }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Installation and Setup\n", "\n", "First, let's install and import Clustrix:" ], "id": "cell-1" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Install Clustrix (uncomment if needed)\n", "# !pip install clustrix\n", "# !pip install clustrix[kubernetes] # With Kubernetes support\n", "\n", "# Import all Clustrix components\n", "import clustrix\n", "from clustrix import cluster, configure, get_config\n", "from clustrix.config import ClusterConfig\n", "from clustrix.executor import ClusterExecutor\n", "from clustrix.local_executor import LocalExecutor\n", "\n", "# Standard libraries for examples\n", "import numpy as np\n", "import time\n", "import os\n", "from datetime import datetime\n", "\n", "print(f\"Clustrix version: {clustrix.__version__ if hasattr(clustrix, '__version__') else 'development'}\")\n", "print(f\"Import successful at {datetime.now()}\")" ], "id": 
"cell-2" }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Configuration Functions\n", "\n", "### 1. Basic Configuration\n", "\n", "The `configure()` function is the primary way to set up Clustrix:" ], "id": "cell-3" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Basic local configuration\n", "clustrix.configure(\n", " cluster_type=\"local\", # Use local execution\n", " default_cores=4, # Default number of cores\n", " default_memory=\"8GB\", # Default memory allocation\n", " auto_parallel=True, # Enable automatic parallelization\n", " max_parallel_jobs=10 # Maximum concurrent jobs\n", ")\n", "\n", "print(\"✓ Basic local configuration set\")\n", "\n", "# Get current configuration\n", "config = clustrix.get_config()\n", "print(f\"Current cluster type: {config.cluster_type}\")\n", "print(f\"Default cores: {config.default_cores}\")\n", "print(f\"Default memory: {config.default_memory}\")\n", "print(f\"Auto parallel: {config.auto_parallel}\")" ], "id": "cell-4" }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2. 
All Configuration Options\n", "\n", "Comprehensive configuration with all available options:" ], "id": "cell-5" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def demonstrate_all_config_options():\n", " \"\"\"\n", " Demonstrate all available configuration options for different cluster types.\n", " \"\"\"\n", " \n", " configurations = {\n", " 'local': {\n", " 'cluster_type': 'local',\n", " 'default_cores': 4,\n", " 'default_memory': '8GB',\n", " 'auto_parallel': True,\n", " 'max_parallel_jobs': 8,\n", " 'cleanup_on_success': True\n", " },\n", " 'slurm': {\n", " 'cluster_type': 'slurm',\n", " 'cluster_host': 'slurm-cluster.university.edu',\n", " 'username': 'researcher',\n", " 'key_file': '~/.ssh/id_rsa',\n", " 'port': 22,\n", " 'default_cores': 8,\n", " 'default_memory': '32GB',\n", " 'default_time': '02:00:00',\n", " 'default_partition': 'normal',\n", " 'default_account': 'research_group',\n", " 'default_qos': 'normal',\n", " 'remote_work_dir': '/scratch/researcher/clustrix',\n", " 'module_loads': ['python/3.9', 'gcc/9.3.0'],\n", " 'conda_env_name': 'myproject',\n", " 'cleanup_on_success': True,\n", " 'max_parallel_jobs': 20\n", " },\n", " 'pbs': {\n", " 'cluster_type': 'pbs',\n", " 'cluster_host': 'pbs-cluster.org',\n", " 'username': 'scientist',\n", " 'key_file': '~/.ssh/pbs_key',\n", " 'default_cores': 6,\n", " 'default_memory': '24GB',\n", " 'default_time': '04:00:00',\n", " 'default_queue': 'bioqueue',\n", " 'remote_work_dir': '/home/scientist/clustrix',\n", " 'walltime': '04:00:00', # PBS-specific\n", " 'features': 'infiniband',\n", " 'cleanup_on_success': True\n", " },\n", " 'sge': {\n", " 'cluster_type': 'sge',\n", " 'cluster_host': 'sge-cluster.example.com',\n", " 'username': 'engineer',\n", " 'key_file': '~/.ssh/sge_key',\n", " 'default_cores': 12,\n", " 'default_memory': '48GB',\n", " 'default_time': '06:00:00',\n", " 'default_queue': 'all.q',\n", " 'pe': 'smp', # SGE parallel environment\n", " 
'remote_work_dir': '/home/engineer/clustrix'\n", " },\n", " 'kubernetes': {\n", " 'cluster_type': 'kubernetes',\n", " 'k8s_namespace': 'default',\n", " 'k8s_config_file': '~/.kube/config',\n", " 'default_cores': 4,\n", " 'default_memory': '8Gi',\n", " 'default_cpu_limit': 6,\n", " 'default_memory_limit': '12Gi',\n", " 'container_image': 'python:3.11-slim',\n", " 'image_pull_policy': 'IfNotPresent',\n", " 'job_ttl_seconds': 3600,\n", " 'backoff_limit': 3,\n", " 'restart_policy': 'OnFailure'\n", " },\n", " 'ssh': {\n", " 'cluster_type': 'ssh',\n", " 'cluster_host': 'remote-server.example.com',\n", " 'username': 'developer',\n", " 'key_file': '~/.ssh/dev_key',\n", " 'port': 22,\n", " 'remote_work_dir': '/home/developer/clustrix',\n", " 'python_executable': 'python3',\n", " 'virtualenv_path': '/home/developer/venv/myproject',\n", " 'cleanup_on_success': True,\n", " 'max_parallel_jobs': 5\n", " }\n", " }\n", " \n", " print(\"Configuration Options for All Cluster Types:\")\n", " print(\"=\" * 50)\n", " \n", " for cluster_type, config_options in configurations.items():\n", " print(f\"\\n{cluster_type.upper()} Configuration:\")\n", " for key, value in config_options.items():\n", " print(f\" {key}: {value}\")\n", " \n", " return configurations\n", "\n", "# Display all configuration options\n", "all_configs = demonstrate_all_config_options()" ], "id": "cell-6" }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3. 
Configuration from File\n", "\n", "Load configuration from YAML files:" ], "id": "cell-7" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import tempfile\n", "import yaml\n", "import os\n", "\n", "# Create a sample configuration file\n", "sample_config = {\n", " 'cluster_type': 'local',\n", " 'default_cores': 6,\n", " 'default_memory': '16GB',\n", " 'auto_parallel': True,\n", " 'max_parallel_jobs': 12,\n", " 'cleanup_on_success': True,\n", " 'environment_variables': {\n", " 'OMP_NUM_THREADS': '6',\n", " 'PYTHONPATH': '/custom/path'\n", " }\n", "}\n", "\n", "# Write to temporary file\n", "with tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False) as f:\n", " yaml.dump(sample_config, f)\n", " config_file = f.name\n", "\n", "print(f\"Created configuration file: {config_file}\")\n", "\n", "# Load configuration from file\n", "config = ClusterConfig.from_file(config_file)\n", "print(f\"\\nLoaded configuration:\")\n", "print(f\" Cluster type: {config.cluster_type}\")\n", "print(f\" Cores: {config.default_cores}\")\n", "print(f\" Memory: {config.default_memory}\")\n", "print(f\" Max parallel jobs: {config.max_parallel_jobs}\")\n", "\n", "# Apply the configuration\n", "clustrix.configure(**config.__dict__)\n", "\n", "# Cleanup\n", "os.unlink(config_file)\n", "print(\"\\n✓ Configuration loaded from file and applied\")" ], "id": "cell-8" }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Cluster Decorator\n", "\n", "### 1. 
Basic Decorator Usage\n", "\n", "The `@cluster` decorator is the main interface for distributed execution:" ], "id": "cell-9" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Basic decorator usage\n", "@cluster\n", "def simple_function(x, y):\n", " \"\"\"A simple function with default cluster settings.\"\"\"\n", " import time\n", " time.sleep(0.1) # Simulate some work\n", " return x + y\n", "\n", "result = simple_function(5, 10)\n", "print(f\"Simple function result: {result}\")\n", "\n", "# Decorator with resource specification\n", "@cluster(cores=4, memory=\"8GB\")\n", "def resource_specific_function(data_size):\n", " \"\"\"Function with specific resource requirements.\"\"\"\n", " import numpy as np\n", " data = np.random.random(data_size)\n", " return {\n", " 'mean': np.mean(data),\n", " 'std': np.std(data),\n", " 'size': len(data)\n", " }\n", "\n", "stats = resource_specific_function(100000)\n", "print(f\"Resource-specific function result: mean={stats['mean']:.4f}, std={stats['std']:.4f}\")" ], "id": "cell-10" }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2. 
All Decorator Parameters\n", "\n", "Comprehensive demonstration of all decorator parameters:" ], "id": "cell-11" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def demonstrate_decorator_parameters():\n", " \"\"\"\n", " Show all available parameters for the @cluster decorator.\n", " \"\"\"\n", " \n", " # Basic resource parameters\n", " @cluster(\n", " cores=8, # Number of CPU cores\n", " memory=\"32GB\", # Memory allocation\n", " time=\"02:00:00\", # Time limit (HH:MM:SS)\n", " parallel=True # Enable automatic parallelization\n", " )\n", " def basic_resources_demo(n):\n", " \"\"\"Basic resource specification.\"\"\"\n", " return sum(i**2 for i in range(n))\n", " \n", " # Scheduler-specific parameters\n", " @cluster(\n", " cores=16,\n", " memory=\"64GB\",\n", " time=\"04:00:00\",\n", " # SLURM-specific\n", " partition=\"gpu\", # SLURM partition\n", " account=\"research_group\", # SLURM account\n", " qos=\"high\", # Quality of Service\n", " gres=\"gpu:2\", # Generic resources (GPUs)\n", " constraint=\"haswell\", # Node constraints\n", " array=\"1-10\", # Job array specification\n", " # PBS-specific\n", " queue=\"bioqueue\", # PBS queue\n", " walltime=\"04:00:00\", # PBS walltime\n", " features=\"infiniband\", # PBS features\n", " # SGE-specific\n", " pe=\"smp 16\", # SGE parallel environment\n", " sge_array=\"1-20\" # SGE task array\n", " )\n", " def scheduler_specific_demo(data):\n", " \"\"\"Scheduler-specific parameter demonstration.\"\"\"\n", " import numpy as np\n", " return np.mean(data)\n", " \n", " # Kubernetes-specific parameters\n", " @cluster(\n", " cores=4,\n", " memory=\"16Gi\", # Kubernetes memory format\n", " cpu_limit=6, # CPU limit (can exceed cores)\n", " memory_limit=\"24Gi\", # Memory limit\n", " container_image=\"python:3.11\", # Container image\n", " job_name=\"custom-job\", # Kubernetes job name\n", " parallelism=3, # Parallel pod execution\n", " completions=10, # Total completions needed\n", " 
backoff_limit=3, # Retry limit on failure\n", " restart_policy=\"OnFailure\", # Pod restart policy\n", " job_ttl_seconds=7200, # Job cleanup time\n", " active_deadline_seconds=3600 # Maximum job runtime\n", " )\n", " def kubernetes_demo(task_id):\n", " \"\"\"Kubernetes-specific parameter demonstration.\"\"\"\n", " import os\n", " import time\n", " time.sleep(1)\n", " return {\n", " 'task_id': task_id,\n", " 'pod_name': os.environ.get('HOSTNAME', 'unknown'),\n", " 'completion_time': time.time()\n", " }\n", " \n", " # Environment and execution parameters\n", " @cluster(\n", " cores=4,\n", " memory=\"16GB\",\n", " environment={'OMP_NUM_THREADS': '4', 'CUDA_VISIBLE_DEVICES': '0'},\n", " conda_env=\"myproject\", # Conda environment\n", " virtualenv_path=\"/path/to/venv\", # Virtual environment\n", " python_executable=\"python3\", # Python command\n", " working_directory=\"/tmp\", # Working directory\n", " cleanup_files=True, # Cleanup temporary files\n", " timeout=3600 # Execution timeout\n", " )\n", " def environment_demo(message):\n", " \"\"\"Environment configuration demonstration.\"\"\"\n", " import os\n", " return {\n", " 'message': message,\n", " 'omp_threads': os.environ.get('OMP_NUM_THREADS', 'not_set'),\n", " 'cuda_devices': os.environ.get('CUDA_VISIBLE_DEVICES', 'not_set'),\n", " 'working_dir': os.getcwd()\n", " }\n", " \n", " print(\"Decorator Parameter Demonstrations:\")\n", " print(\"=\" * 40)\n", " \n", " # Run basic resources demo\n", " print(\"\\n1. Basic Resources Demo:\")\n", " result1 = basic_resources_demo(1000)\n", " print(f\" Sum of squares: {result1:,}\")\n", " \n", " # Run environment demo\n", " print(\"\\n2. 
Environment Demo:\")\n", " result2 = environment_demo(\"Hello from cluster!\")\n", " print(f\" Message: {result2['message']}\")\n", " print(f\" OMP threads: {result2['omp_threads']}\")\n", " print(f\" Working dir: {result2['working_dir']}\")\n", " \n", " return {\n", " 'basic_resources': basic_resources_demo,\n", " 'scheduler_specific': scheduler_specific_demo,\n", " 'kubernetes': kubernetes_demo,\n", " 'environment': environment_demo\n", " }\n", "\n", "# Demonstrate all decorator parameters\n", "decorator_functions = demonstrate_decorator_parameters()" ], "id": "cell-12" }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3. Automatic Parallelization\n", "\n", "When `parallel=True`, Clustrix can automatically parallelize loops inside the decorated function:" ], "id": "cell-13" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sequential execution (parallelization disabled)\n", "@cluster(cores=4, parallel=False)\n", "def sequential_processing(items):\n", " \"\"\"Process items sequentially.\"\"\"\n", " import time\n", " results = []\n", " for item in items:\n", " time.sleep(0.01) # Simulate work\n", " results.append(item ** 2)\n", " return results\n", "\n", "# Parallel execution\n", "@cluster(cores=4, parallel=True)\n", "def parallel_processing(items):\n", " \"\"\"Process items in parallel.\"\"\"\n", " import time\n", " results = []\n", " for item in items: # This loop will be parallelized\n", " time.sleep(0.01) # Simulate work\n", " results.append(item ** 2)\n", " return results\n", "\n", "# Test data\n", "test_items = list(range(20))\n", "\n", "# Time sequential execution\n", "start = time.time()\n", "seq_result = sequential_processing(test_items)\n", "seq_time = time.time() - start\n", "\n", "# Time parallel execution\n", "start = time.time()\n", "par_result = parallel_processing(test_items)\n", "par_time = time.time() - start\n", "\n", "print(f\"Sequential execution: {seq_time:.3f} seconds\")\n", "print(f\"Parallel execution: {par_time:.3f} seconds\")\n", 
"print(f\"Speedup: {seq_time/par_time:.2f}x\")\n", "print(f\"Results match: {seq_result == par_result}\")\n", "print(f\"Sample results: {seq_result[:5]}\")" ], "id": "cell-14" }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Local Execution\n", "\n", "### 1. Local Executor Direct Usage\n", "\n", "Use the LocalExecutor directly for fine-grained control:" ], "id": "cell-15" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from clustrix.local_executor import LocalExecutor\n", "\n", "# Create local executor\n", "local_config = ClusterConfig(\n", " cluster_type=\"local\",\n", " default_cores=4,\n", " auto_parallel=True\n", ")\n", "\n", "executor = LocalExecutor(local_config)\n", "\n", "# Define a function to execute\n", "def compute_statistics(data):\n", " \"\"\"Compute basic statistics on data.\"\"\"\n", " import numpy as np\n", " return {\n", " 'mean': np.mean(data),\n", " 'std': np.std(data),\n", " 'median': np.median(data),\n", " 'min': np.min(data),\n", " 'max': np.max(data)\n", " }\n", "\n", "# Execute function with local executor\n", "test_data = np.random.normal(100, 15, 10000)\n", "result = executor.execute_function(compute_statistics, (test_data,), {})\n", "\n", "print(\"Local Executor Results:\")\n", "for key, value in result.items():\n", " print(f\" {key}: {value:.4f}\")\n", "\n", "# Test parallel loop execution\n", "def parallel_computation(n_iterations):\n", " \"\"\"Function with parallelizable loop.\"\"\"\n", " import numpy as np\n", " results = []\n", " for i in range(n_iterations):\n", " # Simulate CPU-intensive work\n", " data = np.random.random(1000)\n", " result = np.sum(data ** 2)\n", " results.append(result)\n", " return np.mean(results)\n", "\n", "# Execute with automatic parallelization\n", "start_time = time.time()\n", "parallel_result = executor.execute_loop_parallel(\n", " parallel_computation, \n", " 'i', \n", " range(100), # Will be chunked across cores\n", " cores=4\n", ")\n", 
"execution_time = time.time() - start_time\n", "\n", "print(f\"\\nParallel loop execution:\")\n", "print(f\" Result: {parallel_result:.6f}\")\n", "print(f\" Execution time: {execution_time:.3f} seconds\")" ], "id": "cell-16" }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2. CPU vs I/O Detection\n", "\n", "Clustrix automatically chooses between multiprocessing and threading:" ], "id": "cell-17" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from clustrix.local_executor import choose_executor_type\n", "\n", "# CPU-intensive function\n", "def cpu_intensive_task(n):\n", " \"\"\"CPU-bound computation.\"\"\"\n", " total = 0\n", " for i in range(n):\n", " total += i ** 0.5\n", " return total\n", "\n", "# I/O-intensive function\n", "def io_intensive_task(filename):\n", " \"\"\"I/O-bound operation.\"\"\"\n", " import time\n", " time.sleep(0.1) # Simulate I/O wait\n", " with open(filename, 'w') as f:\n", " f.write(\"test data\")\n", " return f\"File {filename} written\"\n", "\n", "# Function with network I/O patterns\n", "def network_task(url):\n", " \"\"\"Network request simulation.\"\"\"\n", " import urllib.request\n", " import time\n", " time.sleep(0.05) # Simulate network latency\n", " return f\"Fetched {url}\"\n", "\n", "# Test executor type selection\n", "test_cases = [\n", " (cpu_intensive_task, (10000,), {}),\n", " (io_intensive_task, (\"/tmp/test.txt\",), {}),\n", " (network_task, (\"http://example.com\",), {})\n", "]\n", "\n", "print(\"Executor Type Selection:\")\n", "print(\"=\" * 30)\n", "\n", "for func, args, kwargs in test_cases:\n", " use_threads = choose_executor_type(func, args, kwargs)\n", " executor_type = \"ThreadPoolExecutor\" if use_threads else \"ProcessPoolExecutor\"\n", " task_type = \"I/O-bound\" if use_threads else \"CPU-bound\"\n", " \n", " print(f\"Function: {func.__name__}\")\n", " print(f\" Detected as: {task_type}\")\n", " print(f\" Will use: {executor_type}\")\n", " print()" ], 
"id": "cell-18" }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Remote Cluster Execution\n", "\n", "### 1. Cluster Executor Direct Usage\n", "\n", "Use ClusterExecutor for direct cluster operations:" ], "id": "cell-19" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Note: This section demonstrates the API but won't actually connect to remote clusters\n", "# in this demo notebook\n", "\n", "def demonstrate_cluster_executor_api():\n", " \"\"\"\n", " Demonstrate the ClusterExecutor API without actually connecting.\n", " \"\"\"\n", " \n", " # Example configurations for different cluster types\n", " cluster_configs = {\n", " 'slurm': ClusterConfig(\n", " cluster_type=\"slurm\",\n", " cluster_host=\"slurm-cluster.edu\",\n", " username=\"researcher\",\n", " key_file=\"~/.ssh/id_rsa\",\n", " default_partition=\"normal\"\n", " ),\n", " 'pbs': ClusterConfig(\n", " cluster_type=\"pbs\",\n", " cluster_host=\"pbs-cluster.org\",\n", " username=\"scientist\",\n", " default_queue=\"bioqueue\"\n", " ),\n", " 'kubernetes': ClusterConfig(\n", " cluster_type=\"kubernetes\",\n", " k8s_namespace=\"default\",\n", " container_image=\"python:3.11-slim\"\n", " )\n", " }\n", " \n", " print(\"Cluster Executor API Demonstration:\")\n", " print(\"=\" * 40)\n", " \n", " for cluster_type, config in cluster_configs.items():\n", " print(f\"\\n{cluster_type.upper()} Executor:\")\n", " \n", " # Create executor (but don't connect)\n", " executor = ClusterExecutor(config)\n", " \n", " print(f\" Cluster type: {executor.config.cluster_type}\")\n", " print(f\" Config object: {type(executor.config).__name__}\")\n", " \n", " # Show available methods\n", " methods = [method for method in dir(executor) \n", " if not method.startswith('_') and callable(getattr(executor, method))]\n", " print(f\" Available methods: {', '.join(methods[:5])}...\")\n", " \n", " # Example of what cluster execution would look like\n", " print(\"\\nExample cluster 
execution pattern:\")\n", " print(\"\"\"\n", " # 1. Create and configure executor\n", " executor = ClusterExecutor(config)\n", " \n", " # 2. Connect to cluster\n", " executor.connect()\n", " \n", " # 3. Submit job\n", " job_id = executor.submit_job(function, args, kwargs, job_config)\n", " \n", " # 4. Monitor job status\n", " status = executor.get_job_status(job_id)\n", " \n", " # 5. Retrieve results\n", " result = executor.get_result(job_id)\n", " \n", " # 6. Cleanup\n", " executor.cleanup_job(job_id)\n", " executor.disconnect()\n", " \"\"\")\n", " \n", " return cluster_configs\n", "\n", "# Demonstrate the API\n", "cluster_configs = demonstrate_cluster_executor_api()" ], "id": "cell-20" }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2. Job Management Functions\n", "\n", "Functions for managing cluster jobs:" ], "id": "cell-21" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def demonstrate_job_management():\n", " \"\"\"\n", " Demonstrate job management functions and patterns.\n", " \"\"\"\n", " \n", " print(\"Job Management Functions:\")\n", " print(\"=\" * 30)\n", " \n", " # Job submission patterns\n", " job_patterns = {\n", " 'single_job': {\n", " 'description': 'Submit single job with specific resources',\n", " 'example': '''\n", "@cluster(cores=8, memory=\"32GB\", time=\"02:00:00\")\n", "def my_computation(data):\n", " return process_data(data)\n", " '''\n", " },\n", " 'job_array': {\n", " 'description': 'Submit job array for parameter sweeps',\n", " 'example': '''\n", "@cluster(cores=4, memory=\"16GB\", array=\"1-100\")\n", "def parameter_sweep(base_params):\n", " task_id = int(os.environ.get('SLURM_ARRAY_TASK_ID', '1'))\n", " params = modify_params(base_params, task_id)\n", " return run_simulation(params)\n", " '''\n", " },\n", " 'parallel_jobs': {\n", " 'description': 'Submit multiple independent jobs',\n", " 'example': '''\n", "@cluster(cores=4, memory=\"16GB\", parallel=True)\n", "def 
parallel_analysis(datasets):\n", " results = []\n", " for dataset in datasets: # Each iteration becomes a separate job\n", " results.append(analyze_dataset(dataset))\n", " return results\n", " '''\n", " },\n", " 'dependent_jobs': {\n", " 'description': 'Chain jobs with dependencies',\n", " 'example': '''\n", "# Job 1: Data preprocessing\n", "@cluster(cores=4, memory=\"16GB\")\n", "def preprocess_data(raw_data):\n", " return clean_and_transform(raw_data)\n", "\n", "# Job 2: Analysis (depends on Job 1)\n", "@cluster(cores=8, memory=\"32GB\", dependency=\"afterok:$JOB1_ID\")\n", "def analyze_processed_data(processed_data):\n", " return run_analysis(processed_data)\n", " '''\n", " }\n", " }\n", " \n", " for pattern_name, pattern_info in job_patterns.items():\n", " print(f\"\\n{pattern_name.upper().replace('_', ' ')}:\")\n", " print(f\" Description: {pattern_info['description']}\")\n", " print(f\" Example:{pattern_info['example']}\")\n", " \n", " # Job monitoring functions\n", " print(\"\\n\" + \"=\" * 30)\n", " print(\"Job Monitoring Functions:\")\n", " \n", " monitoring_functions = {\n", " 'get_job_status()': 'Check the current status of a submitted job',\n", " 'list_active_jobs()': 'List all active jobs for the user',\n", " 'get_job_info()': 'Get detailed information about a specific job',\n", " 'cancel_job()': 'Cancel a running or queued job',\n", " 'get_job_output()': 'Retrieve stdout/stderr from a completed job',\n", " 'get_job_resources()': 'Get resource usage statistics',\n", " 'estimate_queue_time()': 'Estimate the queue wait time for a job'\n", " }\n", " \n", " for func_name, description in monitoring_functions.items():\n", " print(f\" {func_name:22} - {description}\")\n", " \n", " return job_patterns\n", "\n", "# Demonstrate job management\n", "job_patterns = demonstrate_job_management()" ], "id": "cell-22" }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Advanced Features\n", "\n", "### 1. 
Custom Serialization\n", "\n", "Pass complex objects, such as custom class instances and lambda functions, to decorated functions:" ], "id": "cell-23" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pickle\n", "import cloudpickle\n", "import dill\n", "\n", "class CustomClass:\n", " \"\"\"A custom class to test serialization.\"\"\"\n", " \n", " def __init__(self, name, data):\n", " self.name = name\n", " self.data = data\n", " \n", " def process(self):\n", " return f\"Processed {self.name} with {len(self.data)} items\"\n", " \n", " def __repr__(self):\n", " return f\"CustomClass(name='{self.name}', data_length={len(self.data)})\"\n", "\n", "# Test serialization with different libraries\n", "@cluster(cores=2)\n", "def test_serialization(custom_obj, serializer_name):\n", " \"\"\"Test custom object serialization.\"\"\"\n", " result = custom_obj.process()\n", " return {\n", " 'serializer': serializer_name,\n", " 'object_name': custom_obj.name,\n", " 'result': result,\n", " 'data_length': len(custom_obj.data)\n", " }\n", "\n", "# Create test object\n", "test_obj = CustomClass(\"test_object\", list(range(1000)))\n", "\n", "# Note: serializer_name is only a label passed to the function; this loop\n", "# does not change how Clustrix serializes the call. It checks that a custom\n", "# class instance can be passed as an argument to a decorated function.\n", "serializers = ['cloudpickle', 'dill', 'pickle']\n", "\n", "print(\"Serialization Testing:\")\n", "print(\"=\" * 25)\n", "\n", "for serializer in serializers:\n", " try:\n", " result = test_serialization(test_obj, serializer)\n", " print(f\"\\n{serializer.upper()}:\")\n", " print(f\" ✓ Serialization successful\")\n", " print(f\" Object: {result['object_name']}\")\n", " print(f\" Result: {result['result']}\")\n", " except Exception as e:\n", " print(f\"\\n{serializer.upper()}:\")\n", " print(f\" ✗ Serialization failed: {e}\")\n", "\n", "# Test lambda function serialization\n", "@cluster(cores=2)\n", "def test_lambda_serialization(data, transform_func):\n", " \"\"\"Test lambda function serialization.\"\"\"\n", " transformed = [transform_func(x) for x in data]\n", " return {\n", " 'original_data': data,\n", " "
'transformed_data': transformed,\n", " 'function_type': str(type(transform_func))\n", " }\n", "\n", "# Test with lambda\n", "test_data = [1, 2, 3, 4, 5]\n", "lambda_func = lambda x: x ** 2\n", "\n", "try:\n", " lambda_result = test_lambda_serialization(test_data, lambda_func)\n", " print(f\"\\nLAMBDA FUNCTION SERIALIZATION:\")\n", " print(f\" ✓ Success\")\n", " print(f\" Original: {lambda_result['original_data']}\")\n", " print(f\" Transformed: {lambda_result['transformed_data']}\")\n", "except Exception as e:\n", " print(f\"\\nLAMBDA FUNCTION SERIALIZATION:\")\n", " print(f\" ✗ Failed: {e}\")" ], "id": "cell-24" }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2. Environment Management\n", "\n", "Manage remote environments and dependencies:" ], "id": "cell-25" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def demonstrate_environment_management():\n", " \"\"\"\n", " Demonstrate environment management features.\n", " \"\"\"\n", " \n", " print(\"Environment Management Features:\")\n", " print(\"=\" * 35)\n", " \n", " # Environment configuration options\n", " env_configs = {\n", " 'conda_environment': {\n", " 'description': 'Use a conda environment on the remote cluster',\n", " 'config': {\n", " 'conda_env_name': 'myproject',\n", " 'conda_path': '/opt/conda/bin/conda'\n", " },\n", " 'usage': '''\n", "@cluster(cores=4, conda_env=\"myproject\")\n", "def ml_computation(data):\n", " import tensorflow as tf # Available in conda env\n", " return train_model(data)\n", " '''\n", " },\n", " 'virtual_environment': {\n", " 'description': 'Use a Python virtual environment',\n", " 'config': {\n", " 'virtualenv_path': '/home/user/venv/myproject',\n", " 'python_executable': 'python3'\n", " },\n", " 'usage': '''\n", "configure(\n", " cluster_type=\"ssh\",\n", " virtualenv_path=\"/home/user/venv/myproject\"\n", ")\n", " '''\n", " },\n", " 'module_loading': {\n", " 'description': 'Load environment modules (HPC clusters)',\n", "
'config': {\n", " 'module_loads': ['python/3.9', 'gcc/9.3.0', 'openmpi/4.1']\n", " },\n", " 'usage': '''\n", "configure(\n", " cluster_type=\"slurm\",\n", " module_loads=[\"python/3.9\", \"gcc/9.3.0\"]\n", ")\n", " '''\n", " },\n", " 'environment_variables': {\n", " 'description': 'Set custom environment variables',\n", " 'config': {\n", " 'environment_variables': {\n", " 'OMP_NUM_THREADS': '8',\n", " 'CUDA_VISIBLE_DEVICES': '0,1',\n", " 'PYTHONPATH': '/custom/path'\n", " }\n", " },\n", " 'usage': '''\n", "@cluster(\n", " cores=8,\n", " environment={\n", " 'OMP_NUM_THREADS': '8',\n", " 'CUDA_VISIBLE_DEVICES': '0,1'\n", " }\n", ")\n", "def gpu_computation(data):\n", " return process_on_gpu(data)\n", " '''\n", " },\n", " 'dependency_management': {\n", " 'description': 'Automatic dependency installation',\n", " 'config': {\n", " 'pip_requirements': ['numpy>=1.20', 'scipy>=1.7', 'scikit-learn'],\n", " 'conda_packages': ['tensorflow', 'pytorch']\n", " },\n", " 'usage': '''\n", "# Clustrix automatically captures local environment\n", "# and recreates it on remote cluster using pip freeze\n", "@cluster(cores=4)\n", "def analysis_with_deps(data):\n", " import pandas as pd # Will be installed if missing\n", " import sklearn # Will be installed if missing\n", " return analyze_data(data)\n", " '''\n", " }\n", " }\n", " \n", " for env_type, env_info in env_configs.items():\n", " print(f\"\\n{env_type.upper().replace('_', ' ')}:\")\n", " print(f\" Description: {env_info['description']}\")\n", " print(f\" Configuration:\")\n", " for key, value in env_info['config'].items():\n", " print(f\" {key}: {value}\")\n", " print(f\" Usage example:{env_info['usage']}\")\n", " \n", " return env_configs\n", "\n", "# Demonstrate environment management\n", "env_configs = demonstrate_environment_management()" ], "id": "cell-26" }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3. 
Error Handling and Recovery\n", "\n", "Patterns for handling failures inside decorated functions: letting exceptions propagate, retrying with backoff, and falling back to a simpler method:" ], "id": "cell-27" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import random\n", "\n", "# Function that may fail randomly\n", "@cluster(cores=2)\n", "def unreliable_computation(data, failure_rate=0.3):\n", " \"\"\"A computation that may fail randomly.\"\"\"\n", " import random\n", " import time\n", " \n", " # Simulate random failures\n", " if random.random() < failure_rate:\n", " raise RuntimeError(\"Simulated failure during computation\")\n", " \n", " # Simulate work\n", " time.sleep(0.1)\n", " result = sum(x**2 for x in data)\n", " return result\n", "\n", "# Function with retry logic\n", "@cluster(cores=2)\n", "def computation_with_retry(data, max_retries=3):\n", " \"\"\"Computation with built-in retry logic.\"\"\"\n", " import random\n", " import time\n", " \n", " for attempt in range(max_retries + 1):\n", " try:\n", " # Simulate potential failure\n", " if random.random() < 0.4 and attempt < max_retries:\n", " raise RuntimeError(f\"Attempt {attempt + 1} failed\")\n", " \n", " # Actual computation\n", " time.sleep(0.05)\n", " result = sum(x**3 for x in data)\n", " \n", " return {\n", " 'result': result,\n", " 'attempts': attempt + 1,\n", " 'success': True\n", " }\n", " \n", " except Exception as e:\n", " if attempt == max_retries:\n", " return {\n", " 'result': None,\n", " 'attempts': attempt + 1,\n", " 'success': False,\n", " 'error': str(e)\n", " }\n", " time.sleep(0.1 * 2 ** attempt) # Exponential backoff: 0.1s, 0.2s, 0.4s, ...\n", "\n", "# Function with graceful degradation\n", "@cluster(cores=2)\n", "def robust_computation(data, fallback_method=True):\n", " \"\"\"Computation with fallback method.\"\"\"\n", " import numpy as np\n", " \n", " try:\n", " # Primary method (may fail)\n", " if len(data) > 1000: # Simulate failure condition\n", " raise MemoryError(\"Not enough memory for primary method\")\n", " \n", " # Primary computation\n", " 
result = np.fft.fft(data).real\n", " return {\n", " 'result': np.mean(result),\n", " 'method': 'primary_fft',\n", " 'success': True\n", " }\n", " \n", " except Exception as e:\n", " if fallback_method:\n", " # Fallback method\n", " result = np.mean(data) # Simple fallback\n", " return {\n", " 'result': result,\n", " 'method': 'fallback_mean',\n", " 'success': True,\n", " 'warning': f\"Used fallback due to: {str(e)}\"\n", " }\n", " else:\n", " raise\n", "\n", "print(\"Error Handling and Recovery:\")\n", "print(\"=\" * 30)\n", "\n", "# Test unreliable computation\n", "test_data = list(range(50))\n", "successes = 0\n", "failures = 0\n", "\n", "print(\"\\n1. Testing Unreliable Computation:\")\n", "for i in range(10):\n", " try:\n", " result = unreliable_computation(test_data, failure_rate=0.3)\n", " successes += 1\n", " except Exception as e:\n", " failures += 1\n", "\n", "print(f\" Successes: {successes}/10\")\n", "print(f\" Failures: {failures}/10\")\n", "\n", "# Test computation with retry\n", "print(\"\\n2. Testing Computation with Retry:\")\n", "retry_results = []\n", "for i in range(5):\n", " result = computation_with_retry(test_data, max_retries=3)\n", " retry_results.append(result)\n", " status = \"✓\" if result['success'] else \"✗\"\n", " print(f\" {status} Attempt {i+1}: {result['attempts']} tries, Success: {result['success']}\")\n", "\n", "# Test robust computation with fallback\n", "print(\"\\n3. 
Testing Robust Computation:\")\n", "\n", "# Small data (should use primary method)\n", "small_data = list(range(100))\n", "small_result = robust_computation(small_data)\n", "print(f\"   Small data: {small_result['method']}, Result: {small_result['result']:.4f}\")\n", "\n", "# Large data (should use fallback)\n", "large_data = list(range(2000))\n", "large_result = robust_computation(large_data)\n", "print(f\"   Large data: {large_result['method']}, Result: {large_result['result']:.4f}\")\n", "if 'warning' in large_result:\n", "    print(f\"   Warning: {large_result['warning']}\")" ], "id": "cell-28" }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Monitoring and Debugging\n", "\n", "### 1. Performance Monitoring\n", "\n", "Monitor execution performance and resource usage:" ], "id": "cell-29" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import psutil\n", "import threading\n", "import time\n", "from datetime import datetime\n", "\n", "class PerformanceMonitor:\n", "    \"\"\"Monitor performance during function execution.\"\"\"\n", "    \n", "    def __init__(self, interval=0.1):\n", "        self.interval = interval\n", "        self.monitoring = False\n", "        self.metrics = []\n", "    \n", "    def start_monitoring(self):\n", "        \"\"\"Start performance monitoring.\"\"\"\n", "        self.monitoring = True\n", "        self.metrics = []\n", "        # Prime the counter: the first non-blocking cpu_percent() call returns a meaningless 0.0\n", "        psutil.cpu_percent(interval=None)\n", "        \n", "        def monitor():\n", "            while self.monitoring:\n", "                try:\n", "                    cpu_percent = psutil.cpu_percent()\n", "                    memory = psutil.virtual_memory()\n", "                    \n", "                    self.metrics.append({\n", "                        'timestamp': time.time(),\n", "                        'cpu_percent': cpu_percent,\n", "                        'memory_percent': memory.percent,\n", "                        'memory_used_gb': memory.used / (1024**3)\n", "                    })\n", "                except Exception:\n", "                    pass  # Skip this sample if psutil fails\n", "                \n", "                time.sleep(self.interval)\n", "        \n", "        self.monitor_thread = threading.Thread(target=monitor, daemon=True)\n", "        self.monitor_thread.start()\n", "    \n", "    def stop_monitoring(self):\n", "        \"\"\"Stop performance 
monitoring.\"\"\"\n", " self.monitoring = False\n", " if hasattr(self, 'monitor_thread'):\n", " self.monitor_thread.join(timeout=1.0)\n", " \n", " def get_summary(self):\n", " \"\"\"Get performance summary.\"\"\"\n", " if not self.metrics:\n", " return {'error': 'No metrics collected'}\n", " \n", " cpu_values = [m['cpu_percent'] for m in self.metrics]\n", " memory_values = [m['memory_percent'] for m in self.metrics]\n", " \n", " return {\n", " 'duration_seconds': self.metrics[-1]['timestamp'] - self.metrics[0]['timestamp'],\n", " 'samples_collected': len(self.metrics),\n", " 'cpu_usage': {\n", " 'mean': np.mean(cpu_values),\n", " 'max': np.max(cpu_values),\n", " 'min': np.min(cpu_values),\n", " 'std': np.std(cpu_values)\n", " },\n", " 'memory_usage': {\n", " 'mean': np.mean(memory_values),\n", " 'max': np.max(memory_values),\n", " 'min': np.min(memory_values),\n", " 'peak_gb': np.max([m['memory_used_gb'] for m in self.metrics])\n", " }\n", " }\n", "\n", "# Monitored computation function\n", "@cluster(cores=4)\n", "def monitored_computation(size, complexity=\"medium\"):\n", " \"\"\"A computation that can be monitored for performance.\"\"\"\n", " import numpy as np\n", " import time\n", " \n", " # Different complexity levels\n", " if complexity == \"low\":\n", " data = np.random.random(size)\n", " result = np.sum(data)\n", " elif complexity == \"medium\":\n", " data = np.random.random((size, 10))\n", " result = np.sum(np.dot(data, data.T))\n", " else: # high\n", " data = np.random.random((size, size//10))\n", " for _ in range(3):\n", " data = np.dot(data, data.T[:data.shape[1], :])\n", " result = np.sum(data)\n", " \n", " return {\n", " 'result': float(result),\n", " 'size': size,\n", " 'complexity': complexity\n", " }\n", "\n", "print(\"Performance Monitoring:\")\n", "print(\"=\" * 25)\n", "\n", "# Test different complexity levels\n", "test_cases = [\n", " (1000, \"low\"),\n", " (500, \"medium\"),\n", " (100, \"high\")\n", "]\n", "\n", "for size, complexity in 
test_cases:\n", " print(f\"\\nTesting {complexity} complexity (size={size}):\")\n", " \n", " # Start monitoring\n", " monitor = PerformanceMonitor(interval=0.05)\n", " monitor.start_monitoring()\n", " \n", " # Run computation\n", " start_time = time.time()\n", " result = monitored_computation(size, complexity)\n", " end_time = time.time()\n", " \n", " # Stop monitoring\n", " monitor.stop_monitoring()\n", " \n", " # Get results\n", " perf_summary = monitor.get_summary()\n", " execution_time = end_time - start_time\n", " \n", " print(f\" Execution time: {execution_time:.3f} seconds\")\n", " print(f\" Result: {result['result']:.2e}\")\n", " \n", " if 'error' not in perf_summary:\n", " print(f\" CPU usage: {perf_summary['cpu_usage']['mean']:.1f}% avg, {perf_summary['cpu_usage']['max']:.1f}% max\")\n", " print(f\" Memory usage: {perf_summary['memory_usage']['mean']:.1f}% avg, {perf_summary['memory_usage']['peak_gb']:.2f} GB peak\")\n", " print(f\" Samples collected: {perf_summary['samples_collected']}\")" ], "id": "cell-30" }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2. 
Debugging Utilities\n", "\n", "Utilities for debugging distributed computations:" ], "id": "cell-31" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "import traceback\n", "import logging\n", "\n", "# Configure logging\n", "logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\n", "logger = logging.getLogger(__name__)\n", "\n", "# Function with debug information\n", "@cluster(cores=2)\n", "def debug_computation(data, debug_level=\"info\"):\n", "    \"\"\"Computation with extensive debugging information.\"\"\"\n", "    import sys\n", "    import os\n", "    import platform\n", "    import traceback  # Needed remotely for traceback.format_exc() in the except block\n", "    from datetime import datetime\n", "    \n", "    debug_info = {\n", "        'execution_start': datetime.now().isoformat(),\n", "        'python_version': sys.version,\n", "        'platform': platform.platform(),\n", "        'working_directory': os.getcwd(),\n", "        'process_id': os.getpid(),\n", "        # Variable names only: returning the values could leak credentials from the remote environment\n", "        'environment_var_names': sorted(os.environ),\n", "        'input_data_type': str(type(data)),\n", "        'input_data_length': len(data) if hasattr(data, '__len__') else 'unknown'\n", "    }\n", "    \n", "    try:\n", "        # Simulate computation with progress tracking\n", "        if debug_level == \"verbose\":\n", "            print(f\"Starting computation at {debug_info['execution_start']}\")\n", "            print(f\"Input data: {debug_info['input_data_type']} with {debug_info['input_data_length']} items\")\n", "        \n", "        result = 0\n", "        progress_step = max(1, len(data) // 5)  # Avoid modulo by zero when there are fewer than 5 items\n", "        for i, value in enumerate(data):\n", "            if debug_level == \"verbose\" and i % progress_step == 0:\n", "                print(f\"Progress: {i}/{len(data)} ({100*i/len(data):.1f}%)\")\n", "            \n", "            result += value ** 2\n", "            \n", "            # Simulate occasional issues\n", "            if i == len(data) // 2 and debug_level == \"test_error\":\n", "                raise ValueError(f\"Test error at position {i}\")\n", "        \n", "        debug_info.update({\n", "            'execution_end': datetime.now().isoformat(),\n", "            'success': True,\n", "            'result': result,\n", "            'items_processed': len(data)\n", "        })\n", "        \n", "        if 
debug_level in [\"info\", \"verbose\"]:\n", " print(f\"Computation completed successfully\")\n", " \n", " return debug_info\n", " \n", " except Exception as e:\n", " debug_info.update({\n", " 'execution_end': datetime.now().isoformat(),\n", " 'success': False,\n", " 'error_type': str(type(e).__name__),\n", " 'error_message': str(e),\n", " 'traceback': traceback.format_exc()\n", " })\n", " \n", " if debug_level in [\"info\", \"verbose\"]:\n", " print(f\"Computation failed: {e}\")\n", " \n", " return debug_info\n", "\n", "# Function to test serialization issues\n", "@cluster(cores=2)\n", "def test_serialization_debug(problematic_object):\n", " \"\"\"Test function that may have serialization issues.\"\"\"\n", " try:\n", " # Try to use the problematic object\n", " result = problematic_object.some_method() if hasattr(problematic_object, 'some_method') else str(problematic_object)\n", " return {'success': True, 'result': result}\n", " except Exception as e:\n", " return {\n", " 'success': False,\n", " 'error': str(e),\n", " 'object_type': str(type(problematic_object))\n", " }\n", "\n", "print(\"Debugging Utilities:\")\n", "print(\"=\" * 20)\n", "\n", "# Test normal execution with debug info\n", "print(\"\\n1. Normal Execution with Debug Info:\")\n", "test_data = list(range(100))\n", "debug_result = debug_computation(test_data, debug_level=\"info\")\n", "\n", "print(f\" Success: {debug_result['success']}\")\n", "print(f\" Platform: {debug_result['platform'][:50]}...\")\n", "print(f\" Process ID: {debug_result['process_id']}\")\n", "print(f\" Items processed: {debug_result.get('items_processed', 'N/A')}\")\n", "if 'result' in debug_result:\n", " print(f\" Result: {debug_result['result']}\")\n", "\n", "# Test error handling\n", "print(\"\\n2. 
Error Handling Test:\")\n", "error_result = debug_computation(test_data, debug_level=\"test_error\")\n", "\n", "print(f\"   Success: {error_result['success']}\")\n", "if not error_result['success']:\n", "    print(f\"   Error type: {error_result['error_type']}\")\n", "    print(f\"   Error message: {error_result['error_message']}\")\n", "    print(f\"   Traceback available: {'traceback' in error_result}\")\n", "\n", "# Test serialization debugging\n", "print(\"\\n3. Serialization Testing:\")\n", "\n", "# Test with simple object (should work)\n", "simple_obj = [1, 2, 3, 4, 5]\n", "simple_result = test_serialization_debug(simple_obj)\n", "print(f\"   Simple object: {simple_result['success']}\")\n", "\n", "# Test with complex object (may have issues)\n", "class ComplexObject:\n", "    def __init__(self):\n", "        self.data = \"test\"\n", "    \n", "    def some_method(self):\n", "        return f\"Method called on {self.data}\"\n", "\n", "complex_obj = ComplexObject()\n", "complex_result = test_serialization_debug(complex_obj)\n", "print(f\"   Complex object: {complex_result['success']}\")\n", "if complex_result['success']:\n", "    print(f\"   Result: {complex_result['result']}\")\n", "else:\n", "    print(f\"   Error: {complex_result['error'][:50]}...\")\n", "\n", "# Show debugging best practices\n", "print(\"\\n4. Debugging Best Practices:\")\n", "best_practices = [\n", "    \"Use debug_level parameters to control output verbosity\",\n", "    \"Include execution environment information in results\",\n", "    \"Test serialization with simple objects first\",\n", "    \"Use try/except blocks to capture and return error information\",\n", "    \"Include timestamps for performance analysis\",\n", "    \"Monitor resource usage during execution\",\n", "    \"Test with small datasets before scaling up\"\n", "]\n", "\n", "for i, practice in enumerate(best_practices, 1):\n", "    print(f\"   {i}. {practice}\")" ], "id": "cell-32" }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Best Practices\n", "\n", "### 1. 
Performance Optimization\n", "\n", "Best practices for optimal performance:" ], "id": "cell-33" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def demonstrate_performance_best_practices():\n", " \"\"\"\n", " Demonstrate best practices for performance optimization.\n", " \"\"\"\n", " \n", " print(\"Performance Optimization Best Practices:\")\n", " print(\"=\" * 45)\n", " \n", " best_practices = {\n", " 'resource_allocation': {\n", " 'title': 'Resource Allocation',\n", " 'practices': [\n", " \"Profile your code locally before scaling to clusters\",\n", " \"Use appropriate core counts (typically 1-2x physical cores)\",\n", " \"Allocate memory with 20-30% buffer for overhead\",\n", " \"Set realistic time limits with buffer for completion\",\n", " \"Use parallel=True for CPU-bound loops\",\n", " \"Consider I/O vs CPU workload for executor selection\"\n", " ],\n", " 'example': '''\n", "# Good resource allocation\n", "@cluster(\n", " cores=8, # Based on profiling\n", " memory=\"32GB\", # 25% buffer included\n", " time=\"02:30:00\", # 30min buffer for 2hr job\n", " parallel=True # Enable for CPU-bound work\n", ")\n", "def optimized_computation(data):\n", " return process_data_efficiently(data)\n", " '''\n", " },\n", " 'data_management': {\n", " 'title': 'Data Management',\n", " 'practices': [\n", " \"Minimize data transfer between local and remote\",\n", " \"Use efficient data formats (NumPy arrays, not lists)\",\n", " \"Chunk large datasets for parallel processing\",\n", " \"Avoid loading unnecessary data into memory\",\n", " \"Use generators for large data streams\",\n", " \"Consider data locality for cluster placement\"\n", " ],\n", " 'example': '''\n", "# Efficient data handling\n", "@cluster(cores=8, parallel=True)\n", "def process_large_dataset(chunk_size=10000):\n", " \"\"\"Process data in chunks to optimize memory usage.\"\"\"\n", " import numpy as np\n", " \n", " results = []\n", " for chunk_id in range(100): # 
Parallelized\n", "        # Generate chunk on remote (not transfer)\n", "        chunk = np.random.random(chunk_size)\n", "        result = np.mean(chunk ** 2)  # Efficient NumPy\n", "        results.append(result)\n", "    \n", "    return np.mean(results)  # Return summary, not raw data\n", "    '''\n", "        },\n", "        'parallelization': {\n", "            'title': 'Parallelization Strategy',\n", "            'practices': [\n", "                \"Identify embarrassingly parallel components\",\n", "                \"Minimize shared state between parallel tasks\",\n", "                \"Use appropriate chunk sizes for load balancing\",\n", "                \"Avoid fine-grained parallelism with high overhead\",\n", "                \"Consider communication costs in distributed algorithms\",\n", "                \"Test parallel efficiency with different core counts\"\n", "            ],\n", "            'example': '''\n", "# Good parallelization pattern\n", "@cluster(cores=16, parallel=True)\n", "def parallel_monte_carlo(n_samples=1000000):\n", "    \"\"\"Monte Carlo estimate of pi using 100 independent chunks.\"\"\"\n", "    import numpy as np\n", "    \n", "    results = []\n", "    chunk_size = n_samples // 100  # 100 chunks for load balancing\n", "    \n", "    for chunk in range(100):  # Parallelized across cores\n", "        # Independent computation per chunk\n", "        x = np.random.random(chunk_size)\n", "        y = np.random.random(chunk_size)\n", "        inside = (x**2 + y**2) <= 1\n", "        results.append(np.sum(inside))\n", "    \n", "    # Divide by the samples actually drawn (n_samples may not be a multiple of 100)\n", "    return 4 * sum(results) / (chunk_size * 100)\n", "    '''\n", "        },\n", "        'cluster_optimization': {\n", "            'title': 'Cluster-Specific Optimization',\n", "            'practices': [\n", "                \"Choose appropriate partitions/queues for workload\",\n", "                \"Use job arrays for parameter sweeps\",\n", "                \"Leverage cluster-specific features (GPUs, fast storage)\",\n", "                \"Monitor queue times and adjust submission strategy\",\n", "                \"Use checkpointing for long-running jobs\",\n", "                \"Clean up temporary files to avoid storage issues\"\n", "            ],\n", "            'example': '''\n", "# Cluster-optimized job submission\n", "@cluster(\n", "    cores=32,\n", "    memory=\"128GB\",\n", "    time=\"12:00:00\",\n", "    
partition=\"bigmem\", # Appropriate partition\n", " array=\"1-100\", # Parameter sweep\n", " gres=\"gpu:2\", # Request GPUs if needed\n", " cleanup_on_success=True # Clean temporary files\n", ")\n", "def cluster_optimized_job(params):\n", " return run_with_checkpointing(params)\n", " '''\n", " }\n", " }\n", " \n", " for category, info in best_practices.items():\n", " print(f\"\\n{info['title'].upper()}:\")\n", " for i, practice in enumerate(info['practices'], 1):\n", " print(f\" {i}. {practice}\")\n", " print(f\"\\nExample:{info['example']}\")\n", " \n", " return best_practices\n", "\n", "# Demonstrate performance best practices\n", "perf_practices = demonstrate_performance_best_practices()" ], "id": "cell-34" }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2. Security and Reliability\n", "\n", "Best practices for secure and reliable distributed computing:" ], "id": "cell-35" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def demonstrate_security_best_practices():\n", " \"\"\"\n", " Demonstrate security and reliability best practices.\n", " \"\"\"\n", " \n", " print(\"Security and Reliability Best Practices:\")\n", " print(\"=\" * 45)\n", " \n", " security_practices = {\n", " 'authentication': {\n", " 'title': 'Authentication and Access',\n", " 'practices': [\n", " \"Use SSH key authentication, never passwords\",\n", " \"Protect private keys with strong passphrases\",\n", " \"Use separate keys for different environments\",\n", " \"Regularly rotate SSH keys (6-12 months)\",\n", " \"Set proper file permissions (600 for private keys)\",\n", " \"Use SSH config for consistent settings\"\n", " ],\n", " 'example': '''\n", "# Secure SSH configuration\n", "configure(\n", " cluster_type=\"slurm\",\n", " cluster_host=\"secure-cluster.edu\",\n", " username=\"researcher\",\n", " key_file=\"~/.ssh/clustrix_production_key\", # Dedicated key\n", " port=2222, # Non-standard port\n", " # Never use password in 
production\n", ")\n", " '''\n", " },\n", " 'data_security': {\n", " 'title': 'Data Security',\n", " 'practices': [\n", " \"Never include secrets or credentials in code\",\n", " \"Use environment variables for sensitive data\",\n", " \"Encrypt sensitive data before transfer\",\n", " \"Clean up temporary files containing sensitive data\",\n", " \"Use secure remote directories with proper permissions\",\n", " \"Audit data access and transfers\"\n", " ],\n", " 'example': '''\n", "# Secure data handling\n", "@cluster(cores=4, cleanup_on_success=True)\n", "def secure_data_processing(encrypted_data):\n", " \"\"\"Process data securely with cleanup.\"\"\"\n", " import os\n", " import tempfile\n", " \n", " # Use environment variable for decryption key\n", " decryption_key = os.environ.get('DECRYPTION_KEY')\n", " if not decryption_key:\n", " raise ValueError(\"Decryption key not found\")\n", " \n", " # Process in temporary location\n", " with tempfile.TemporaryDirectory() as temp_dir:\n", " # Decrypt and process\n", " data = decrypt_data(encrypted_data, decryption_key)\n", " result = analyze_data(data)\n", " \n", " # Clear sensitive data\n", " del data, decryption_key\n", " \n", " return result # Only return non-sensitive results\n", " '''\n", " },\n", " 'reliability': {\n", " 'title': 'Reliability and Fault Tolerance',\n", " 'practices': [\n", " \"Implement retry logic for transient failures\",\n", " \"Use checkpointing for long-running computations\",\n", " \"Validate inputs before expensive computations\",\n", " \"Monitor resource usage to avoid exhaustion\",\n", " \"Set appropriate timeouts for all operations\",\n", " \"Log important events for debugging\"\n", " ],\n", " 'example': '''\n", "# Reliable computation with fault tolerance\n", "@cluster(cores=8, time=\"04:00:00\", backoff_limit=3)\n", "def reliable_computation(data, checkpoint_interval=1000):\n", " \"\"\"Computation with checkpointing and validation.\"\"\"\n", " import os\n", " import pickle\n", " import 
logging\n", "    \n", "    # Validate inputs (len() works for both lists and NumPy arrays)\n", "    if len(data) == 0:\n", "        raise ValueError(\"Input data is empty\")\n", "    \n", "    # Setup logging\n", "    logging.basicConfig(level=logging.INFO)\n", "    logger = logging.getLogger(__name__)\n", "    \n", "    # Check for existing checkpoint\n", "    checkpoint_file = \"computation_checkpoint.pkl\"\n", "    start_index = 0\n", "    results = []\n", "    \n", "    if os.path.exists(checkpoint_file):\n", "        with open(checkpoint_file, 'rb') as f:\n", "            checkpoint = pickle.load(f)\n", "        start_index = checkpoint['index']\n", "        results = checkpoint['results']\n", "        logger.info(f\"Resuming from checkpoint at index {start_index}\")\n", "    \n", "    # Process with checkpointing\n", "    for i in range(start_index, len(data)):\n", "        try:\n", "            result = expensive_operation(data[i])\n", "            results.append(result)\n", "        except Exception as e:\n", "            logger.error(f\"Error at index {i}: {e}\")\n", "            # Continue with next item\n", "            results.append(None)\n", "        \n", "        # Save checkpoint periodically, outside the try so a failed item cannot skip it\n", "        if (i + 1) % checkpoint_interval == 0:\n", "            checkpoint = {'index': i + 1, 'results': results}\n", "            with open(checkpoint_file, 'wb') as f:\n", "                pickle.dump(checkpoint, f)\n", "            logger.info(f\"Checkpoint saved at index {i + 1}\")\n", "    \n", "    # Cleanup checkpoint file\n", "    if os.path.exists(checkpoint_file):\n", "        os.unlink(checkpoint_file)\n", "    \n", "    return {'results': results, 'success_rate': sum(1 for r in results if r is not None) / len(results)}\n", "    '''\n", "        },\n", "        'monitoring': {\n", "            'title': 'Monitoring and Maintenance',\n", "            'practices': [\n", "                \"Monitor cluster resource usage regularly\",\n", "                \"Set up alerts for job failures\",\n", "                \"Track job completion times and success rates\",\n", "                \"Monitor disk usage in work directories\",\n", "                \"Keep logs of cluster operations\",\n", "                \"Regularly update and patch cluster software\"\n", "            ],\n", "            'example': '''\n", "# Computation with monitoring\n", "@cluster(cores=4, 
time=\"02:00:00\")\n", "def monitored_computation(data):\n", " \"\"\"Computation with built-in monitoring.\"\"\"\n", " import psutil\n", " import time\n", " import logging\n", " \n", " logger = logging.getLogger(__name__)\n", " start_time = time.time()\n", " \n", " # Log start\n", " logger.info(f\"Starting computation with {len(data)} items\")\n", " \n", " # Monitor resources\n", " initial_memory = psutil.virtual_memory().percent\n", " \n", " try:\n", " result = process_data(data)\n", " \n", " # Log success\n", " execution_time = time.time() - start_time\n", " final_memory = psutil.virtual_memory().percent\n", " \n", " logger.info(f\"Computation completed in {execution_time:.2f}s\")\n", " logger.info(f\"Memory usage: {initial_memory:.1f}% -> {final_memory:.1f}%\")\n", " \n", " return {\n", " 'result': result,\n", " 'execution_time': execution_time,\n", " 'memory_delta': final_memory - initial_memory\n", " }\n", " \n", " except Exception as e:\n", " logger.error(f\"Computation failed after {time.time() - start_time:.2f}s: {e}\")\n", " raise\n", " '''\n", " }\n", " }\n", " \n", " for category, info in security_practices.items():\n", " print(f\"\\n{info['title'].upper()}:\")\n", " for i, practice in enumerate(info['practices'], 1):\n", " print(f\" {i}. 
{practice}\")\n", " print(f\"\\nExample:{info['example']}\")\n", " \n", " return security_practices\n", "\n", "# Demonstrate security best practices\n", "security_practices = demonstrate_security_best_practices()" ], "id": "cell-36" }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Summary\n", "\n", "This notebook has demonstrated the complete Clustrix API including:\n", "\n", "### Core Functions:\n", "- `clustrix.configure()` - Configure cluster connections and defaults\n", "- `@cluster` decorator - Distributed function execution\n", "- `clustrix.get_config()` - Retrieve current configuration\n", "- `ClusterConfig.from_file()` - Load configuration from files\n", "\n", "### Advanced Features:\n", "- **Automatic Parallelization** - `parallel=True` for loop distribution\n", "- **Resource Specification** - cores, memory, time limits\n", "- **Environment Management** - conda, virtualenv, modules\n", "- **Error Handling** - robust error recovery and debugging\n", "- **Performance Monitoring** - resource usage tracking\n", "- **Custom Serialization** - handling complex objects\n", "\n", "### Cluster Types Supported:\n", "- **Local** - multiprocessing and threading\n", "- **SLURM** - HPC workload manager\n", "- **PBS/Torque** - batch systems\n", "- **SGE** - Sun Grid Engine\n", "- **Kubernetes** - containerized execution\n", "- **SSH** - direct remote execution\n", "\n", "### Best Practices Covered:\n", "- Performance optimization strategies\n", "- Security and authentication\n", "- Reliability and fault tolerance\n", "- Monitoring and debugging\n", "- Resource management\n", "\n", "For more information, see:\n", "- [Clustrix Documentation](https://clustrix.readthedocs.io)\n", "- [Cluster-specific tutorials](slurm_tutorial.ipynb)\n", "- [SSH Setup Guide](../ssh_setup.rst)\n", "- [API Reference](../api/decorator.rst)" ], "id": "cell-37" } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { 
"codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.0" } }, "nbformat": 4, "nbformat_minor": 4 }