Filesystem Utilities TutorialΒΆ

This tutorial demonstrates how to use Clustrix’s unified filesystem utilities for seamless file operations across local and remote clusters.

OverviewΒΆ

Clustrix provides a set of filesystem utilities that work identically whether you’re operating on local files or files on remote clusters. This enables data-driven cluster computing workflows where your code can discover, analyze, and process files without worrying about whether they’re local or remote.

Key BenefitsΒΆ

  • Unified API: Same function calls work locally and remotely

  • Automatic SSH Management: No need to manage SSH connections manually

  • Path Normalization: Consistent behavior across different operating systems

  • Data-Driven Workflows: Enable processing based on actual file contents and metadata

  • Seamless Integration: Works perfectly with the @cluster decorator

Getting StartedΒΆ

Basic SetupΒΆ

from clustrix import cluster_ls, cluster_find, cluster_stat, cluster_exists
from clustrix.config import ClusterConfig

# Local configuration
local_config = ClusterConfig(
    cluster_type="local",
    local_work_dir="./data"  # Local directory to work in
)

# Remote cluster configuration
remote_config = ClusterConfig(
    cluster_type="slurm",
    cluster_host="cluster.example.edu",
    username="researcher",
    remote_work_dir="/scratch/project"
)

Available OperationsΒΆ

Clustrix provides nine core filesystem operations:

  1. cluster_ls() - List directory contents

  2. cluster_find() - Find files by pattern (recursive)

  3. cluster_stat() - Get file information (size, modified time, permissions)

  4. cluster_exists() - Check if file/directory exists

  5. cluster_isdir() - Check if path is a directory

  6. cluster_isfile() - Check if path is a file

  7. cluster_glob() - Pattern matching for files

  8. cluster_du() - Directory usage information

  9. cluster_count_files() - Count files matching pattern

Basic OperationsΒΆ

Directory ListingΒΆ

# List files in current directory
files = cluster_ls(".", config)
print(f"Found {len(files)} items:")
for file in files[:5]:  # Show first 5
    print(f"  - {file}")

# List files in a specific directory
data_files = cluster_ls("datasets/", config)

File DiscoveryΒΆ

# Find all Python files recursively
py_files = cluster_find("*.py", ".", config)

# Find CSV files in a specific directory
csv_files = cluster_find("*.csv", "data/", config)

# Find files with multiple extensions
data_files = cluster_find("*.{csv,json,h5}", "datasets/", config)

File InformationΒΆ

# Get detailed file information
file_info = cluster_stat("large_dataset.h5", config)
print(f"File: {file_info.size:,} bytes")
print(f"Modified: {file_info.modified_datetime}")
print(f"Is directory: {file_info.is_dir}")
print(f"Permissions: {file_info.permissions}")

File Existence CheckingΒΆ

# Check if results already exist
if cluster_exists("results/output.json", config):
    print("Results already computed!")
else:
    print("Need to run computation")

# Check file types
if cluster_isdir("datasets", config):
    print("datasets is a directory")

if cluster_isfile("README.md", config):
    print("README.md is a file")

Pattern MatchingΒΆ

# Use glob patterns for flexible file matching
log_files = cluster_glob("*.log", "logs/", config)
backup_files = cluster_glob("backup_*.{tar,zip}", "backups/", config)

# Find all image files
images = cluster_glob("*.{png,jpg,jpeg,gif}", "images/", config)

Directory AnalysisΒΆ

# Get directory usage information
usage = cluster_du("datasets/", config)
print(f"Total size: {usage.total_gb:.2f} GB")
print(f"File count: {usage.file_count:,}")
print(f"Average file size: {usage.total_mb/usage.file_count:.1f} MB")

# Count specific file types
total_files = cluster_count_files(".", "*", config)
python_files = cluster_count_files(".", "*.py", config)
print(f"Python files: {python_files}/{total_files}")

Data-Driven WorkflowsΒΆ

The real power of filesystem utilities comes when combined with the @cluster decorator for data-driven processing:

Automatic Dataset ProcessingΒΆ

from clustrix import cluster

@cluster(cores=8, memory="16GB")
def process_datasets(config):
    """Process all CSV files in the input directory."""
    import pandas as pd

    # Find all CSV files
    csv_files = cluster_find("*.csv", "input/", config)
    print(f"Found {len(csv_files)} CSV files to process")

    results = []
    for filename in csv_files:  # This loop gets parallelized automatically!
        # Get file info to make processing decisions
        file_info = cluster_stat(filename, config)

        if file_info.size > 100_000_000:  # > 100MB
            print(f"Processing large file: {filename}")
            # Use chunked processing for large files
            result = process_large_csv(filename, config)
        else:
            # Process smaller files normally
            result = process_small_csv(filename, config)

        results.append(result)

    return results

