Filesystem Utilities Tutorial
This tutorial demonstrates how to use Clustrix's unified filesystem utilities for seamless file operations across local and remote clusters.
Overview
Clustrix provides a set of filesystem utilities that work identically whether you're operating on local files or files on remote clusters. This enables data-driven cluster computing workflows where your code can discover, analyze, and process files without worrying about whether they're local or remote.
Key Benefits
Unified API: Same function calls work locally and remotely
Automatic SSH Management: No need to manage SSH connections manually
Path Normalization: Consistent behavior across different operating systems
Data-Driven Workflows: Enable processing based on actual file contents and metadata
Seamless Integration: Works perfectly with the @cluster decorator
Getting Started
Basic Setup
# Import every filesystem utility used in this tutorial, not just the first few.
from clustrix import (
    cluster_count_files,
    cluster_du,
    cluster_exists,
    cluster_find,
    cluster_glob,
    cluster_isdir,
    cluster_isfile,
    cluster_ls,
    cluster_stat,
)
from clustrix.config import ClusterConfig

# Local configuration
local_config = ClusterConfig(
    cluster_type="local",
    local_work_dir="./data",  # Local directory to work in
)

# Remote cluster configuration
remote_config = ClusterConfig(
    cluster_type="slurm",
    cluster_host="cluster.example.edu",
    username="researcher",
    remote_work_dir="/scratch/project",
)

# The examples below all take a generic `config`; point it at whichever
# configuration you want to run against (swap in remote_config for the cluster).
config = local_config
Available Operations
Clustrix provides nine core filesystem operations:
cluster_ls() - List directory contents
cluster_find() - Find files by pattern (recursive)
cluster_stat() - Get file information (size, modified time, permissions)
cluster_exists() - Check if file/directory exists
cluster_isdir() - Check if path is a directory
cluster_isfile() - Check if path is a file
cluster_glob() - Pattern matching for files
cluster_du() - Directory usage information
cluster_count_files() - Count files matching pattern
Basic Operations
Directory Listing
# Enumerate the working directory and preview at most five entries
files = cluster_ls(".", config)
print(f"Found {len(files)} items:")
for entry in files[:5]:  # Show first 5
    print(f" - {entry}")

# Enumerate a named subdirectory
data_files = cluster_ls("datasets/", config)
File Discovery
# Find all Python files recursively, starting from the current directory
py_files = cluster_find("*.py", ".", config)
# Find CSV files under a specific directory
csv_files = cluster_find("*.csv", "data/", config)
# Find files with multiple extensions
# NOTE(review): brace alternation ({csv,json,h5}) is shell syntax rather than
# plain fnmatch / `find -name` globbing — confirm cluster_find expands it.
data_files = cluster_find("*.{csv,json,h5}", "datasets/", config)
File Information
# Get detailed file information (size, modified time, permissions)
file_info = cluster_stat("large_dataset.h5", config)
# Size is printed with thousands separators
print(f"File: {file_info.size:,} bytes")
print(f"Modified: {file_info.modified_datetime}")
print(f"Is directory: {file_info.is_dir}")
print(f"Permissions: {file_info.permissions}")
File Existence Checking
# Decide whether the computation still has to run
if not cluster_exists("results/output.json", config):
    print("Need to run computation")
else:
    print("Results already computed!")

# Distinguish directories from regular files
if cluster_isdir("datasets", config):
    print("datasets is a directory")

if cluster_isfile("README.md", config):
    print("README.md is a file")
Pattern Matching
# Use glob patterns for flexible file matching
log_files = cluster_glob("*.log", "logs/", config)
# NOTE(review): the brace alternation in the next two patterns is shell-style
# syntax — confirm cluster_glob expands it rather than matching it literally.
backup_files = cluster_glob("backup_*.{tar,zip}", "backups/", config)
# Find all image files
images = cluster_glob("*.{png,jpg,jpeg,gif}", "images/", config)
Directory Analysis
# Get directory usage information
usage = cluster_du("datasets/", config)
print(f"Total size: {usage.total_gb:.2f} GB")
print(f"File count: {usage.file_count:,}")
# An empty directory has no meaningful average; skip the line instead of
# dividing by zero.
if usage.file_count:
    print(f"Average file size: {usage.total_mb / usage.file_count:.1f} MB")

