File Packaging System

File Packaging System for Clustrix

This module handles packaging of function dependencies into transferable archives for remote execution, including local modules, data files, and filesystem utilities.

class clustrix.file_packaging.PackageInfo(package_id, package_path, function_name, dependencies, size_bytes, created_at, config_hash, metadata)

Bases: object

Information about a created package.

__init__(package_id, package_path, function_name, dependencies, size_bytes, created_at, config_hash, metadata)

class clustrix.file_packaging.ExecutionContext(working_directory, python_version, environment_variables, cluster_config, function_args, function_kwargs)

Bases: object

Context information for function execution.

__init__(working_directory, python_version, environment_variables, cluster_config, function_args, function_kwargs)

class clustrix.file_packaging.FilePackager

Bases: object

Packages function dependencies for remote execution.

__init__()

__del__()

Clean up temporary directory.

package_function(func, context)

Package a function with all its dependencies.

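Parameters:
  • func (Callable) – The function to package

  • context (ExecutionContext) – The execution context for the function

Return type: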

PackageInfo

Returns:

PackageInfo with details about the created package

clustrix.file_packaging.create_execution_context(cluster_config, func_args=(), func_kwargs=None)

Create an execution context for function packaging.

Parameters:
  • cluster_config (ClusterConfig) – The cluster configuration

  • func_args (tuple) – Arguments to pass to the function

  • func_kwargs (Optional[dict]) – Keyword arguments to pass to the function

Return type:

ExecutionContext

Returns:

ExecutionContext for the function

clustrix.file_packaging.package_function_for_execution(func, cluster_config, func_args=(), func_kwargs=None)

Convenience function to package a function for remote execution.

Parameters:
  • func (Callable) – The function to package

  • cluster_config (ClusterConfig) – Cluster configuration

  • func_args (tuple) – Arguments to pass to the function

  • func_kwargs (Optional[dict]) – Keyword arguments to pass to the function

Return type:

PackageInfo

Returns:

PackageInfo with details about the created package

Overview

The file packaging system lets locally defined functions run on remote clusters. It analyzes a function's dependencies, packages the required code and data files into an archive, and deploys that archive to the remote cluster. This replaces the pickle-based approach: instead of serializing function objects, the packager works from the function's source code.

Key Features

  • AST-Based Packaging: Analyzes function source code rather than relying on pickle serialization

  • Dependency Resolution: Automatically detects and includes local functions, imports, and data files

  • External Package Management: Automatically installs required external packages on remote systems

  • Filesystem Integration: Seamlessly integrates with cluster filesystem utilities

  • Cross-Platform Compatibility: Works across different Python versions and platforms

  • Cluster Detection: Automatically adapts to shared filesystem configurations
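
To make the AST-based idea concrete, here is a minimal sketch of import detection using only the standard library. It is not the clustrix implementation: the helper name find_imports is hypothetical, and the record keys (module, is_from_import) are borrowed from the metadata layout shown under Package Inspection.

```python
import ast
import textwrap


def find_imports(source: str) -> list:
    """Return one record per import statement found in `source`."""
    tree = ast.parse(textwrap.dedent(source))
    found = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            # "import numpy as np" -> one record per imported module
            for alias in node.names:
                found.append({"module": alias.name, "is_from_import": False})
        elif isinstance(node, ast.ImportFrom):
            # "from clustrix import cluster_find" -> record the source module
            found.append({"module": node.module or "", "is_from_import": True})
    return found


# Source text of a function, as a packager might obtain it before analysis
EXAMPLE_SOURCE = '''
def scientific_analysis():
    import numpy as np
    from clustrix import cluster_find
    return cluster_find("*.csv", "experiments/")
'''

imports = find_imports(EXAMPLE_SOURCE)
for record in imports:
    print(record)
```

Because the analysis reads source text, the imported modules do not need to be installed on the machine that does the packaging.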

Architecture

The packaging system consists of several components working together:

  1. Dependency Analysis: Identifies all function dependencies using AST analysis

  2. File Collection: Gathers required source files and data files

  3. Package Creation: Creates a ZIP archive with all dependencies and metadata

  4. Remote Deployment: Transfers and extracts packages on remote clusters

  5. Execution Setup: Recreates the execution environment and runs the function
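
The package creation step can be sketched with the standard zipfile module. This is an illustration under stated assumptions, not the library's code: build_package and the member name function_source.py are hypothetical, while metadata.json and its function_info.name key follow the layout read in the Package Inspection example.

```python
import json
import os
import tempfile
import zipfile


def build_package(function_name: str, source: str, out_dir: str) -> str:
    """Write a ZIP archive holding the function source and a metadata file."""
    package_path = os.path.join(out_dir, f"{function_name}_package.zip")
    metadata = {"function_info": {"name": function_name}}
    with zipfile.ZipFile(package_path, "w", zipfile.ZIP_DEFLATED) as zf:
        # Hypothetical member name for the function's source code
        zf.writestr("function_source.py", source)
        zf.writestr("metadata.json", json.dumps(metadata, indent=2))
    return package_path


with tempfile.TemporaryDirectory() as tmp:
    path = build_package(
        "simple_analysis", "def simple_analysis():\n    return 42\n", tmp
    )
    # Read the archive back to confirm what was stored
    with zipfile.ZipFile(path) as zf:
        members = sorted(zf.namelist())
        recovered = json.loads(zf.read("metadata.json"))

print(members)
print(recovered["function_info"]["name"])
```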

Core Components

File Packager

class clustrix.file_packaging.FilePackager

Bases: object

Packages function dependencies for remote execution.

__init__()

__del__()

Clean up temporary directory.

package_function(func, context)

Package a function with all its dependencies.

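Parameters:
  • func (Callable) – The function to package

  • context (ExecutionContext) – The execution context for the function

Return type: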

PackageInfo

Returns:

PackageInfo with details about the created package

Data Structures

class clustrix.file_packaging.PackageInfo(package_id, package_path, function_name, dependencies, size_bytes, created_at, config_hash, metadata)

Information about a created package.

__init__(package_id, package_path, function_name, dependencies, size_bytes, created_at, config_hash, metadata)

class clustrix.file_packaging.ExecutionContext(working_directory, python_version, environment_variables, cluster_config, function_args, function_kwargs)

Context information for function execution.

__init__(working_directory, python_version, environment_variables, cluster_config, function_args, function_kwargs)
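
As a rough picture of what an execution context carries, the sketch below defines a stand-in dataclass with the same field names as ExecutionContext. The class ContextSketch, the helper make_context_sketch, and the way each field is populated (current directory, interpreter version, a copy of os.environ) are assumptions for illustration; the real create_execution_context may fill these fields differently.

```python
import os
import sys
from dataclasses import dataclass, field


@dataclass
class ContextSketch:
    """Hypothetical stand-in mirroring the ExecutionContext field names."""
    working_directory: str
    python_version: str
    environment_variables: dict
    cluster_config: object
    function_args: tuple = ()
    function_kwargs: dict = field(default_factory=dict)


def make_context_sketch(cluster_config, func_args=(), func_kwargs=None):
    """Capture the local environment alongside the call arguments."""
    return ContextSketch(
        working_directory=os.getcwd(),
        python_version=f"{sys.version_info.major}.{sys.version_info.minor}",
        environment_variables=dict(os.environ),  # snapshot, not a live view
        cluster_config=cluster_config,
        function_args=tuple(func_args),
        function_kwargs=dict(func_kwargs or {}),  # None becomes an empty dict
    )


# A plain dict stands in for ClusterConfig in this sketch
ctx = make_context_sketch({"cluster_type": "slurm"}, func_args=("experiments/",))
print(ctx.python_version, ctx.function_args, ctx.function_kwargs)
```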

Convenience Functions

clustrix.file_packaging.package_function_for_execution(func, cluster_config, func_args=(), func_kwargs=None)

Convenience function to package a function for remote execution.

Parameters:
  • func (Callable) – The function to package

  • cluster_config (ClusterConfig) – Cluster configuration

  • func_args (tuple) – Arguments to pass to the function

  • func_kwargs (Optional[dict]) – Keyword arguments to pass to the function

Return type:

PackageInfo

Returns:

PackageInfo with details about the created package

clustrix.file_packaging.create_execution_context(cluster_config, func_args=(), func_kwargs=None)

Create an execution context for function packaging.

Parameters:
  • cluster_config (ClusterConfig) – The cluster configuration

  • func_args (tuple) – Arguments to pass to the function

  • func_kwargs (Optional[dict]) – Keyword arguments to pass to the function

Return type:

ExecutionContext

Returns:

ExecutionContext for the function

Usage Examples

Basic Function Packaging

from clustrix.file_packaging import package_function_for_execution
from clustrix.config import ClusterConfig

def simple_analysis():
    """A simple function to be executed remotely."""
    import math

    result = math.sqrt(42)
    return result

# Configure target cluster
config = ClusterConfig(
    cluster_type="slurm",
    cluster_host="cluster.edu",
    username="researcher",
    remote_work_dir="/scratch/project"
)

# Package the function
package_info = package_function_for_execution(
    func=simple_analysis,
    cluster_config=config,
    func_args=(),
    func_kwargs={}
)

print(f"Package created: {package_info.package_path}")
print(f"Package ID: {package_info.package_id}")
print(f"Size: {package_info.size_bytes:,} bytes")

Function with Local Dependencies

def helper_function(data):
    """Helper function for data processing."""
    return [x * 2 for x in data if x > 0]

def process_data(filename):
    """Process a single data file."""
    with open(filename, 'r') as f:
        numbers = [int(line.strip()) for line in f]
    return helper_function(numbers)

def main_analysis():
    """Main function that uses local dependencies."""
    from clustrix import cluster_find

    # Find all data files
    data_files = cluster_find("*.txt", "input/")

    results = []
    for filename in data_files:
        result = process_data(filename)
        results.append(sum(result))

    return results

# Add local functions to global scope
main_analysis.__globals__['helper_function'] = helper_function
main_analysis.__globals__['process_data'] = process_data

# Package with dependencies
package_info = package_function_for_execution(
    func=main_analysis,
    cluster_config=config,
    func_args=(),
    func_kwargs={}
)

# Check dependency detection
print(f"Has local dependencies: {package_info.metadata['has_dependencies']}")

Filesystem-Intensive Function

def filesystem_analysis():
    """Function that uses cluster filesystem extensively."""
    from clustrix import (
        cluster_ls, cluster_find, cluster_stat,
        cluster_exists, cluster_du, cluster_count_files
    )

    # Directory analysis
    all_files = cluster_ls("data/")
    total_files = cluster_count_files("data/", "*")
    usage = cluster_du("data/")

    # Find specific file types
    csv_files = cluster_find("*.csv", "data/")
    json_files = cluster_find("*.json", "data/")

    # Analyze file sizes
    large_files = []
    for filename in all_files:
        full_path = f"data/{filename}"
        if cluster_exists(full_path):
            file_info = cluster_stat(full_path)
            if file_info.size > 1_000_000:  # > 1MB
                large_files.append({
                    'name': filename,
                    'size': file_info.size,
                    'size_mb': file_info.size / 1_000_000
                })

    return {
        'total_files': total_files,
        'total_size_gb': usage.total_gb,
        'csv_files': len(csv_files),
        'json_files': len(json_files),
        'large_files': large_files
    }

# Package filesystem-intensive function
package_info = package_function_for_execution(
    func=filesystem_analysis,
    cluster_config=config,
    func_args=(),
    func_kwargs={}
)

# Check filesystem integration
print(f"Requires cluster filesystem: {package_info.metadata['has_filesystem_ops']}")

Function with External Packages

def scientific_analysis():
    """Function that requires external packages."""
    import numpy as np
    import pandas as pd
    import scipy.stats as stats
    from clustrix import cluster_find

    # Find CSV data files
    data_files = cluster_find("*.csv", "experiments/")

    results = []
    for filename in data_files:
        # Load data with pandas
        df = pd.read_csv(filename)

        # Perform statistical analysis
        numerical_cols = df.select_dtypes(include=[np.number]).columns

        for col in numerical_cols:
            data = df[col].dropna()
            if len(data) > 10:
                # Statistical tests
                normality_p = stats.shapiro(data)[1]
                mean_val = np.mean(data)
                std_val = np.std(data)

                results.append({
                    'file': filename,
                    'column': col,
                    'mean': mean_val,
                    'std': std_val,
                    'normal': normality_p > 0.05
                })

    return results

# Package with external dependencies
package_info = package_function_for_execution(
    func=scientific_analysis,
    cluster_config=config,
    func_args=(),
    func_kwargs={}
)

# External packages will be automatically installed on the remote cluster

Advanced Packaging with Custom Context

from clustrix.file_packaging import FilePackager, create_execution_context

def custom_analysis(data_dir, output_format="json"):
    """Function with custom arguments."""
    import json
    from clustrix import cluster_find, cluster_stat

    files = cluster_find("*.dat", data_dir)
    results = {}

    for filename in files:
        file_info = cluster_stat(filename)
        results[filename] = {
            'size': file_info.size,
            'modified': file_info.modified
        }

    if output_format == "json":
        return json.dumps(results, indent=2)
    else:
        return results

# Create custom execution context
context = create_execution_context(
    cluster_config=config,
    func_args=("experiments/",),
    func_kwargs={"output_format": "dict"}
)

# Use FilePackager directly for more control
packager = FilePackager()
package_info = packager.package_function(custom_analysis, context)

print(f"Custom package created: {package_info.package_path}")

Package Inspection

import zipfile
import json

def inspect_package(package_path):
    """Inspect the contents of a package."""
    with zipfile.ZipFile(package_path, 'r') as zf:
        print("Package contents:")
        for filename in zf.namelist():
            info = zf.getinfo(filename)
            print(f"  {filename} ({info.file_size:,} bytes)")

        # Read metadata
        metadata_content = zf.read("metadata.json").decode()
        metadata = json.loads(metadata_content)

        print("\nFunction metadata:")
        print(f"  Name: {metadata['function_info']['name']}")
        print(f"  Has dependencies: {metadata['dependencies']['requires_dependencies']}")
        print(f"  Filesystem operations: {metadata['dependencies']['requires_cluster_filesystem']}")

        # Show detected imports
        if metadata['dependencies']['imports']:
            print("\nDetected imports:")
            for imp in metadata['dependencies']['imports']:
                print(f"  {imp['module']} ({'from' if imp['is_from_import'] else 'direct'})")

        # Show filesystem calls
        if metadata['dependencies']['filesystem_calls']:
            print("\nFilesystem operations:")
            for call in metadata['dependencies']['filesystem_calls']:
                print(f"  {call['function']}() on line {call['lineno']}")

# Inspect a created package
inspect_package(package_info.package_path)

Package Deployment and Execution

from clustrix.executor import ClusterExecutor

def complete_workflow():
    """Complete workflow from packaging to execution."""

    def analysis_function():
        from clustrix import cluster_ls, cluster_count_files

        files = cluster_ls(".")
        file_count = cluster_count_files(".", "*.py")

        return {
            'total_files': len(files),
            'python_files': file_count
        }

    # 1. Package the function
    package_info = package_function_for_execution(
        func=analysis_function,
        cluster_config=config,
        func_args=(),
        func_kwargs={}
    )
    print(f"Function packaged: {package_info.package_id}")

    # 2. Execute via ClusterExecutor (this would normally be done by @cluster decorator)
    executor = ClusterExecutor(config)

    # The executor automatically handles package deployment and execution
    # This is normally done internally by the @cluster decorator

    return package_info

Error Handling

def handle_packaging_errors():
    """Examples of error handling in packaging."""

    # Built-in functions cannot be packaged
    try:
        package_info = package_function_for_execution(
            func=len,  # Built-in function
            cluster_config=config,
            func_args=(),
            func_kwargs={}
        )
    except ValueError as e:
        print(f"Cannot package built-in function: {e}")

    # Functions with missing dependencies
    def function_with_missing_deps():
        from clustrix import cluster_ls

        files = cluster_ls(".")
        # This would fail at runtime, but packaging succeeds
        return undefined_function(files)

    # Packaging succeeds, but runtime execution would fail
    package_info = package_function_for_execution(
        func=function_with_missing_deps,
        cluster_config=config,
        func_args=(),
        func_kwargs={}
    )
    print("Package created despite missing dependencies")
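
Both failure modes above can be checked locally before packaging. The following is a sketch using only the standard library; the helpers is_plain_python_function and unresolved_globals are hypothetical and not part of clustrix. The second helper inspects only the function's own bytecode, so names used inside nested functions are not covered.

```python
import builtins
import dis
import types


def is_plain_python_function(func) -> bool:
    """True for def/lambda functions; False for built-ins such as len."""
    return isinstance(func, types.FunctionType)


def unresolved_globals(func) -> list:
    """List global names the function loads that are not defined anywhere visible."""
    loaded = {
        ins.argval
        for ins in dis.get_instructions(func)
        if ins.opname == "LOAD_GLOBAL"
    }
    return sorted(
        name
        for name in loaded
        if name not in func.__globals__ and not hasattr(builtins, name)
    )


def function_with_missing_deps(files):
    # undefined_function is intentionally not defined anywhere
    return undefined_function(len(files))


print(is_plain_python_function(len))
print(unresolved_globals(function_with_missing_deps))
```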

Configuration and Options

Environment Variables

The packaging system respects several environment variables:

import os

# Configure package storage location
os.environ['CLUSTRIX_PACKAGE_DIR'] = '/tmp/clustrix_packages'

# Configure Python path for remote execution
os.environ['CLUSTRIX_REMOTE_PYTHON_PATH'] = '/usr/local/bin/python3'

# Enable debug mode for packaging
os.environ['CLUSTRIX_DEBUG_PACKAGING'] = '1'
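
For scripts that need to mirror these settings, a small reader like the one below can collect them in one place. How clustrix itself parses the variables is not shown here, so the helper read_packaging_settings and every fallback value in it are placeholders, not documented defaults.

```python
import os
import tempfile


def read_packaging_settings(env=None) -> dict:
    """Collect the three packaging variables, with placeholder fallbacks."""
    env = os.environ if env is None else env
    return {
        # Fallbacks below are assumptions for this sketch only
        "package_dir": env.get("CLUSTRIX_PACKAGE_DIR", tempfile.gettempdir()),
        "remote_python": env.get("CLUSTRIX_REMOTE_PYTHON_PATH", "python3"),
        "debug": env.get("CLUSTRIX_DEBUG_PACKAGING", "0") == "1",
    }


# Pass an explicit mapping so the example does not depend on the real environment
settings = read_packaging_settings({"CLUSTRIX_DEBUG_PACKAGING": "1"})
print(settings)
```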

Package Cleanup

import glob
import os
import time

def cleanup_packages():
    """Clean up packages older than one hour."""
    package_pattern = "/tmp/clustrix_package_*.zip"
    now = time.time()

    for package_path in glob.glob(package_pattern):
        # Remove packages last modified more than 1 hour (3600 s) ago
        if now - os.path.getmtime(package_path) > 3600:
            os.remove(package_path)
            print(f"Removed old package: {package_path}")

Performance Considerations

  1. Package Size: Large packages take longer to transfer to remote clusters

  2. Dependency Analysis: Complex functions take longer to analyze

  3. Caching: Identical functions produce identical package IDs for caching

  4. Network Transfer: Consider package size for remote clusters with slow networks

  5. Storage: Packages are stored locally and remotely; clean up periodically
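
The caching point relies on package IDs being deterministic. One way to get that property, shown here purely as an illustration, is to hash the function source together with the configuration. The helper sketch_package_id, the use of SHA-256, and the 16-character truncation are assumptions; the source does not say how clustrix derives its IDs.

```python
import hashlib
import json


def sketch_package_id(function_source: str, config: dict) -> str:
    """Derive a stable ID from the function source and configuration."""
    # sort_keys makes the serialization independent of dict insertion order
    payload = json.dumps(
        {"source": function_source, "config": config}, sort_keys=True
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]


source = "def f():\n    return 1\n"
id_a = sketch_package_id(source, {"cluster_type": "slurm"})
id_b = sketch_package_id(source, {"cluster_type": "slurm"})
id_c = sketch_package_id("def f():\n    return 2\n", {"cluster_type": "slurm"})

print(id_a == id_b)  # same inputs give the same ID
print(id_a == id_c)  # a changed function body gives a different ID
```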

Best Practices

  1. Function Design: Keep functions focused and minimize external dependencies

  2. Import Organization: Use standard import patterns for better detection

  3. Local Functions: Ensure helper functions are in the main function’s global scope

  4. File Paths: Use relative paths for better portability

  5. Error Handling: Design functions to handle missing files gracefully

  6. Testing: Test functions locally before remote packaging

  7. Documentation: Document function dependencies and requirements
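
As an example of the error-handling advice, a function meant for remote execution can treat a missing input file as a reportable result instead of letting the whole job fail. The sketch below uses plain file I/O and a hypothetical helper, sum_numbers; it does not call any clustrix API.

```python
import os
import tempfile


def sum_numbers(filename: str):
    """Return the sum of the integers in a file, or None if it is missing."""
    try:
        with open(filename, "r") as f:
            # Skip blank lines; int() tolerates the trailing newline
            return sum(int(line) for line in f if line.strip())
    except FileNotFoundError:
        return None


with tempfile.TemporaryDirectory() as tmp:
    present = os.path.join(tmp, "a.txt")
    with open(present, "w") as f:
        f.write("1\n2\n3\n")
    results = {
        "a.txt": sum_numbers(present),
        "missing.txt": sum_numbers(os.path.join(tmp, "missing.txt")),
    }

print(results)
```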

Integration with @cluster Decorator

The packaging system is automatically used by the @cluster decorator:

from clustrix import cluster

@cluster(cores=8, cluster_host="cluster.edu")
def automated_packaging():
    """This function will be automatically packaged and executed remotely."""
    from clustrix import cluster_find, cluster_stat

    data_files = cluster_find("*.csv", "data/")

    total_size = 0
    for filename in data_files:  # This loop gets parallelized automatically
        file_info = cluster_stat(filename)
        total_size += file_info.size

    return total_size

# The decorator handles packaging, deployment, and execution automatically
result = automated_packaging()
print(f"Total data size: {result:,} bytes")

Troubleshooting

Common Issues

  1. Import Errors: Ensure all required packages are available on the remote cluster

  2. Path Issues: Use relative paths or cluster filesystem functions

  3. Missing Dependencies: Add local functions to the main function’s global scope

  4. Large Packages: Consider breaking large functions into smaller components

  5. Permission Issues: Ensure write access to package storage directories
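
For the permission issue, a quick local check is to try creating a temporary file in the package storage directory. This is a generic standard-library sketch; the helper can_write_to is hypothetical, and the directory to check is whichever one your configuration uses.

```python
import os
import tempfile


def can_write_to(directory: str) -> bool:
    """Try to create (and automatically remove) a temp file in `directory`."""
    try:
        with tempfile.NamedTemporaryFile(dir=directory):
            pass
        return True
    except OSError:
        # Covers missing directories as well as permission errors
        return False


with tempfile.TemporaryDirectory() as tmp:
    writable = can_write_to(tmp)
    missing = can_write_to(os.path.join(tmp, "does_not_exist"))

print(writable, missing)
```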

Debug Mode

import os
import logging

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
os.environ['CLUSTRIX_DEBUG_PACKAGING'] = '1'

# Package function with detailed logging
package_info = package_function_for_execution(
    func=your_function,
    cluster_config=config,
    func_args=(),
    func_kwargs={}
)

See Also