Conditional ProcessingΒΆ

@cluster(cores=4)
def smart_processing(config):
    """Only process files if results don't already exist."""

    # Check if results already exist
    if cluster_exists("results/final_output.json", config):
        print("Results already computed, loading...")
        return load_existing_results(config)

    # Find input files
    input_files = cluster_glob("*.dat", "input/", config)

    # Process only if we have data
    if len(input_files) == 0:
        raise ValueError("No input files found!")

    return process_files(input_files, config)

Adaptive Resource AllocationΒΆ

def determine_resources(config):
    """Determine resource requirements based on data size."""

    # Count total files
    total_files = cluster_count_files("input/", "*.csv", config)

    # Get total data size
    usage = cluster_du("input/", config)

    # Adaptive resource allocation
    if usage.total_gb > 100:
        return {"cores": 16, "memory": "64GB", "time": "08:00:00"}
    elif usage.total_gb > 10:
        return {"cores": 8, "memory": "32GB", "time": "04:00:00"}
    else:
        return {"cores": 4, "memory": "16GB", "time": "02:00:00"}

# Use adaptive resources
resources = determine_resources(config)

@cluster(**resources)
def process_data(config):
    # Processing logic here
    pass

Monitoring and ValidationΒΆ

@cluster(cores=8)
def monitored_processing(config):
    """Processing with built-in monitoring."""

    # Initial state
    initial_usage = cluster_du(".", config)
    print(f"Starting with {initial_usage.total_gb:.1f} GB")

    # Find and validate input files
    input_files = cluster_find("*.raw", "input/", config)

    valid_files = []
    for filename in input_files:
        file_info = cluster_stat(filename, config)

        # Validate file size and age
        if file_info.size > 1000 and file_info.modified > cutoff_time:
            valid_files.append(filename)

    print(f"Validated {len(valid_files)} out of {len(input_files)} files")

    # Process valid files
    results = process_files(valid_files, config)

    # Check final state
    final_usage = cluster_du(".", config)
    added_data = final_usage.total_gb - initial_usage.total_gb
    print(f"Generated {added_data:.1f} GB of new data")

    return results

Advanced PatternsΒΆ

Working with Large DatasetsΒΆ

@cluster(cores=16, memory="64GB")
def process_large_dataset(config):
    """Handle large datasets efficiently."""

    # Find all data files
    data_files = cluster_glob("*.h5", "datasets/", config)

    # Group files by size for efficient processing
    small_files = []
    large_files = []

    for filename in data_files:
        file_info = cluster_stat(filename, config)
        if file_info.size > 1_000_000_000:  # > 1GB
            large_files.append(filename)
        else:
            small_files.append(filename)

    # Process small files in batches
    small_results = process_file_batch(small_files, config)

    # Process large files individually
    large_results = []
    for filename in large_files:
        result = process_single_large_file(filename, config)
        large_results.append(result)

    return small_results + large_results

Multi-Directory ProcessingΒΆ

@cluster(cores=12)
def process_multiple_directories(config):
    """Process files from multiple directories."""

    # Find all subdirectories with data
    all_dirs = cluster_ls("data/", config)
    data_dirs = [d for d in all_dirs if cluster_isdir(f"data/{d}", config)]

    results = {}
    for dir_name in data_dirs:
        dir_path = f"data/{dir_name}"

        # Check if this directory has CSV files
        csv_files = cluster_find("*.csv", dir_path, config)
        if len(csv_files) > 0:
            print(f"Processing {len(csv_files)} files in {dir_name}")
            results[dir_name] = process_directory(dir_path, config)

    return results

File Synchronization PatternsΒΆ

def sync_processing_state(local_config, remote_config):
    """Synchronize processing state between local and remote."""

    # Check what files we have locally
    local_files = set(cluster_find("*.processed", "output/", local_config))

    # Check what files exist remotely
    remote_files = set(cluster_find("*.processed", "output/", remote_config))

    # Find files that need to be processed
    local_raw = set(cluster_find("*.raw", "input/", local_config))
    remote_raw = set(cluster_find("*.raw", "input/", remote_config))

    # Determine what needs processing
    need_processing = (local_raw | remote_raw) - (local_files | remote_files)

    return list(need_processing)

