File Packaging System
File Packaging System for Clustrix
This module handles packaging of function dependencies into transferable archives for remote execution, including local modules, data files, and filesystem utilities.
- class clustrix.file_packaging.PackageInfo(package_id, package_path, function_name, dependencies, size_bytes, created_at, config_hash, metadata)
Bases: object
Information about a created package.
- class clustrix.file_packaging.ExecutionContext(working_directory, python_version, environment_variables, cluster_config, function_args, function_kwargs)
Bases: object
Context information for function execution.
- class clustrix.file_packaging.FilePackager
Bases: object
Packages function dependencies for remote execution.
- package_function(func, context)
Package a function with all its dependencies.
- Parameters:
func (Callable) - The function to package
context (ExecutionContext) - Execution context information
- Return type: PackageInfo
- Returns: PackageInfo with details about the created package
- clustrix.file_packaging.create_execution_context(cluster_config, func_args=(), func_kwargs=None)
Create an execution context for function packaging.
- Parameters:
cluster_config (ClusterConfig) - The cluster configuration
func_args (tuple) - Arguments to pass to the function
func_kwargs (Optional[dict]) - Keyword arguments to pass to the function
- Return type: ExecutionContext
- Returns: ExecutionContext for the function
- clustrix.file_packaging.package_function_for_execution(func, cluster_config, func_args=(), func_kwargs=None)
Convenience function to package a function for remote execution.
- Parameters:
func (Callable) - The function to package
cluster_config (ClusterConfig) - Cluster configuration
func_args (tuple) - Arguments to pass to the function
func_kwargs (Optional[dict]) - Keyword arguments to pass to the function
- Return type: PackageInfo
- Returns: PackageInfo with details about the created package
Overview
The file packaging system lets locally defined functions run on remote clusters: it analyzes each function's dependencies, packages the required code and data files, and deploys the package to the cluster. It replaces the earlier pickle-based approach by analyzing function source code instead of serializing function objects.
Key Features
AST-Based Packaging: Analyzes function source code rather than relying on pickle serialization
Dependency Resolution: Automatically detects and includes local functions, imports, and data files
External Package Management: Automatically installs required external packages on remote systems
Filesystem Integration: Seamlessly integrates with cluster filesystem utilities
Cross-Platform Compatibility: Works across different Python versions and platforms
Cluster Detection: Automatically adapts to shared filesystem configurations
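To make the AST-based approach concrete, here is a minimal sketch of import detection with the standard-library ast module. It is an illustration only, not Clustrix's actual analyzer: the helper name find_imports and the SAMPLE source are hypothetical, and the record keys (module, is_from_import, lineno) are chosen to resemble the metadata fields shown in the Package Inspection example.

```python
import ast
import textwrap


def find_imports(source):
    """Return one record per import statement found in `source`.

    Hypothetical helper for illustration; not part of the Clustrix API.
    """
    tree = ast.parse(textwrap.dedent(source))
    found = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            # "import a, b" yields one record per imported module
            for alias in node.names:
                found.append({
                    "module": alias.name,
                    "is_from_import": False,
                    "lineno": node.lineno,
                })
        elif isinstance(node, ast.ImportFrom):
            # "from a import b" yields a single record for module a
            found.append({
                "module": node.module or "",
                "is_from_import": True,
                "lineno": node.lineno,
            })
    return found


# The source is parsed, never executed, so numpy and clustrix
# do not need to be installed for the analysis to work.
SAMPLE = '''
def scientific_analysis():
    import numpy as np
    from clustrix import cluster_find
    return np.mean([1, 2, 3])
'''

imports = find_imports(SAMPLE)
for record in imports:
    print(record)
```

Because the analysis works on source text, it can report imports that are not importable locally, which is what makes automatic installation on the remote side possible in principle.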
Architecture
The packaging system consists of several components working together:
Dependency Analysis: Identifies all function dependencies using AST analysis
File Collection: Gathers required source files and data files
Package Creation: Creates a ZIP archive with all dependencies and metadata
Remote Deployment: Transfers and extracts packages on remote clusters
Execution Setup: Recreates the execution environment and runs the function
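The package creation, deployment, and execution steps above can be sketched with the standard library alone. This is a simplified illustration under stated assumptions, not the Clustrix implementation: build_package and unpack_and_run are hypothetical helpers, the archive member name function_source.py is invented for the example, and a local temporary directory stands in for the remote cluster. Only metadata.json and the function_info/name keys are taken from the Package Inspection example.

```python
import json
import os
import tempfile
import zipfile


def build_package(function_name, source, out_dir):
    """Write a ZIP archive holding the function source and its metadata."""
    package_path = os.path.join(out_dir, f"{function_name}_package.zip")
    metadata = {"function_info": {"name": function_name}}
    with zipfile.ZipFile(package_path, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("function_source.py", source)  # assumed member name
        zf.writestr("metadata.json", json.dumps(metadata, indent=2))
    return package_path


def unpack_and_run(package_path, work_dir):
    """Extract the archive, rebuild the function from source, and call it."""
    with zipfile.ZipFile(package_path) as zf:
        zf.extractall(work_dir)
    with open(os.path.join(work_dir, "metadata.json")) as f:
        metadata = json.load(f)
    with open(os.path.join(work_dir, "function_source.py")) as f:
        namespace = {}
        exec(f.read(), namespace)  # recreate the function in a fresh namespace
    return namespace[metadata["function_info"]["name"]]()


SOURCE = "def simple_analysis():\n    import math\n    return math.sqrt(16)\n"

with tempfile.TemporaryDirectory() as tmp:
    path = build_package("simple_analysis", SOURCE, tmp)
    remote_dir = os.path.join(tmp, "remote")  # stand-in for the cluster
    os.makedirs(remote_dir)
    result = unpack_and_run(path, remote_dir)

print(result)
```

A real deployment would transfer the archive over the network and install external packages before execution; those steps are omitted here.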
Core Components
File Packager
- class clustrix.file_packaging.FilePackager
Bases: object
Packages function dependencies for remote execution.
- package_function(func, context)
Package a function with all its dependencies.
- Parameters:
func (Callable) - The function to package
context (ExecutionContext) - Execution context information
- Return type: PackageInfo
- Returns: PackageInfo with details about the created package
Data Structures
- class clustrix.file_packaging.PackageInfo(package_id, package_path, function_name, dependencies, size_bytes, created_at, config_hash, metadata)
Information about a created package.
Convenience Functions
- clustrix.file_packaging.package_function_for_execution(func, cluster_config, func_args=(), func_kwargs=None)
Convenience function to package a function for remote execution.
- Parameters:
func (Callable) - The function to package
cluster_config (ClusterConfig) - Cluster configuration
func_args (tuple) - Arguments to pass to the function
func_kwargs (Optional[dict]) - Keyword arguments to pass to the function
- Return type: PackageInfo
- Returns: PackageInfo with details about the created package
- clustrix.file_packaging.create_execution_context(cluster_config, func_args=(), func_kwargs=None)
Create an execution context for function packaging.
- Parameters:
cluster_config (ClusterConfig) - The cluster configuration
func_args (tuple) - Arguments to pass to the function
func_kwargs (Optional[dict]) - Keyword arguments to pass to the function
- Return type: ExecutionContext
- Returns: ExecutionContext for the function
Usage Examples
Basic Function Packaging
from clustrix.file_packaging import package_function_for_execution
from clustrix.config import ClusterConfig

def simple_analysis():
    """A simple function to be executed remotely."""
    import math
    result = math.sqrt(42)
    return result

# Configure target cluster
config = ClusterConfig(
    cluster_type="slurm",
    cluster_host="cluster.edu",
    username="researcher",
    remote_work_dir="/scratch/project"
)

# Package the function
package_info = package_function_for_execution(
    func=simple_analysis,
    cluster_config=config,
    func_args=(),
    func_kwargs={}
)

print(f"Package created: {package_info.package_path}")
print(f"Package ID: {package_info.package_id}")
print(f"Size: {package_info.size_bytes:,} bytes")
Function with Local Dependencies
def helper_function(data):
    """Helper function for data processing."""
    return [x * 2 for x in data if x > 0]

def process_data(filename):
    """Process a single data file."""
    with open(filename, 'r') as f:
        numbers = [int(line.strip()) for line in f]
    return helper_function(numbers)

def main_analysis():
    """Main function that uses local dependencies."""
    from clustrix import cluster_find
    # Find all data files
    data_files = cluster_find("*.txt", "input/")
    results = []
    for filename in data_files:
        result = process_data(filename)
        results.append(sum(result))
    return results

# Add local functions to global scope
main_analysis.__globals__['helper_function'] = helper_function
main_analysis.__globals__['process_data'] = process_data

# Package with dependencies
package_info = package_function_for_execution(
    func=main_analysis,
    cluster_config=config,
    func_args=(),
    func_kwargs={}
)

# Check dependency detection
print(f"Has local dependencies: {package_info.metadata['has_dependencies']}")
Filesystem-Intensive Function
def filesystem_analysis():
    """Function that uses cluster filesystem extensively."""
    from clustrix import (
        cluster_ls, cluster_find, cluster_stat,
        cluster_exists, cluster_du, cluster_count_files
    )
    # Directory analysis
    all_files = cluster_ls("data/")
    total_files = cluster_count_files("data/", "*")
    usage = cluster_du("data/")
    # Find specific file types
    csv_files = cluster_find("*.csv", "data/")
    json_files = cluster_find("*.json", "data/")
    # Analyze file sizes
    large_files = []
    for filename in all_files:
        full_path = f"data/{filename}"
        if cluster_exists(full_path):
            file_info = cluster_stat(full_path)
            if file_info.size > 1_000_000:  # > 1 MB
                large_files.append({
                    'name': filename,
                    'size': file_info.size,
                    'size_mb': file_info.size / 1_000_000
                })
    return {
        'total_files': total_files,
        'total_size_gb': usage.total_gb,
        'csv_files': len(csv_files),
        'json_files': len(json_files),
        'large_files': large_files
    }

# Package filesystem-intensive function
package_info = package_function_for_execution(
    func=filesystem_analysis,
    cluster_config=config,
    func_args=(),
    func_kwargs={}
)

# Check filesystem integration
print(f"Requires cluster filesystem: {package_info.metadata['has_filesystem_ops']}")
Function with External Packages
def scientific_analysis():
    """Function that requires external packages."""
    import numpy as np
    import pandas as pd
    import scipy.stats as stats
    from clustrix import cluster_find
    # Find CSV data files
    data_files = cluster_find("*.csv", "experiments/")
    results = []
    for filename in data_files:
        # Load data with pandas
        df = pd.read_csv(filename)
        # Perform statistical analysis
        numerical_cols = df.select_dtypes(include=[np.number]).columns
        for col in numerical_cols:
            data = df[col].dropna()
            if len(data) > 10:
                # Statistical tests
                normality_p = stats.shapiro(data)[1]
                mean_val = np.mean(data)
                std_val = np.std(data)
                results.append({
                    'file': filename,
                    'column': col,
                    'mean': mean_val,
                    'std': std_val,
                    'normal': normality_p > 0.05
                })
    return results

# Package with external dependencies
package_info = package_function_for_execution(
    func=scientific_analysis,
    cluster_config=config,
    func_args=(),
    func_kwargs={}
)
# External packages will be automatically installed on the remote cluster
Advanced Packaging with Custom Context
from clustrix.file_packaging import FilePackager, create_execution_context

def custom_analysis(data_dir, output_format="json"):
    """Function with custom arguments."""
    import json
    from clustrix import cluster_find, cluster_stat
    files = cluster_find("*.dat", data_dir)
    results = {}
    for filename in files:
        file_info = cluster_stat(filename)
        results[filename] = {
            'size': file_info.size,
            'modified': file_info.modified
        }
    if output_format == "json":
        return json.dumps(results, indent=2)
    else:
        return results

# Create custom execution context
context = create_execution_context(
    cluster_config=config,
    func_args=("experiments/",),
    func_kwargs={"output_format": "dict"}
)

# Use FilePackager directly for more control
packager = FilePackager()
package_info = packager.package_function(custom_analysis, context)
print(f"Custom package created: {package_info.package_path}")
Package Inspection
import zipfile
import json

def inspect_package(package_path):
    """Inspect the contents of a package."""
    with zipfile.ZipFile(package_path, 'r') as zf:
        print("Package contents:")
        for filename in zf.namelist():
            info = zf.getinfo(filename)
            print(f" {filename} ({info.file_size:,} bytes)")
        # Read metadata
        metadata_content = zf.read("metadata.json").decode()
    metadata = json.loads(metadata_content)
    print("\nFunction metadata:")
    print(f" Name: {metadata['function_info']['name']}")
    print(f" Has dependencies: {metadata['dependencies']['requires_dependencies']}")
    print(f" Filesystem operations: {metadata['dependencies']['requires_cluster_filesystem']}")
    # Show detected imports
    if metadata['dependencies']['imports']:
        print("\nDetected imports:")
        for imp in metadata['dependencies']['imports']:
            print(f" {imp['module']} ({'from' if imp['is_from_import'] else 'direct'})")
    # Show filesystem calls
    if metadata['dependencies']['filesystem_calls']:
        print("\nFilesystem operations:")
        for call in metadata['dependencies']['filesystem_calls']:
            print(f" {call['function']}() on line {call['lineno']}")

# Inspect a created package
inspect_package(package_info.package_path)
Package Deployment and Execution
from clustrix.executor import ClusterExecutor
from clustrix.file_packaging import package_function_for_execution

def complete_workflow():
    """Complete workflow from packaging to execution."""
    def analysis_function():
        from clustrix import cluster_ls, cluster_count_files
        files = cluster_ls(".")
        file_count = cluster_count_files(".", "*.py")
        return {
            'total_files': len(files),
            'python_files': file_count
        }

    # 1. Package the function (config is the ClusterConfig defined earlier)
    package_info = package_function_for_execution(
        func=analysis_function,
        cluster_config=config,
        func_args=(),
        func_kwargs={}
    )
    print(f"Function packaged: {package_info.package_id}")

    # 2. Create the executor. It handles package deployment and execution;
    #    this step is normally done internally by the @cluster decorator.
    executor = ClusterExecutor(config)
    return package_info
Error Handling
def handle_packaging_errors():
    """Examples of error handling in packaging."""
    # Built-in functions cannot be packaged
    try:
        package_info = package_function_for_execution(
            func=len,  # Built-in function
            cluster_config=config,
            func_args=(),
            func_kwargs={}
        )
    except ValueError as e:
        print(f"Cannot package built-in function: {e}")

    # Functions with missing dependencies
    def function_with_missing_deps():
        from clustrix import cluster_ls
        files = cluster_ls(".")
        # This would fail at runtime, but packaging succeeds
        return undefined_function(files)

    # Packaging succeeds, but runtime execution would fail
    package_info = package_function_for_execution(
        func=function_with_missing_deps,
        cluster_config=config,
        func_args=(),
        func_kwargs={}
    )
    print("Package created despite missing dependencies")
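One plausible reason built-in functions are rejected is that they are implemented in C and have no Python source for an AST-based analyzer to read. The sketch below shows such a pre-flight check with the standard-library inspect module. It is an assumption about the mechanism, not the actual Clustrix check: require_python_source is a hypothetical helper, and the error message is invented for the example.

```python
import inspect


def require_python_source(func):
    """Return the function name, or raise ValueError if no Python source exists.

    Hypothetical pre-flight check; not part of the Clustrix API.
    """
    # Built-ins such as len are implemented in C and carry no __code__ object.
    if inspect.isbuiltin(func) or not hasattr(func, "__code__"):
        name = getattr(func, "__name__", repr(func))
        raise ValueError(f"cannot package {name!r}: no Python source available")
    return func.__name__


def simple_analysis():
    return 42


checked = require_python_source(simple_analysis)

try:
    require_python_source(len)
    builtin_error = None
except ValueError as exc:
    builtin_error = str(exc)

print(checked)
print(builtin_error)
```

Running a check like this before packaging surfaces the problem locally rather than after a job has been submitted.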
Configuration and Options
Environment Variables
The packaging system respects several environment variables:
import os
# Configure package storage location
os.environ['CLUSTRIX_PACKAGE_DIR'] = '/tmp/clustrix_packages'
# Configure Python path for remote execution
os.environ['CLUSTRIX_REMOTE_PYTHON_PATH'] = '/usr/local/bin/python3'
# Enable debug mode for packaging
os.environ['CLUSTRIX_DEBUG_PACKAGING'] = '1'
Package Cleanup
import glob
import os
import time

def cleanup_packages():
    """Clean up old packages."""
    package_pattern = "/tmp/clustrix_package_*.zip"
    old_packages = glob.glob(package_pattern)
    for package_path in old_packages:
        # Check if package is older than 1 hour (3600 seconds)
        if time.time() - os.path.getmtime(package_path) > 3600:
            os.remove(package_path)
            print(f"Removed old package: {package_path}")
Performance Considerations
Package Size: Large packages take longer to transfer to remote clusters
Dependency Analysis: Complex functions take longer to analyze
Caching: Identical functions produce identical package IDs for caching
Network Transfer: Consider package size for remote clusters with slow networks
Storage: Packages are stored locally and remotely; clean up periodically
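The caching point relies on package IDs being deterministic. One common way to get that property is to hash the function source together with a canonical form of the configuration, as in the sketch below. This is an assumed scheme for illustration only: package_id here is a hypothetical helper, and the source does not state how Clustrix derives package_id or config_hash.

```python
import hashlib
import json


def package_id(source, config):
    """Derive a stable ID from function source and configuration.

    Hypothetical scheme; the real ID derivation may differ.
    """
    # sort_keys gives a canonical JSON string regardless of dict ordering
    payload = json.dumps({"source": source, "config": config}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]


source = "def f():\n    return 1\n"
config = {"cluster_type": "slurm", "cluster_host": "cluster.edu"}

id_a = package_id(source, config)
# Same content with keys in a different order: same ID
id_b = package_id(source, dict(reversed(list(config.items()))))
# Any change to the source: different ID
id_c = package_id(source + "# changed\n", config)

print(id_a == id_b, id_a == id_c)
```

With an ID like this, a packager could skip re-creating or re-transferring an archive whose ID already exists at the destination.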
Best Practices
Function Design: Keep functions focused and minimize external dependencies
Import Organization: Use standard import patterns for better detection
Local Functions: Ensure helper functions are in the main function's global scope
File Paths: Use relative paths for better portability
Error Handling: Design functions to handle missing files gracefully
Testing: Test functions locally before remote packaging
Documentation: Document function dependencies and requirements
Integration with @cluster Decorator
The packaging system is automatically used by the @cluster decorator:
from clustrix import cluster

@cluster(cores=8, cluster_host="cluster.edu")
def automated_packaging():
    """This function will be automatically packaged and executed remotely."""
    from clustrix import cluster_find, cluster_stat
    data_files = cluster_find("*.csv", "data/")
    total_size = 0
    for filename in data_files:  # This loop gets parallelized automatically
        file_info = cluster_stat(filename)
        total_size += file_info.size
    return total_size

# The decorator handles packaging, deployment, and execution automatically
result = automated_packaging()
print(f"Total data size: {result:,} bytes")
Troubleshooting
Common Issues
Import Errors: Ensure all required packages are available on the remote cluster
Path Issues: Use relative paths or cluster filesystem functions
Missing Dependencies: Add local functions to the main function's global scope
Large Packages: Consider breaking large functions into smaller components
Permission Issues: Ensure write access to package storage directories
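Because packaging can succeed even when a function references an undefined name, a local pre-flight check can help with the Missing Dependencies issue. The sketch below uses the standard-library ast module to list names a function reads but never binds. It is an approximation written for this page, not a Clustrix feature: unresolved_names is a hypothetical helper, and it ignores cases such as nested function definitions and global statements.

```python
import ast
import builtins


def unresolved_names(source, known_globals=()):
    """List names the first function in `source` reads but never binds.

    Hypothetical helper; an approximation, not a full scope analysis.
    """
    func = ast.parse(source).body[0]
    bound = {arg.arg for arg in func.args.args}
    loaded = set()
    for node in ast.walk(func):
        if isinstance(node, ast.Name):
            # Names being read go to `loaded`; assignment targets go to `bound`
            if isinstance(node.ctx, ast.Load):
                loaded.add(node.id)
            else:
                bound.add(node.id)
        elif isinstance(node, (ast.Import, ast.ImportFrom)):
            for alias in node.names:
                bound.add((alias.asname or alias.name).split(".")[0])
    return sorted(loaded - bound - set(dir(builtins)) - set(known_globals))


SOURCE = '''
def function_with_missing_deps():
    from clustrix import cluster_ls
    files = cluster_ls(".")
    return undefined_function(files)
'''

missing = unresolved_names(SOURCE)
print(missing)
```

Passing the names of helper functions that are in the main function's global scope as known_globals removes them from the report, so only genuinely missing names remain.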
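Debug Mode
import os
import logging
from clustrix.file_packaging import package_function_for_execution

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
os.environ['CLUSTRIX_DEBUG_PACKAGING'] = '1'

# Package function with detailed logging
# (your_function is a placeholder for the function being debugged;
# config is a ClusterConfig as in the earlier examples)
package_info = package_function_for_execution(
    func=your_function,
    cluster_config=config,
    func_args=(),
    func_kwargs={}
)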
See Also
Dependency Analysis - Dependency detection and analysis
Filesystem Utilities - Cluster filesystem utilities used by packaged functions
Decorator API - The @cluster decorator that uses the packaging system
Configuration API - Configuration management for clusters
Filesystem Utilities Tutorial - Complete examples using the packaging system