"""
File Packaging System for Clustrix
This module handles packaging of function dependencies into transferable archives
for remote execution, including local modules, data files, and filesystem utilities.
"""
import os
import sys
import json
import hashlib
import zipfile
import tempfile
import inspect
import shutil
from datetime import datetime
from pathlib import Path
from typing import Dict, Set, Optional, Any, Callable, List
from .dependency_analysis import DependencyGraph, analyze_function_dependencies
from .config import ClusterConfig
[docs]
class PackageInfo:
    """Information about a created package.

    A plain record describing one archive produced by the packager; every
    constructor argument is stored unchanged as an attribute of the same name.
    """

    def __init__(
        self,
        package_id: str,
        package_path: str,
        function_name: str,
        dependencies: DependencyGraph,
        size_bytes: int,
        created_at: datetime,
        config_hash: str,
        metadata: Dict[str, Any],
    ):
        """Store the descriptive fields of a package.

        Args:
            package_id: Identifier of the package.
            package_path: Filesystem path of the package archive.
            function_name: Name of the packaged function.
            dependencies: Dependency graph the package was built from.
            size_bytes: Size of the archive in bytes.
            created_at: Creation timestamp.
            config_hash: Hash of the cluster configuration used.
            metadata: Free-form additional information about the package.
        """
        self.package_id = package_id
        self.package_path = package_path
        self.function_name = function_name
        self.dependencies = dependencies
        self.size_bytes = size_bytes
        self.created_at = created_at
        self.config_hash = config_hash
        self.metadata = metadata

    def __repr__(self):
        """Return a short representation naming the package and function."""
        return f"PackageInfo(package_id='{self.package_id}', function_name='{self.function_name}')"
[docs]
class ExecutionContext:
    """Context information for function execution.

    A plain record; every constructor argument is stored unchanged as an
    attribute of the same name.
    """

    def __init__(
        self,
        working_directory: str,
        python_version: str,
        environment_variables: Dict[str, str],
        cluster_config: ClusterConfig,
        function_args: tuple,
        function_kwargs: dict,
    ):
        """Store the execution context fields.

        Args:
            working_directory: Directory used to resolve relative paths.
            python_version: Python version string of the packaging side.
            environment_variables: Environment variables to record.
            cluster_config: Cluster configuration for the execution.
            function_args: Positional arguments for the function call.
            function_kwargs: Keyword arguments for the function call.
        """
        self.working_directory = working_directory
        self.python_version = python_version
        self.environment_variables = environment_variables
        self.cluster_config = cluster_config
        self.function_args = function_args
        self.function_kwargs = function_kwargs

    def __repr__(self):
        """Return a short representation naming the working directory."""
        return f"ExecutionContext(working_directory='{self.working_directory}')"
[docs]
class FilePackager:
"""Packages function dependencies for remote execution."""
[docs]
def __init__(self):
self.temp_dir = tempfile.mkdtemp(prefix="clustrix_packaging_")
[docs]
def __del__(self):
"""Clean up temporary directory."""
if hasattr(self, "temp_dir") and os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir, ignore_errors=True)
[docs]
def package_function(
    self, func: Callable, context: ExecutionContext
) -> PackageInfo:
    """
    Build a package archive for a function and describe the result.

    Args:
        func: The function to package
        context: Execution context information

    Returns:
        PackageInfo with details about the created package
    """
    # Dependency analysis drives everything that follows.
    dep_graph = analyze_function_dependencies(func)
    pip_requirements = self._detect_external_dependencies(dep_graph)
    pkg_id = self._generate_package_id(func, dep_graph, context)

    # Write the archive, then measure it on disk.
    archive_path = self._create_package_archive(
        func, dep_graph, context, pkg_id, pip_requirements
    )
    archive_size = os.path.getsize(archive_path)

    return PackageInfo(
        package_id=pkg_id,
        package_path=archive_path,
        function_name=func.__name__,
        dependencies=dep_graph,
        size_bytes=archive_size,
        created_at=datetime.now(),
        config_hash=self._hash_config(context.cluster_config),
        metadata={
            "python_version": context.python_version,
            "working_directory": context.working_directory,
            "has_filesystem_ops": dep_graph.requires_cluster_filesystem,
            "import_count": len(dep_graph.imports),
            "local_function_count": len(dep_graph.local_function_calls),
            "file_reference_count": len(dep_graph.file_references),
            "external_dependencies": pip_requirements,
        },
    )