# Count specific file types
total_files = cluster_count_files(".", "*", config)
python_files = cluster_count_files(".", "*.py", config)
print(f"Python files: {python_files}/{total_files}")
Data-Driven Workflows
The real power of filesystem utilities comes when combined with the @cluster decorator for data-driven processing:
Automatic Dataset Processing
from clustrix import cluster
@cluster(cores=8, memory="16GB")
def process_datasets(config):
    """Run every CSV under ``input/`` through a size-appropriate processor.

    Files larger than 100 MB are handed to ``process_large_csv``; all others
    go to ``process_small_csv``. Returns the per-file results in the order the
    files were discovered.
    """
    import pandas as pd

    large_threshold = 100_000_000  # bytes; anything above counts as "large"

    # Discover the inputs
    csv_files = cluster_find("*.csv", "input/", config)
    print(f"Found {len(csv_files)} CSV files to process")

    results = []
    for csv_path in csv_files:  # This loop gets parallelized automatically!
        # The file size decides which processing strategy is used
        if cluster_stat(csv_path, config).size <= large_threshold:
            # Small enough to process normally
            results.append(process_small_csv(csv_path, config))
            continue

        print(f"Processing large file: {csv_path}")
        # Large files go through chunked processing
        results.append(process_large_csv(csv_path, config))

    return results
Conditional Processing
@cluster(cores=4)
def smart_processing(config):
    """Reuse cached results when present; otherwise process ``input/*.dat``.

    Raises:
        ValueError: if no results exist yet and no input files are found.
    """
    if not cluster_exists("results/final_output.json", config):
        # Nothing cached yet: look for inputs to process
        input_files = cluster_glob("*.dat", "input/", config)

        # Refuse to run on an empty input set
        if len(input_files) == 0:
            raise ValueError("No input files found!")

        return process_files(input_files, config)

    # A previous run already produced the final output
    print("Results already computed, loading...")
    return load_existing_results(config)
Adaptive Resource Allocation
def determine_resources(config):
    """Choose ``@cluster`` resource settings from the size of ``input/``.

    Returns:
        A dict with ``cores``, ``memory`` and ``time`` keys, scaled up in
        three tiers: more than 100 GB, more than 10 GB, and everything else.
    """
    # Only the total size drives the decision, so a single disk-usage query
    # is enough; no separate file-count call is needed.
    total_gb = cluster_du("input/", config).total_gb

    if total_gb > 100:
        return {"cores": 16, "memory": "64GB", "time": "08:00:00"}
    if total_gb > 10:
        return {"cores": 8, "memory": "32GB", "time": "04:00:00"}
    return {"cores": 4, "memory": "16GB", "time": "02:00:00"}
# Size the job from the data, then pass the settings to the decorator
resources = determine_resources(config)


@cluster(**resources)
def process_data(config):
    # Placeholder: the actual processing logic goes here
    pass
Monitoring and Validation
@cluster(cores=8)
def monitored_processing(config, max_age_seconds=7 * 24 * 60 * 60):
    """Process recent, non-trivial ``input/*.raw`` files and report disk growth.

    Args:
        config: Cluster configuration passed to every filesystem call.
        max_age_seconds: Files modified longer ago than this are skipped
            (defaults to one week).

    Returns:
        Whatever ``process_files`` returns for the validated files.
    """
    import time

    # NOTE(review): assumes file_info.modified is a POSIX timestamp in
    # seconds — confirm against the file-info object returned by cluster_stat.
    cutoff_time = time.time() - max_age_seconds

    # Initial state
    initial_usage = cluster_du(".", config)
    print(f"Starting with {initial_usage.total_gb:.1f} GB")

    # Find and validate input files
    input_files = cluster_find("*.raw", "input/", config)
    valid_files = []
    for filename in input_files:
        file_info = cluster_stat(filename, config)
        # Keep files that are larger than 1000 bytes and recent enough
        if file_info.size > 1000 and file_info.modified > cutoff_time:
            valid_files.append(filename)
    print(f"Validated {len(valid_files)} out of {len(input_files)} files")

    # Process valid files
    results = process_files(valid_files, config)

    # Check final state
    final_usage = cluster_du(".", config)
    added_data = final_usage.total_gb - initial_usage.total_gb
    print(f"Generated {added_data:.1f} GB of new data")
    return results