Best PracticesΒΆ

Configuration ManagementΒΆ

# Use environment-specific configs
def get_config():
    import os

    if os.getenv("CLUSTRIX_ENV") == "production":
        return ClusterConfig(
            cluster_type="slurm",
            cluster_host="prod-cluster.edu",
            username="prod_user",
            remote_work_dir="/scratch/production"
        )
    else:
        return ClusterConfig(
            cluster_type="local",
            local_work_dir="./dev_data"
        )

Error HandlingΒΆ

def safe_file_operations(config):
    """Demonstrate proper error handling."""

    try:
        # Check if directory exists before listing
        if not cluster_exists("data/", config):
            print("Data directory doesn't exist")
            return []

        # Safe file listing
        files = cluster_ls("data/", config)

        # Validate files before processing
        valid_files = []
        for filename in files:
            try:
                file_info = cluster_stat(f"data/{filename}", config)
                if file_info.size > 0:  # Non-empty files only
                    valid_files.append(filename)
            except FileNotFoundError:
                print(f"File disappeared: {filename}")
                continue

        return valid_files

    except Exception as e:
        print(f"Error in file operations: {e}")
        return []

Performance OptimizationΒΆ

def optimized_file_discovery(config):
    """Optimize file discovery for large directories."""

    # Use count to check if directory has files before listing
    file_count = cluster_count_files("large_directory/", "*", config)

    if file_count == 0:
        return []

    if file_count > 10000:
        # For very large directories, use pattern-specific searches
        csv_files = cluster_find("*.csv", "large_directory/", config)
        json_files = cluster_find("*.json", "large_directory/", config)
        return csv_files + json_files
    else:
        # For smaller directories, list all and filter
        all_files = cluster_ls("large_directory/", config)
        return [f for f in all_files if f.endswith(('.csv', '.json'))]

Integration ExamplesΒΆ

With PandasΒΆ

@cluster(cores=8)
def pandas_integration(config):
    """Integrate filesystem utilities with Pandas."""
    import pandas as pd

    # Find all CSV files
    csv_files = cluster_find("*.csv", "data/", config)

    dataframes = []
    for filename in csv_files:
        # Check file size to determine read strategy
        file_info = cluster_stat(filename, config)

        if file_info.size > 500_000_000:  # > 500MB
            # Use chunked reading for large files
            chunks = pd.read_csv(filename, chunksize=10000)
            df = pd.concat([chunk.sample(frac=0.1) for chunk in chunks])
        else:
            df = pd.read_csv(filename)

        dataframes.append(df)

    return pd.concat(dataframes, ignore_index=True)

With NumPy/HDF5ΒΆ

@cluster(cores=4, memory="32GB")
def numpy_hdf5_integration(config):
    """Work with NumPy arrays and HDF5 files."""
    import numpy as np
    import h5py

    # Find all HDF5 files
    h5_files = cluster_find("*.h5", "arrays/", config)

    total_arrays = 0
    total_size = 0

    for filename in h5_files:
        file_info = cluster_stat(filename, config)
        total_size += file_info.size

        # Count arrays in each file (this would work locally)
        if config.cluster_type == "local":
            with h5py.File(filename, 'r') as f:
                total_arrays += len(f.keys())

    print(f"Found {total_arrays} arrays in {len(h5_files)} files")
    print(f"Total size: {total_size / 1e9:.1f} GB")

    return {"files": len(h5_files), "arrays": total_arrays, "size_gb": total_size / 1e9}

TroubleshootingΒΆ

Common IssuesΒΆ

SSH Connection Problems

# Test SSH connectivity
try:
    files = cluster_ls(".", remote_config)
    print("SSH connection working")
except Exception as e:
    print(f"SSH connection failed: {e}")
    # Check your SSH keys, hostname, username

Path Issues

# Always use relative paths or properly configured absolute paths
# Good:
files = cluster_ls("data/", config)

# Be careful with absolute paths:
files = cluster_ls("/scratch/user/data/", config)  # Must exist on cluster

Performance Issues

# For large directories, avoid listing all files
# Instead of:
all_files = cluster_ls("huge_directory/", config)  # Slow!

# Use:
csv_files = cluster_find("*.csv", "huge_directory/", config)  # Faster!

Next StepsΒΆ

  • Explore the Filesystem Utilities API reference for detailed function documentation

  • Check out the complete tutorial at examples/filesystem_tutorial.py

  • Learn about Configuration API for advanced configuration options

  • See Decorator API for more @cluster decorator patterns