def _generate_package_id(
self, func: Callable, dependencies: DependencyGraph, context: ExecutionContext
) -> str:
"""Generate a unique package identifier."""
# Create hash based on function source, dependencies, and context
hash_input = [
func.__name__,
dependencies.source_code,
str(sorted(dependencies.source_files)),
str(sorted(dependencies.local_modules)),
str(sorted(dependencies.data_files)),
context.python_version,
self._hash_config(context.cluster_config),
]
combined = "".join(hash_input)
hash_obj = hashlib.sha256(combined.encode("utf-8"))
return hash_obj.hexdigest()[:16] # Use first 16 characters
def _hash_config(self, config: ClusterConfig) -> str:
"""Create a hash of the cluster configuration."""
config_dict = {
"cluster_type": config.cluster_type,
"cluster_host": getattr(config, "cluster_host", None),
"remote_work_dir": getattr(config, "remote_work_dir", None),
"local_work_dir": getattr(config, "local_work_dir", None),
}
config_str = json.dumps(config_dict, sort_keys=True)
return hashlib.md5(config_str.encode()).hexdigest()[:8]
def _create_package_archive(
self,
func: Callable,
dependencies: DependencyGraph,
context: ExecutionContext,
package_id: str,
external_deps: List[str],
) -> str:
"""Create the package archive with all dependencies."""
package_path = os.path.join(self.temp_dir, f"clustrix_package_{package_id}.zip")
with zipfile.ZipFile(package_path, "w", zipfile.ZIP_DEFLATED) as zf:
# Add function source and metadata
self._add_function_metadata(
zf, func, dependencies, context, package_id, external_deps
)
# Add cluster configuration
self._add_cluster_config(zf, context.cluster_config)
# Add source files
self._add_source_files(zf, dependencies.source_files)
# Add local modules
self._add_local_modules(zf, dependencies.local_modules)
# Add data files
self._add_data_files(zf, dependencies.data_files, context.working_directory)
# Add filesystem utilities if needed
if dependencies.requires_cluster_filesystem:
self._add_filesystem_utilities(zf)
# Add execution script
self._add_execution_script(zf, dependencies, context)
# Add environment information
self._add_environment_info(zf, context)
return package_path
def _add_function_metadata(
self,
zf: zipfile.ZipFile,
func: Callable,
dependencies: DependencyGraph,
context: ExecutionContext,
package_id: str,
external_deps: List[str],
):
"""Add function metadata to the package."""
metadata = {
"package_id": package_id,
"function_info": {
"name": func.__name__,
"source": dependencies.source_code,
"module": getattr(func, "__module__", None),
"qualname": getattr(func, "__qualname__", None),
},
"dependencies": {
"imports": [
{
"module": imp.module,
"names": imp.names,
"alias": imp.alias,
"is_from_import": imp.is_from_import,
"lineno": imp.lineno,
}
for imp in dependencies.imports
],
"local_functions": [
{
"function_name": call.function_name,
"lineno": call.lineno,
"defined_in_scope": call.defined_in_scope,
"source_file": call.source_file,
}
for call in dependencies.local_function_calls
],
"file_references": [
{
"path": ref.path,
"operation": ref.operation,
"lineno": ref.lineno,
"is_relative": ref.is_relative,
}
for ref in dependencies.file_references
],
"filesystem_calls": [
{
"function": call.function,
"args": call.args,
"lineno": call.lineno,
"context": call.context,
}
for call in dependencies.filesystem_calls
],
"requires_cluster_filesystem": dependencies.requires_cluster_filesystem,
"external_dependencies": external_deps,
},
"execution_info": {
"args": context.function_args,
"kwargs": context.function_kwargs,
"working_directory": context.working_directory,
"python_version": context.python_version,
},
"created_at": datetime.now().isoformat(),
}
metadata_json = json.dumps(metadata, indent=2, default=str)
zf.writestr("metadata.json", metadata_json)
def _add_cluster_config(self, zf: zipfile.ZipFile, config: ClusterConfig):
"""Add cluster configuration to the package."""
# Convert config to serializable dict
config_dict = {
"cluster_type": config.cluster_type,
"cluster_host": getattr(config, "cluster_host", None),
"username": getattr(config, "username", None),
"remote_work_dir": getattr(config, "remote_work_dir", None),
"local_work_dir": getattr(config, "local_work_dir", None),
"module_loads": getattr(config, "module_loads", []),
"environment_variables": getattr(config, "environment_variables", {}),
}
config_json = json.dumps(config_dict, indent=2)
zf.writestr("cluster_config.json", config_json)
def _add_source_files(self, zf: zipfile.ZipFile, source_files: Set[str]):
"""Add source files to the package."""
for source_file in source_files:
if os.path.exists(source_file):
# Use relative path within the package
arcname = f"sources/{os.path.basename(source_file)}"
zf.write(source_file, arcname)
def _add_local_modules(self, zf: zipfile.ZipFile, local_modules: Set[str]):
"""Add local modules to the package."""
for module_file in local_modules:
if os.path.exists(module_file):
# Preserve directory structure for modules
module_path = Path(module_file)
# Try to find the package root
package_root = self._find_package_root(module_path)
if package_root:
rel_path = module_path.relative_to(package_root)
arcname = f"modules/{rel_path}"
else:
arcname = f"modules/{module_path.name}"
zf.write(module_file, arcname)
def _add_data_files(
self, zf: zipfile.ZipFile, data_files: Set[str], working_dir: str
):
"""Add data files to the package."""
for data_file in data_files:
if data_file == "<unknown>":
continue
# Handle relative paths
if not os.path.isabs(data_file):
full_path = os.path.join(working_dir, data_file)
else:
full_path = data_file
if os.path.exists(full_path):
# Preserve relative path structure
if os.path.isabs(data_file):
# For absolute paths, just use the filename
arcname = f"data/{os.path.basename(data_file)}"
else:
# For relative paths, preserve the structure
arcname = f"data/{data_file}"
zf.write(full_path, arcname)
def _add_filesystem_utilities(self, zf: zipfile.ZipFile):
"""Add cluster filesystem utilities to the package."""
try:
# Import the filesystem module to get its source
from . import filesystem
# Get the filesystem module source and fix relative imports
fs_source = inspect.getsource(filesystem)
# Replace relative imports with inline definitions to make it standalone
fs_source_fixed = fs_source.replace(
"from .config import ClusterConfig",
'''# ClusterConfig class definition (inline for standalone packaging)
class ClusterConfig:
"""Minimal ClusterConfig for packaged execution."""
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
@property
def cluster_type(self):
return getattr(self, '_cluster_type', 'local')
@cluster_type.setter
def cluster_type(self, value):
self._cluster_type = value''',
)
zf.writestr("clustrix_filesystem.py", fs_source_fixed)
# Create a clustrix module with the filesystem functions
clustrix_module = '''
"""
Clustrix module for remote execution with filesystem utilities.
"""
from clustrix_filesystem import ClusterFilesystem
# Global config will be set during execution
_global_config = None
def _get_global_config():
"""Get the global config, or raise an error if not set."""
if _global_config is None:
raise RuntimeError("Cluster config not initialized. This should be set automatically during package execution.")
return _global_config
def _set_global_config(config):
"""Set the global config for filesystem operations."""
global _global_config
_global_config = config
# Filesystem convenience functions that use the global config
def cluster_ls(path=".", config=None):
"""List directory contents locally or remotely based on config."""
config = config or _get_global_config()
fs = ClusterFilesystem(config)
return fs.ls(path)
def cluster_find(pattern, path=".", config=None):
"""Find files matching pattern locally or remotely based on config."""
config = config or _get_global_config()
fs = ClusterFilesystem(config)
return fs.find(pattern, path)
def cluster_stat(path, config=None):
"""Get file information locally or remotely based on config."""
config = config or _get_global_config()
fs = ClusterFilesystem(config)
return fs.stat(path)
def cluster_exists(path, config=None):
"""Check if file/directory exists locally or remotely based on config."""
config = config or _get_global_config()
fs = ClusterFilesystem(config)
return fs.exists(path)
def cluster_isdir(path, config=None):
"""Check if path is directory locally or remotely based on config."""
config = config or _get_global_config()
fs = ClusterFilesystem(config)
return fs.isdir(path)
def cluster_isfile(path, config=None):
"""Check if path is file locally or remotely based on config."""
config = config or _get_global_config()
fs = ClusterFilesystem(config)
return fs.isfile(path)
def cluster_glob(pattern, path=".", config=None):
"""Pattern matching for files locally or remotely based on config."""
config = config or _get_global_config()
fs = ClusterFilesystem(config)
return fs.glob(pattern, path)
def cluster_du(path, config=None):
"""Get directory usage locally or remotely based on config."""
config = config or _get_global_config()
fs = ClusterFilesystem(config)
return fs.du(path)
def cluster_count_files(path, pattern="*", config=None):
"""Count files matching pattern locally or remotely based on config."""
config = config or _get_global_config()
fs = ClusterFilesystem(config)
return fs.count_files(path, pattern)
'''
zf.writestr("clustrix.py", clustrix_module)
# Also add the convenience functions as a separate module
convenience_functions = '''
"""
Cluster filesystem convenience functions for remote execution.
"""
from clustrix_filesystem import ClusterFilesystem
def setup_filesystem_functions(cluster_config):
"""Set up filesystem functions with the given config."""
fs = ClusterFilesystem(cluster_config)
# Create convenience functions bound to the config
functions = {
'cluster_ls': lambda path=".": fs.ls(path),
'cluster_find': lambda pattern, path=".": fs.find(pattern, path),
'cluster_stat': lambda path: fs.stat(path),
'cluster_exists': lambda path: fs.exists(path),
'cluster_isdir': lambda path: fs.isdir(path),
'cluster_isfile': lambda path: fs.isfile(path),
'cluster_glob': lambda pattern, path=".": fs.glob(pattern, path),
'cluster_du': lambda path: fs.du(path),
'cluster_count_files': lambda path, pattern="*": fs.count_files(path, pattern)
}
return functions
'''
zf.writestr("filesystem_utils.py", convenience_functions)
except Exception as e:
# If we can't get the filesystem source, create a minimal version
print(f"Warning: Could not package filesystem utilities: {e}")
def _add_execution_script(
self,
zf: zipfile.ZipFile,
dependencies: DependencyGraph,
context: ExecutionContext,
):
"""Add the execution script that will run on the remote cluster."""
script_content = self._generate_execution_script(dependencies, context)
zf.writestr("execute.py", script_content)
def _add_environment_info(self, zf: zipfile.ZipFile, context: ExecutionContext):
"""Add environment information to the package."""
env_info = {
"python_version": context.python_version,
"platform": sys.platform,
"environment_variables": context.environment_variables,
"python_path": sys.path,
"installed_packages": self._get_installed_packages(),
}
env_json = json.dumps(env_info, indent=2)
zf.writestr("environment.json", env_json)
def _get_installed_packages(self) -> Dict[str, str]:
"""Get information about installed packages."""
try:
import pkg_resources # type: ignore[import-untyped]
installed = {}
for dist in pkg_resources.working_set:
installed[dist.project_name] = dist.version
return installed
except ImportError:
return {}
def _detect_external_dependencies(self, dependencies: DependencyGraph) -> List[str]:
"""Detect external packages that need to be installed in remote environment."""
external_deps = []
stdlib_modules = self._get_stdlib_modules()
for import_info in dependencies.imports:
module_name = import_info.module
# Skip standard library modules
if module_name in stdlib_modules:
continue
# Skip local modules (already detected)
if module_name in [
os.path.basename(mod).replace(".py", "")
for mod in dependencies.local_modules
]:
continue
# Try to determine if this is an external package
try:
# Try to import to see if it's available
__import__(module_name)
# Check if it's in site-packages (external dependency)
module = sys.modules.get(module_name)
if module and hasattr(module, "__file__") and module.__file__:
module_path = Path(module.__file__)
# If it contains 'site-packages' in the path, it's external
if "site-packages" in str(module_path):
# Map known module names to package names
package_name = self._map_module_to_package(module_name)
if package_name and package_name not in external_deps:
external_deps.append(package_name)
except ImportError:
# Module not available, but might be external - add it anyway
package_name = self._map_module_to_package(module_name)
if package_name and package_name not in external_deps:
external_deps.append(package_name)
# Always add paramiko if filesystem operations are needed
if dependencies.requires_cluster_filesystem and "paramiko" not in external_deps:
external_deps.append("paramiko")
return external_deps
def _get_stdlib_modules(self) -> Set[str]:
"""Get a set of standard library module names."""
# This is a basic set - in production, could use more comprehensive detection
stdlib_modules = {
"os",
"sys",
"json",
"pickle",
"traceback",
"pathlib",
"tempfile",
"datetime",
"hashlib",
"zipfile",
"inspect",
"shutil",
"glob",
"textwrap",
"ast",
"types",
"socket",
"subprocess",
"re",
"time",
"collections",
"itertools",
"functools",
"operator",
"math",
"random",
"string",
"urllib",
"http",
"email",
"csv",
"configparser",
"logging",
"unittest",
"io",
"contextlib",
"copy",
"weakref",
}
return stdlib_modules
def _map_module_to_package(self, module_name: str) -> Optional[str]:
"""Map a module name to its pip package name."""
# Common mappings of import names to package names
module_to_package = {
"paramiko": "paramiko",
"numpy": "numpy",
"np": "numpy",
"pandas": "pandas",
"pd": "pandas",
"sklearn": "scikit-learn",
"cv2": "opencv-python",
"PIL": "Pillow",
"yaml": "PyYAML",
"requests": "requests",
"matplotlib": "matplotlib",
"plt": "matplotlib",
"scipy": "scipy",
"torch": "torch",
"tensorflow": "tensorflow",
"tf": "tensorflow",
}
return module_to_package.get(module_name, module_name)
def _find_package_root(self, module_path: Path) -> Optional[Path]:
"""Find the root directory of a Python package."""
current = module_path.parent
while current != current.parent: # Not at filesystem root
if (current / "__init__.py").exists():
# This directory has __init__.py, keep going up
current = current.parent
else:
# This is the package root
return current.parent if current.parent != current else current
return None
def _generate_execution_script(
self, dependencies: DependencyGraph, context: ExecutionContext
) -> str:
"""Generate the execution script for remote execution."""
script = f'''#!/usr/bin/env python3
"""
Clustrix Remote Execution Script
Generated automatically for function: {dependencies.function_name}
"""
import sys
import os
import json
import pickle
import traceback
from pathlib import Path
def setup_environment():
"""Set up the execution environment."""
# Add package directories to Python path
package_dir = Path(__file__).parent
# Add sources and modules to path
sources_dir = package_dir / "sources"
modules_dir = package_dir / "modules"
if sources_dir.exists():
sys.path.insert(0, str(sources_dir))
if modules_dir.exists():
sys.path.insert(0, str(modules_dir))
# Change to data directory if it exists
data_dir = package_dir / "data"
if data_dir.exists():
os.chdir(str(data_dir))
def install_external_dependencies():
"""Install external dependencies if needed."""
package_dir = Path(__file__).parent
# Load metadata to get external dependencies
metadata_file = package_dir / "metadata.json"
if not metadata_file.exists():
return
with open(metadata_file, 'r') as f:
metadata = json.load(f)
external_deps = metadata.get("dependencies", {{}}).get("external_dependencies", [])
if external_deps:
print(f"Installing external dependencies: {{', '.join(external_deps)}}")
for dep in external_deps:
try:
import subprocess
result = subprocess.run([
sys.executable, "-m", "pip", "install", dep
], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=300)
if result.returncode == 0:
print(f"Successfully installed {{dep}}")
else:
stderr_output = result.stderr.decode() if result.stderr else "Unknown error"
print(f"Failed to install {{dep}}: {{stderr_output}}")
except Exception as e:
print(f"Error installing {{dep}}: {{e}}")
else:
print("No external dependencies to install")
def setup_cluster_filesystem():
"""Set up cluster filesystem functions if needed."""
package_dir = Path(__file__).parent
# Load cluster config
config_file = package_dir / "cluster_config.json"
if not config_file.exists():
return {{}}
with open(config_file, 'r') as f:
config_data = json.load(f)
# Try to set up filesystem functions
try:
sys.path.insert(0, str(package_dir))
from filesystem_utils import setup_filesystem_functions
# Create a simple config object
class Config:
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
config = Config(**config_data)
return setup_filesystem_functions(config)
except Exception as e:
print(f"Warning: Could not set up filesystem functions: {{e}}")
return {{}}
def load_and_execute():
"""Load function metadata and execute."""
package_dir = Path(__file__).parent
# Load metadata
with open(package_dir / "metadata.json", "r") as f:
metadata = json.load(f)
function_info = metadata["function_info"]
execution_info = metadata["execution_info"]
# Set up environment
setup_environment()
# Install external dependencies first
install_external_dependencies()
# Set up filesystem functions if needed
if metadata["dependencies"]["requires_cluster_filesystem"]:
fs_functions = setup_cluster_filesystem()
globals().update(fs_functions)
try:
# Execute function source code
function_source = function_info["source"]
local_namespace = {{}}
exec(function_source, globals(), local_namespace)
# Get the function
function_name = function_info["name"]
if function_name in local_namespace:
func = local_namespace[function_name]
elif function_name in globals():
func = globals()[function_name]
else:
raise ValueError(f"Function {{function_name}} not found after execution")
# Get arguments and reconstruct config objects if needed
args = execution_info["args"]
kwargs = execution_info["kwargs"]
# Load cluster config for reconstruction
config_file = package_dir / "cluster_config.json"
if config_file.exists():
with open(config_file, 'r') as f:
config_data = json.load(f)
# Create a proper config object
class ClusterConfig:
def __init__(self, **kwargs):
# Set cluster_type explicitly to avoid recursion
self.cluster_type = kwargs.get('cluster_type', 'local')
# Set default values for filesystem operations
self.cluster_port = kwargs.get('cluster_port', 22)
self.key_file = kwargs.get('key_file', None)
self.password = kwargs.get('password', None)
# Set all other attributes
for k, v in kwargs.items():
setattr(self, k, v)
config_obj = ClusterConfig(**config_data)
# Replace any string config arguments with proper config object
new_args = []
for arg in args:
if isinstance(arg, str) and "cluster_type" in str(arg):
# This looks like a serialized config, replace with proper object
new_args.append(config_obj)
else:
new_args.append(arg)
args = tuple(new_args)
# Also check kwargs
for key, value in kwargs.items():
if isinstance(value, str) and "cluster_type" in str(value):
kwargs[key] = config_obj
# Execute the function
result = func(*args, **kwargs)
# Save result to accessible directory (where SLURM job was submitted)
# Use environment variable set by wrapper script
result_working_dir = os.environ.get("CLUSTRIX_ORIGINAL_CWD", "/tmp")
# Create result file in accessible location
result_file = f"result_{{function_name}}_{{os.environ.get('SLURM_JOB_ID', 'unknown')}}.json"
result_path = os.path.join(result_working_dir, result_file)
# Save result as JSON for easy access
result_data = {{
"function_name": function_name,
"status": "SUCCESS",
"result": result,
"metadata": {{
"hostname": os.environ.get("HOSTNAME", "unknown"),
"slurm_job_id": os.environ.get("SLURM_JOB_ID", "not_set"),
"python_version": sys.version.split()[0],
"execution_directory": str(package_dir),
"timestamp": __import__("datetime").datetime.now().isoformat()
}}
}}
with open(result_path, "w") as f:
json.dump(result_data, f, indent=2, default=str)
print(f"Function {{function_name}} executed successfully")
print(f"Result saved to: {{result_path}}")
return result
except Exception as e:
# Save error information to accessible location
result_working_dir = os.environ.get("CLUSTRIX_ORIGINAL_CWD", "/tmp")
function_name = metadata.get("function_info", {{}}).get("name", "unknown")
error_file = f"error_{{function_name}}_{{os.environ.get('SLURM_JOB_ID', 'unknown')}}.json"
error_path = os.path.join(result_working_dir, error_file)
error_info = {{
"function_name": function_name,
"status": "ERROR",
"error": str(e),
"error_type": type(e).__name__,
"traceback": traceback.format_exc(),
"metadata": {{
"hostname": os.environ.get("HOSTNAME", "unknown"),
"slurm_job_id": os.environ.get("SLURM_JOB_ID", "not_set"),
"timestamp": __import__("datetime").datetime.now().isoformat()
}}
}}
with open(error_path, "w") as f:
json.dump(error_info, f, indent=2)
print(f"Function execution failed: {{e}}")
print(f"Error saved to: {{error_path}}")
raise
if __name__ == "__main__":
try:
result = load_and_execute()
print("Execution completed successfully")
except Exception as e:
print(f"Execution failed: {{e}}")
sys.exit(1)
'''
return script
[docs]
def create_execution_context(
    cluster_config: ClusterConfig,
    func_args: tuple = (),
    func_kwargs: Optional[dict] = None,
) -> ExecutionContext:
    """
    Build an ExecutionContext from the current process state.

    The working directory, interpreter version and a copy of the full process
    environment are captured at call time.

    Args:
        cluster_config: The cluster configuration
        func_args: Arguments to pass to the function
        func_kwargs: Keyword arguments to pass to the function
            (``None`` means no keyword arguments)

    Returns:
        ExecutionContext for the function
    """
    version = sys.version_info
    call_kwargs = {} if func_kwargs is None else func_kwargs
    return ExecutionContext(
        working_directory=os.getcwd(),
        python_version=f"{version.major}.{version.minor}.{version.micro}",
        environment_variables=dict(os.environ),
        cluster_config=cluster_config,
        function_args=func_args,
        function_kwargs=call_kwargs,
    )
[docs]
def package_function_for_execution(
    func: Callable,
    cluster_config: ClusterConfig,
    func_args: tuple = (),
    func_kwargs: Optional[dict] = None,
) -> PackageInfo:
    """
    Package a function for remote execution in a single call.

    Builds an execution context, packages the function with a temporary
    FilePackager, then copies the archive into a fresh temporary directory
    and points the returned PackageInfo at that copy.

    Args:
        func: The function to package
        cluster_config: Cluster configuration
        func_args: Arguments to pass to the function
        func_kwargs: Keyword arguments to pass to the function

    Returns:
        PackageInfo with details about the created package
    """
    import shutil

    exec_context = create_execution_context(cluster_config, func_args, func_kwargs)
    builder = FilePackager()
    info = builder.package_function(func, exec_context)

    # The packager removes its scratch directory in __del__, so the archive
    # is copied to a separate directory that this function never deletes.
    export_dir = tempfile.mkdtemp(prefix="clustrix_packages_")
    exported_path = os.path.join(export_dir, os.path.basename(info.package_path))
    shutil.copy2(info.package_path, exported_path)

    info.package_path = exported_path
    return info