Advanced Patterns
Working with Large Datasets
@cluster(cores=16, memory="64GB")
def process_large_dataset(config):
    """Split ``datasets/*.h5`` by size and process each group appropriately.

    Files over 1 GB are processed one at a time; the rest are processed as a
    single batch. Returns the batch results followed by the per-file results.
    """
    one_gb = 1_000_000_000  # bytes

    # Bucket every data file by size
    small_files, large_files = [], []
    for h5_path in cluster_glob("*.h5", "datasets/", config):
        is_large = cluster_stat(h5_path, config).size > one_gb
        bucket = large_files if is_large else small_files
        bucket.append(h5_path)

    # Small files go through together as one batch
    small_results = process_file_batch(small_files, config)

    # Large files are handled individually
    large_results = [
        process_single_large_file(h5_path, config) for h5_path in large_files
    ]

    return small_results + large_results
Multi-Directory Processing
@cluster(cores=12)
def process_multiple_directories(config):
    """Process each subdirectory of ``data/`` that contains CSV files.

    Returns a dict mapping subdirectory name to the value returned by
    ``process_directory`` for it; subdirectories without CSVs are omitted.
    """
    # Keep only the entries of data/ that are directories
    data_dirs = []
    for entry in cluster_ls("data/", config):
        if cluster_isdir(f"data/{entry}", config):
            data_dirs.append(entry)

    results = {}
    for dir_name in data_dirs:
        dir_path = f"data/{dir_name}"

        # Skip directories that hold no CSV files
        csv_files = cluster_find("*.csv", dir_path, config)
        if len(csv_files) == 0:
            continue

        print(f"Processing {len(csv_files)} files in {dir_name}")
        results[dir_name] = process_directory(dir_path, config)

    return results
File Synchronization Patterns
def sync_processing_state(local_config, remote_config):
    """Return the raw input files that have no processed output anywhere.

    Raw files (``input/*.raw``) and processed files (``output/*.processed``)
    are collected from both the local and the remote side. A raw file counts
    as done when a processed file with the same base name (extension
    stripped) exists on either side.

    Comparing full paths would never match, because the directory and the
    extension always differ, so files are matched by stem instead.
    NOTE(review): assumes outputs are named after their input
    (``a.raw`` -> ``a.processed``) — confirm against the processing step.

    Returns:
        A sorted list of raw file paths that still need processing.
    """
    from pathlib import PurePosixPath

    def _stem(path):
        # Base name without extension, tolerating backslash separators
        return PurePosixPath(str(path).replace("\\", "/")).stem

    configs = (local_config, remote_config)

    # Stems of everything already processed, locally or remotely
    processed_stems = {
        _stem(path)
        for cfg in configs
        for path in cluster_find("*.processed", "output/", cfg)
    }

    # Every raw input known to either side
    raw_files = {
        path
        for cfg in configs
        for path in cluster_find("*.raw", "input/", cfg)
    }

    return sorted(path for path in raw_files if _stem(path) not in processed_stems)
Best Practices
Configuration Management
# Use environment-specific configs
def get_config():
    """Build the ClusterConfig selected by the ``CLUSTRIX_ENV`` variable.

    ``CLUSTRIX_ENV=production`` yields the SLURM production configuration;
    any other value (or an unset variable) yields a local development one.
    """
    import os

    # Anything other than an explicit "production" falls back to local dev
    if os.getenv("CLUSTRIX_ENV") != "production":
        return ClusterConfig(
            cluster_type="local",
            local_work_dir="./dev_data",
        )

    return ClusterConfig(
        cluster_type="slurm",
        cluster_host="prod-cluster.edu",
        username="prod_user",
        remote_work_dir="/scratch/production",
    )
Error Handling
def safe_file_operations(config):
    """List the non-empty entries of ``data/`` without raising.

    Returns the names of entries whose size is greater than zero. Returns an
    empty list when ``data/`` does not exist or when any unexpected error
    occurs (the error is printed, not raised).
    """
    try:
        # Bail out early when there is nothing to list
        if not cluster_exists("data/", config):
            print("Data directory doesn't exist")
            return []

        non_empty = []
        for entry in cluster_ls("data/", config):
            try:
                info = cluster_stat(f"data/{entry}", config)
            except FileNotFoundError:
                # The entry vanished between listing and stat
                print(f"File disappeared: {entry}")
                continue
            if info.size > 0:  # Non-empty files only
                non_empty.append(entry)
        return non_empty
    except Exception as e:
        print(f"Error in file operations: {e}")
        return []
Performance Optimization
def optimized_file_discovery(config):
    """Return the CSV and JSON files of ``large_directory/``.

    The lookup strategy depends on how many files the directory holds:
    nothing is listed when it is empty, pattern-specific searches are used
    above 10000 files, and a plain listing filtered by extension otherwise.
    """
    directory = "large_directory/"

    # A cheap count first, so an empty directory costs no listing at all
    file_count = cluster_count_files(directory, "*", config)
    if file_count == 0:
        return []

    if file_count <= 10000:
        # Small enough to list everything and filter by extension
        return [
            name
            for name in cluster_ls(directory, config)
            if name.endswith(('.csv', '.json'))
        ]

    # Very large directory: search per pattern instead of listing it all
    csv_files = cluster_find("*.csv", directory, config)
    json_files = cluster_find("*.json", directory, config)
    return csv_files + json_files
Integration Examples
With Pandas
@cluster(cores=8)
def pandas_integration(config):
    """Load every CSV under ``data/`` into one combined DataFrame.

    Files over 500 MB are read in 10000-row chunks and each chunk is
    down-sampled to 10% of its rows, so large files contribute a sample
    rather than their full contents. Smaller files are read whole.

    Returns:
        A single DataFrame with a fresh index, or an empty DataFrame when no
        CSV files are found.
    """
    import pandas as pd

    # Find all CSV files
    csv_files = cluster_find("*.csv", "data/", config)

    dataframes = []
    for filename in csv_files:
        # Check file size to determine read strategy
        file_info = cluster_stat(filename, config)
        if file_info.size > 500_000_000:  # > 500MB
            # Read in chunks and keep a 10% sample of each one
            chunks = pd.read_csv(filename, chunksize=10000)
            df = pd.concat([chunk.sample(frac=0.1) for chunk in chunks])
        else:
            df = pd.read_csv(filename)
        dataframes.append(df)

    # pd.concat raises ValueError on an empty list, so handle "no files" here
    if not dataframes:
        return pd.DataFrame()

    return pd.concat(dataframes, ignore_index=True)
With NumPy/HDF5
@cluster(cores=4, memory="32GB")
def numpy_hdf5_integration(config):
    """Summarize the HDF5 files under ``arrays/``.

    Sizes are summed for every file. Top-level keys are counted only when
    ``config.cluster_type`` is ``"local"``, because the files are opened
    directly with h5py.

    Returns:
        A dict with ``files``, ``arrays`` and ``size_gb`` entries.
    """
    import numpy as np
    import h5py

    # Find all HDF5 files
    h5_files = cluster_find("*.h5", "arrays/", config)

    total_arrays = 0
    total_size = 0
    for h5_path in h5_files:
        total_size += cluster_stat(h5_path, config).size

        # Opening the file directly only works for local configurations
        if config.cluster_type != "local":
            continue
        with h5py.File(h5_path, 'r') as f:
            total_arrays += len(f.keys())

    print(f"Found {total_arrays} arrays in {len(h5_files)} files")
    print(f"Total size: {total_size / 1e9:.1f} GB")

    return {"files": len(h5_files), "arrays": total_arrays, "size_gb": total_size / 1e9}
Troubleshooting
Common Issues
SSH Connection Problems
# Test SSH connectivity: a simple remote listing exercises the connection
try:
    files = cluster_ls(".", remote_config)
    print("SSH connection working")
except Exception as exc:
    # Check your SSH keys, hostname, username
    print(f"SSH connection failed: {exc}")
Path Issues
# Always use relative paths or properly configured absolute paths
# Good: a relative path
files = cluster_ls("data/", config)
# Be careful with absolute paths: they are used as given
files = cluster_ls("/scratch/user/data/", config) # Must exist on cluster
Performance Issues
# For large directories, avoid listing all files
# Instead of listing every entry:
all_files = cluster_ls("huge_directory/", config) # Slow!
# Use a pattern-specific search that returns only the files you need:
csv_files = cluster_find("*.csv", "huge_directory/", config) # Faster!
Next Steps
Explore the Filesystem Utilities API reference for detailed function documentation
Check out the complete tutorial at examples/filesystem_tutorial.py
Learn about Configuration API for advanced configuration options
See Decorator API for more @cluster decorator patterns