Amazon Web Services (AWS) Cloud Tutorial

This tutorial demonstrates how to use Clustrix with Amazon Web Services (AWS) cloud infrastructure for scalable distributed computing.

Overview

AWS provides several services that work well with Clustrix:

  • EC2: Virtual machines for compute clusters

  • AWS Batch: Managed job scheduling service

  • ECS: Container orchestration

  • ParallelCluster: HPC cluster management

  • S3: Object storage for data and results

  • VPC: Network isolation and security

Prerequisites

Before starting this tutorial, ensure you have:

  1. AWS Account: Active AWS account with billing enabled

  2. AWS CLI: Installed and configured on your local machine

  3. SSH Key Pair: Generated and uploaded to AWS EC2 for secure access

  4. IAM Permissions: Appropriate permissions for EC2, S3, and other services

  5. Basic AWS Knowledge: Understanding of AWS services, regions, and availability zones

  6. Python Environment: Python 3.7+ with pip installed

Complete AWS Setup Guide

Step 1: Create AWS Account

  1. Go to aws.amazon.com and create an account

  2. Verify your email and provide payment information

  3. Choose the Basic Support plan (free)

Step 2: Install AWS CLI

# On macOS
brew install awscli

# On Linux/WSL
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install

# On Windows
# Download and run the AWS CLI MSI installer from AWS documentation

Step 3: Create IAM User and Access Keys

  1. Go to AWS Console → IAM → Users → Create User

  2. Create a user with programmatic access

  3. Attach policies: AmazonEC2FullAccess, AmazonS3FullAccess, IAMReadOnlyAccess

  4. Save the Access Key ID and Secret Access Key securely

Step 4: Generate SSH Key Pair

# Generate SSH key pair locally
ssh-keygen -t rsa -b 4096 -f ~/.ssh/aws-clustrix-key

# Import public key to AWS
aws ec2 import-key-pair --key-name "clustrix-key" --public-key-material fileb://~/.ssh/aws-clustrix-key.pub

Installation and Setup

Install Clustrix with AWS dependencies:

[ ]:
# Install Clustrix with AWS support
!pip install clustrix boto3 awscli

# Import required libraries
import clustrix
from clustrix import cluster, configure
import boto3
import os
import numpy as np
import time

AWS Credentials Configuration

Configure your AWS credentials using one of the following methods:

Option 1: AWS CLI Configuration (Recommended)

Run the following command in your terminal to configure credentials interactively:

aws configure

You’ll be prompted to enter:

  • AWS Access Key ID

  • AWS Secret Access Key

  • Default region name (e.g., us-east-1)

  • Default output format (json)

This creates credential files at ~/.aws/credentials and ~/.aws/config.
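
To confirm what aws configure wrote, the credentials file can be read directly, since it is a plain INI file with one section per profile. The helper below is a minimal sketch using only the standard library; the function name list_aws_profiles is hypothetical and is not part of Clustrix or boto3.

```python
import configparser
from pathlib import Path


def list_aws_profiles(credentials_path="~/.aws/credentials"):
    """Return the profile names found in an AWS shared credentials file.

    Returns an empty list if the file does not exist.
    """
    path = Path(credentials_path).expanduser()
    if not path.is_file():
        return []

    # Interpolation is disabled so that '%' characters in secrets are kept as-is.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read(path)
    # Each section header, e.g. [default], corresponds to one profile.
    return parser.sections()


print(list_aws_profiles())
```

After a first run of aws configure, the printed list would normally contain only the default profile.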

[ ]:
# Configure AWS CLI (run this in terminal)
# aws configure

# Verify configuration
!aws sts get-caller-identity

Option 2: Environment Variables

[ ]:
# Option 2: Set AWS credentials as environment variables (if needed)
# os.environ['AWS_ACCESS_KEY_ID'] = 'your-access-key'
# os.environ['AWS_SECRET_ACCESS_KEY'] = 'your-secret-key'
# os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'

# Test AWS connection
try:
    ec2 = boto3.client('ec2')
    regions = ec2.describe_regions()
    print(f"✓ Successfully connected to AWS. Available regions: {len(regions['Regions'])}")
except Exception as e:
    print(f"✗ AWS connection failed: {e}")

Method 1: Direct EC2 Instance Configuration

Prerequisites: Create Security Group

Before launching an EC2 instance, you need to create a security group that allows SSH access. You can do this through the AWS Console or with the create_clustrix_security_group function in the Security Best Practices section below.

Quick Setup via AWS Console:

  1. Go to EC2 → Security Groups → Create Security Group

  2. Name: clustrix-sg

  3. Add inbound rule: SSH (port 22) from your IP address only

  4. Note the Security Group ID (sg-xxxxxxxxx)

Launch EC2 Instance for Clustrix

This example shows how to programmatically launch an EC2 instance suitable for Clustrix:

[ ]:
def launch_clustrix_ec2_instance(key_name, security_group_id, instance_type='t3.large'):
    """
    Launch an EC2 instance configured for Clustrix.

    Args:
        key_name: Name of your EC2 key pair
        security_group_id: Security group ID that allows SSH access
        instance_type: EC2 instance type

    Returns:
        Instance ID and public IP
    """
    ec2 = boto3.client('ec2')

    # User data script to set up the Python environment.
    # The shebang must be the first line so cloud-init runs this as a shell script.
    user_data = '''#!/bin/bash
yum update -y
yum install -y python3 python3-pip git
pip3 install clustrix numpy scipy pandas

# Install uv for faster package management
curl -LsSf https://astral.sh/uv/install.sh | sh
source $HOME/.cargo/env

# Create clustrix user
useradd -m -s /bin/bash clustrix
mkdir -p /home/clustrix/.ssh
cp /home/ec2-user/.ssh/authorized_keys /home/clustrix/.ssh/
chown -R clustrix:clustrix /home/clustrix/.ssh
chmod 700 /home/clustrix/.ssh
chmod 600 /home/clustrix/.ssh/authorized_keys

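# Set up sudo access (a drop-in file is safer than appending to /etc/sudoers)
echo "clustrix ALL=(ALL) NOPASSWD:ALL" > /etc/sudoers.d/clustrix
chmod 440 /etc/sudoers.d/clustrix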
'''

    try:
        response = ec2.run_instances(
            ImageId='ami-0c02fb55956c7d316',  # Amazon Linux 2 AMI; AMI IDs are region-specific, so use one valid in your region
            MinCount=1,
            MaxCount=1,
            InstanceType=instance_type,
            KeyName=key_name,
            SecurityGroupIds=[security_group_id],
            UserData=user_data,
            TagSpecifications=[
                {
                    'ResourceType': 'instance',
                    'Tags': [
                        {'Key': 'Name', 'Value': 'Clustrix-Compute-Node'},
                        {'Key': 'Purpose', 'Value': 'Clustrix-Tutorial'}
                    ]
                }
            ]
        )

        instance_id = response['Instances'][0]['InstanceId']

        # Wait for instance to be running
        waiter = ec2.get_waiter('instance_running')
        waiter.wait(InstanceIds=[instance_id])

        # Get public IP
        instance_info = ec2.describe_instances(InstanceIds=[instance_id])
        public_ip = instance_info['Reservations'][0]['Instances'][0].get('PublicIpAddress')

        return instance_id, public_ip

    except Exception as e:
        print(f"Error launching instance: {e}")
        return None, None

# Example usage (uncomment and modify with your details)
# instance_id, public_ip = launch_clustrix_ec2_instance(
#     key_name='clustrix-key',
#     security_group_id='sg-xxxxxxxxx'
# )
#
# if instance_id and public_ip:
#     print(f"✓ Instance launched: {instance_id}")
#     print(f"✓ Public IP: {public_ip}")
#     print("⏳ Wait 2-3 minutes for user data script to complete before connecting.")
# else:
#     print("✗ Failed to launch instance")
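
Because AMI IDs differ between regions and change as images are updated, one alternative to a hardcoded ImageId is to look up the current Amazon Linux 2 image through the public Systems Manager parameter that AWS publishes. The sketch below assumes a boto3 SSM client is passed in and that your IAM user is allowed to call ssm:GetParameter, which the policies listed in Step 3 do not grant. The function name is hypothetical.

```python
# Public SSM parameter that AWS maintains for the latest Amazon Linux 2 AMI.
AL2_PARAMETER = "/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2"


def latest_amazon_linux_2_ami(ssm_client):
    """Return the current Amazon Linux 2 AMI ID for the client's region.

    ssm_client is expected to be boto3.client("ssm"). It is passed in as an
    argument so the function can be exercised without AWS access.
    """
    response = ssm_client.get_parameter(Name=AL2_PARAMETER)
    return response["Parameter"]["Value"]


# Example usage (uncomment once credentials are configured):
# import boto3
# ami_id = latest_amazon_linux_2_ami(boto3.client("ssm"))
# print(ami_id)
```

The returned ID could then be passed to run_instances in place of the literal value in launch_clustrix_ec2_instance.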

Configure Clustrix for EC2

[ ]:
# Configure Clustrix to use your EC2 instance
configure(
    cluster_type="ssh",
    cluster_host="your-ec2-public-ip",  # Replace with actual IP
    username="clustrix",  # or "ec2-user" if using default user
    key_file="~/.ssh/aws-clustrix-key",  # Path to the private key generated in Step 4
    remote_work_dir="/tmp/clustrix",
    package_manager="auto",  # Will use uv if available, fallback to pip
    default_cores=4,
    default_memory="8GB",
    default_time="01:00:00"
)

Configuration Complete!

Clustrix is now configured to use the EC2 instance. Replace your-ec2-public-ip with the public IP address of your running instance before submitting any jobs.
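
Rather than waiting a fixed 2-3 minutes after launch, you can poll the instance until its SSH port accepts connections. The helper below is a rough sketch using only the standard library; wait_for_port is a hypothetical name, not a Clustrix function. An open port only shows that sshd is up, so the user data script (and the clustrix user it creates) may still need a little longer.

```python
import socket
import time


def wait_for_port(host, port=22, timeout=180.0, interval=5.0):
    """Poll until a TCP connection to host:port succeeds or timeout elapses.

    Returns True on the first successful connection, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect is enough; the socket is closed immediately.
            with socket.create_connection((host, port), timeout=5.0):
                return True
        except OSError:
            # Stop if another sleep would take us past the deadline.
            if time.monotonic() + interval > deadline:
                return False
            time.sleep(interval)


# Example usage (replace with your instance's public IP):
# if wait_for_port("your-ec2-public-ip"):
#     print("SSH port is reachable")
```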

Example: Remote Computation on EC2

[ ]:
@cluster(cores=2, memory="4GB")
def aws_monte_carlo_pi(n_samples=1000000):
    """Estimate π using Monte Carlo method on AWS EC2."""
    import numpy as np

    # Generate random points
    x = np.random.uniform(-1, 1, n_samples)
    y = np.random.uniform(-1, 1, n_samples)

    # Count points inside unit circle
    inside_circle = (x**2 + y**2) <= 1
    pi_estimate = 4 * np.sum(inside_circle) / n_samples

    return {
        'pi_estimate': pi_estimate,
        'n_samples': n_samples,
        'error': abs(pi_estimate - np.pi)
    }

# Example usage (uncomment to run on your EC2 instance):
# result = aws_monte_carlo_pi(n_samples=5000000)
# print(f"π estimate: {result['pi_estimate']:.6f}")
# print(f"Error: {result['error']:.6f}")
# print(f"Samples used: {result['n_samples']:,}")

Ready to Run!

The Monte Carlo π estimation function is now defined and ready to execute on your EC2 instance. Simply uncomment the example usage lines above to run the computation remotely on AWS.
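
To judge whether the reported error is plausible, it helps to know how much noise to expect. Each sample lands inside the circle with probability π/4, so the estimate 4 · hits / n has standard error sqrt(π(4 − π) / n). The snippet below is a small local sketch of that formula; the function name is hypothetical.

```python
import math


def pi_estimate_standard_error(n_samples):
    """Standard error of the Monte Carlo estimate 4 * (hits / n_samples)."""
    if n_samples <= 0:
        raise ValueError("n_samples must be positive")
    # Var(4 * mean of Bernoulli(p)) with p = pi/4 is pi * (4 - pi) / n.
    return math.sqrt(math.pi * (4 - math.pi) / n_samples)


for n in (10_000, 1_000_000, 5_000_000):
    print(f"n = {n:>9,}: standard error = {pi_estimate_standard_error(n):.5f}")
```

For the 5,000,000 samples used in the example call, the standard error is on the order of 0.0007, and quadrupling the sample count halves it.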

Method 2: AWS Batch Configuration

AWS Batch provides managed job scheduling for more complex workloads:

[ ]:
def create_aws_batch_environment():
    """
    Example of setting up AWS Batch compute environment.
    This is a template - you'll need to adapt it to your specific needs.
    """
    batch = boto3.client('batch')
    ec2 = boto3.client('ec2')
    iam = boto3.client('iam')

    # This is a simplified example - real setup requires:
    # 1. VPC and subnet configuration
    # 2. IAM roles and policies
    # 3. Security groups
    # 4. Compute environment
    # 5. Job queue
    # 6. Job definition

    return {
        'compute_environment': 'clustrix-batch-env',
        'job_queue': 'clustrix-queue',
        'job_definition': 'clustrix-job-def'
    }

# batch_config = create_aws_batch_environment()

Note on AWS Batch Complexity

AWS Batch setup requires careful configuration of networking, IAM, and compute resources. For simpler HPC setups, consider AWS ParallelCluster or EKS instead. The function above is only a template: it returns placeholder resource names and does not create any Batch resources.
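
Once a compute environment, job queue, and job definition exist, submitting work is a single boto3 call. The sketch below uses the plain boto3 Batch API and is not Clustrix's own Batch integration. The default queue and job definition names are the placeholders from the template above, and submit_batch_job is a hypothetical helper name.

```python
def submit_batch_job(batch_client, job_name, command,
                     job_queue="clustrix-queue",
                     job_definition="clustrix-job-def"):
    """Submit one container job to an existing AWS Batch queue.

    batch_client is expected to be boto3.client("batch"). The queue and the
    job definition must already exist in your account.
    """
    response = batch_client.submit_job(
        jobName=job_name,
        jobQueue=job_queue,
        jobDefinition=job_definition,
        # Override the container command defined in the job definition.
        containerOverrides={"command": list(command)},
    )
    return response["jobId"]


# Example usage (uncomment after creating the Batch resources):
# import boto3
# job_id = submit_batch_job(boto3.client("batch"), "pi-test",
#                           ["python3", "-c", "print('hello from Batch')"])
```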

Method 3: AWS ParallelCluster Integration

AWS ParallelCluster is designed for HPC workloads and integrates well with Clustrix:

[ ]:
# Configure Clustrix for ParallelCluster
def configure_for_parallelcluster(cluster_name, master_ip):
    """Configure Clustrix to use AWS ParallelCluster."""
    configure(
        cluster_type="slurm",
        cluster_host=master_ip,
        username="ec2-user",
        key_file="~/.ssh/aws-clustrix-key",
        remote_work_dir="/shared/clustrix",  # Use shared storage
        package_manager="uv",
        module_loads=["python3"],  # Load required modules
        default_cores=4,
        default_memory="8GB",
        default_time="01:00:00",
        default_partition="compute"
    )
    return f"Configured Clustrix for ParallelCluster: {cluster_name}"

# Example usage:
# result = configure_for_parallelcluster("my-cluster", "10.0.0.100")
# print(result)

ParallelCluster Configuration Example

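Here’s a sample ParallelCluster configuration file for use with Clustrix. It uses the INI format of ParallelCluster 2.x; ParallelCluster 3 replaced this format with a YAML configuration file.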

# Save as ~/.parallelcluster/config
[aws]
aws_region_name = us-east-1

[global]
cluster_template = clustrix-template
update_check = false
sanity_check = true

[cluster clustrix-template]
key_name = your-key-name
vpc_settings = vpc-settings
compute_instance_type = c5.xlarge
master_instance_type = t3.medium
initial_queue_size = 0
max_queue_size = 10
scheduler = slurm
placement_group = DYNAMIC
placement = compute
disable_hyperthreading = false
post_install = https://raw.githubusercontent.com/your-repo/clustrix-setup.sh

[vpc vpc-settings]
vpc_id = vpc-xxxxxxxxx
master_subnet_id = subnet-xxxxxxxxx
compute_subnet_id = subnet-xxxxxxxxx
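
The sections in this file refer to each other by name: [global] points at a cluster template, which in turn points at a VPC section. A quick cross-reference check can catch typos before you create a cluster. The function below is an illustrative sketch built on the standard configparser module; it is not part of the ParallelCluster CLI and checks only the few keys shown here.

```python
import configparser


def check_parallelcluster_config(text):
    """Return a list of problems found in a ParallelCluster 2.x style INI config."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)

    template = parser.get("global", "cluster_template", fallback=None)
    if template is None:
        return ["[global] is missing cluster_template"]

    cluster_section = f"cluster {template}"
    if not parser.has_section(cluster_section):
        return [f"missing section [{cluster_section}]"]

    problems = []
    vpc_name = parser.get(cluster_section, "vpc_settings", fallback=None)
    if vpc_name is None:
        problems.append(f"[{cluster_section}] is missing vpc_settings")
    elif not parser.has_section(f"vpc {vpc_name}"):
        problems.append(f"missing section [vpc {vpc_name}]")

    # Clustrix is configured with cluster_type="slurm" in this tutorial.
    if parser.get(cluster_section, "scheduler", fallback=None) != "slurm":
        problems.append("scheduler is not set to slurm")
    return problems


# Example usage:
# from pathlib import Path
# text = Path("~/.parallelcluster/config").expanduser().read_text()
# print(check_parallelcluster_config(text) or "no problems found")
```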

Data Management with S3

Integrate S3 for data input/output:

[ ]:
@cluster(cores=2, memory="4GB")
def process_s3_data(bucket_name, input_key, output_key):
    """Process data from S3 and save results back to S3."""
    import boto3
    import numpy as np
    import pickle
    import io
    import time  # needed for the processing timestamp below

    s3 = boto3.client('s3')

    # Download data from S3
    response = s3.get_object(Bucket=bucket_name, Key=input_key)
    data = pickle.loads(response['Body'].read())

    # Process the data
    processed_data = {
        'original_shape': data.shape if hasattr(data, 'shape') else len(data),
        'mean': np.mean(data) if hasattr(data, '__iter__') else data,
        'std': np.std(data) if hasattr(data, '__iter__') else 0,
        'processing_timestamp': time.time()
    }

    # Upload results to S3
    output_buffer = io.BytesIO()
    pickle.dump(processed_data, output_buffer)
    output_buffer.seek(0)

    s3.put_object(
        Bucket=bucket_name,
        Key=output_key,
        Body=output_buffer.getvalue()
    )

    return f"Processed data saved to s3://{bucket_name}/{output_key}"

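# Example S3 utility functions. io and pickle are imported here because the
# imports inside process_s3_data are local to that function.
import io
import pickle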
def upload_to_s3(data, bucket_name, key):
    """Upload data to S3."""
    s3 = boto3.client('s3')
    buffer = io.BytesIO()
    pickle.dump(data, buffer)
    buffer.seek(0)
    s3.put_object(Bucket=bucket_name, Key=key, Body=buffer.getvalue())
    print(f"✓ Data uploaded to s3://{bucket_name}/{key}")

def download_from_s3(bucket_name, key):
    """Download data from S3."""
    s3 = boto3.client('s3')
    response = s3.get_object(Bucket=bucket_name, Key=key)
    data = pickle.loads(response['Body'].read())
    print(f"✓ Data downloaded from s3://{bucket_name}/{key}")
    return data

# Example usage:
# sample_data = np.random.rand(1000, 100)
# upload_to_s3(sample_data, 'your-bucket', 'input/sample_data.pkl')
# result = process_s3_data('your-bucket', 'input/sample_data.pkl', 'output/results.pkl')
# print(result)
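
process_s3_data returns the output location as an s3://bucket/key string, while download_from_s3 expects the bucket and key separately. A small parser bridges the two; the sketch below is a hypothetical helper and uses plain string handling so that keys containing characters such as ? or # are left intact.

```python
def parse_s3_uri(uri):
    """Split 's3://bucket/key' into a (bucket, key) tuple."""
    prefix = "s3://"
    if not uri.startswith(prefix):
        raise ValueError(f"not an S3 URI: {uri!r}")
    # Everything up to the first '/' is the bucket; the rest is the object key.
    bucket, _, key = uri[len(prefix):].partition("/")
    if not bucket:
        raise ValueError(f"missing bucket name in {uri!r}")
    return bucket, key


# Example usage with the helpers above:
# bucket, key = parse_s3_uri("s3://your-bucket/output/results.pkl")
# results = download_from_s3(bucket, key)
```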

Security Best Practices

Security Group Configuration

[ ]:
def create_clustrix_security_group(vpc_id, your_ip):
    """
    Create a security group for Clustrix with minimal required access.

    Args:
        vpc_id: VPC ID where to create the security group
        your_ip: Your public IP address (get from https://checkip.amazonaws.com)

    Returns:
        Security group ID
    """
    ec2 = boto3.client('ec2')

    try:
        response = ec2.create_security_group(
            GroupName='clustrix-sg',
            Description='Security group for Clustrix compute nodes',
            VpcId=vpc_id
        )

        sg_id = response['GroupId']

        # Add SSH access from your IP only
        ec2.authorize_security_group_ingress(
            GroupId=sg_id,
            IpPermissions=[
                {
                    'IpProtocol': 'tcp',
                    'FromPort': 22,
                    'ToPort': 22,
                    'IpRanges': [{'CidrIp': f'{your_ip}/32', 'Description': 'SSH access'}]
                }
            ]
        )

        print(f"✓ Created security group: {sg_id}")
        return sg_id

    except Exception as e:
        print(f"✗ Error creating security group: {e}")
        return None

# Helper function to get your public IP
def get_my_public_ip():
    """Get your current public IP address."""
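    import requests
    try:
        # A timeout keeps the call from hanging if the service is unreachable
        response = requests.get('https://checkip.amazonaws.com', timeout=10)
        response.raise_for_status()
        return response.text.strip()
    except requests.RequestException:
        print("Could not determine public IP. Please check manually at https://checkip.amazonaws.com")
        return None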

# Example usage:
# my_ip = get_my_public_ip()
# if my_ip:
#     print(f"Your public IP: {my_ip}")
#     # sg_id = create_clustrix_security_group('vpc-xxxxxxxxx', my_ip)
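
create_clustrix_security_group builds the CIDR by appending /32 to whatever string it is given, so a malformed or IPv6 address only fails once the AWS API rejects it. Validating the address locally first gives a clearer error. This is a minimal sketch with the standard ipaddress module; ssh_ingress_cidr is a hypothetical helper name.

```python
import ipaddress


def ssh_ingress_cidr(ip_text):
    """Validate an IPv4 address string and return it as a single-host CIDR."""
    # ip_address raises ValueError for anything that is not a valid address.
    address = ipaddress.ip_address(ip_text.strip())
    if address.version != 4:
        raise ValueError("IpRanges expects IPv4; IPv6 rules go in Ipv6Ranges")
    return f"{address}/32"


print(ssh_ingress_cidr("203.0.113.7"))
```

The address 203.0.113.7 comes from a range reserved for documentation; in practice you would pass the value returned by get_my_public_ip().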

AWS Security Checklist for Clustrix

Authentication & Access

  • Use IAM roles instead of access keys when possible

  • Restrict security groups to your IP address only

  • Regularly rotate SSH keys and access credentials

Network Security

  • Use private subnets for compute nodes when possible

  • Enable VPC Flow Logs for network monitoring

  • Use AWS Systems Manager Session Manager instead of direct SSH when possible

Data Protection

  • Use encrypted EBS volumes and S3 buckets

  • Enable CloudTrail for API logging

Monitoring & Management

  • Set up billing alerts to monitor costs

  • Tag all resources for cost tracking and management

Cost Optimization

[ ]:
# Import Clustrix cost monitoring for AWS
from clustrix import cost_tracking_decorator, get_cost_monitor, generate_cost_report, get_pricing_info

# Example 1: Cost tracking with AWS instances
@cost_tracking_decorator('aws', 'p3.2xlarge')
@cluster(cores=8, memory="60GB")
def aws_training_with_cost_tracking():
    """Example training function with AWS cost tracking."""
    import time
    import numpy as np

    print("Starting AWS training with cost monitoring...")
    time.sleep(3)  # Simulate training

    # Simulate a compute-heavy workload (NumPy runs this on the CPU, not the GPU)
    data = np.random.randn(2000, 2000)
    result = np.linalg.svd(data)

    print("Training completed!")
    return {'accuracy': 0.92, 'epochs': 50}

# Example 2: Compare AWS pricing
def compare_aws_pricing():
    """Compare AWS EC2 pricing for different instance types."""
    pricing = get_pricing_info('aws')
    if pricing:
        print("AWS EC2 On-Demand Pricing (USD/hour):")

        # Group by category
        gpu_instances = {k: v for k, v in pricing.items() if k.startswith(('p3', 'p4d', 'g4dn'))}
        compute_instances = {k: v for k, v in pricing.items() if k.startswith('c5')}
        memory_instances = {k: v for k, v in pricing.items() if k.startswith('r5')}

        print("\nGPU Instances:")
        for instance, price in sorted(gpu_instances.items(), key=lambda x: x[1]):
            print(f"  {instance:<20}: ${price:.3f}/hour")

        print("\nCompute Optimized:")
        for instance, price in sorted(compute_instances.items(), key=lambda x: x[1]):
            print(f"  {instance:<20}: ${price:.3f}/hour")

        print("\nMemory Optimized:")
        for instance, price in sorted(memory_instances.items(), key=lambda x: x[1]):
            print(f"  {instance:<20}: ${price:.3f}/hour")

# Example 3: AWS Spot vs On-Demand cost analysis
def aws_spot_cost_analysis():
    """Analyze potential savings with AWS Spot instances."""
    monitor = get_cost_monitor('aws')
    if monitor:
        print("AWS Spot Instance Savings Analysis:")
        print("-" * 40)

        instance_types = ['p3.2xlarge', 'p3.8xlarge', 'g4dn.xlarge', 'c5.large']

        for instance in instance_types:
            on_demand = monitor.estimate_cost(instance, 1.0, use_spot=False)
            spot = monitor.estimate_cost(instance, 1.0, use_spot=True)
            savings = ((on_demand.hourly_rate - spot.hourly_rate) / on_demand.hourly_rate) * 100

            print(f"{instance}:")
            print(f"  On-Demand: ${on_demand.hourly_rate:.3f}/hour")
            print(f"  Spot:      ${spot.hourly_rate:.3f}/hour")
            print(f"  Savings:   {savings:.1f}%")
            print()

# Example 4: AWS Batch cost estimation
def estimate_aws_batch_costs():
    """Estimate costs for AWS Batch workloads."""
    monitor = get_cost_monitor('aws')
    if monitor:
        batch_estimate = monitor.estimate_batch_cost(
            job_queue="clustrix-batch-queue",
            compute_environment="clustrix-compute-env",
            estimated_jobs=100,
            avg_job_duration_hours=0.25
        )

        print("AWS Batch Cost Estimation:")
        print(f"  Job Queue: {batch_estimate['job_queue']}")
        print(f"  Total Jobs: {batch_estimate['estimated_jobs']}")
        print(f"  Avg Duration: {batch_estimate['avg_job_duration_hours']} hours")
        print(f"  Total Compute Hours: {batch_estimate['total_compute_hours']}")
        print(f"  Estimated Cost: ${batch_estimate['estimated_cost']:.2f}")
        print(f"  Cost per Job: ${batch_estimate['cost_per_job']:.4f}")

# Example 5: Regional pricing comparison
def compare_aws_regions():
    """Compare AWS pricing across different regions."""
    monitor = get_cost_monitor('aws')
    if monitor:
        print("AWS Regional Pricing Comparison for p3.2xlarge:")
        print("-" * 50)

        regional_pricing = monitor.get_region_pricing_comparison('p3.2xlarge')
        for region, pricing_info in regional_pricing.items():
            print(f"{region}:")
            print(f"  On-Demand: ${pricing_info['on_demand_hourly']:.3f}/hour")
            print(f"  Est. Spot: ${pricing_info['estimated_spot_hourly']:.3f}/hour")
            print()

# Example 6: Real-time AWS cost monitoring
def monitor_aws_costs():
    """Monitor current AWS resource usage and costs."""
    report = generate_cost_report('aws', 'p3.2xlarge')
    if report:
        print("Current AWS Resource Status:")
        print(f"  CPU Usage: {report['resource_usage']['cpu_percent']:.1f}%")
        print(f"  Memory Usage: {report['resource_usage']['memory_percent']:.1f}%")
        if report['resource_usage']['gpu_stats']:
            print(f"  GPU Count: {len(report['resource_usage']['gpu_stats'])}")
        print(f"  Hourly Rate: ${report['cost_estimate']['hourly_rate']:.3f}")

        if report['recommendations']:
            print("\nCost Optimization Recommendations:")
            for rec in report['recommendations']:
                print(f"  • {rec}")

# Run examples
print("AWS Cost Monitoring Examples:")
print("=" * 40)

print("\n1. AWS Pricing Comparison:")
compare_aws_pricing()

print("\n2. Spot Instance Savings Analysis:")
aws_spot_cost_analysis()

print("\n3. AWS Batch Cost Estimation:")
estimate_aws_batch_costs()

print("\n4. Regional Pricing Comparison:")
compare_aws_regions()

print("\n5. Current AWS Status:")
monitor_aws_costs()

print("\n✅ AWS cost monitoring examples ready!")
print("💡 Use @cost_tracking_decorator('aws', 'instance_type') for automatic cost tracking")

AWS Cost Optimization for Clustrix

1. Instance Selection

  • Use Spot Instances for non-critical workloads (up to 90% savings)

  • Choose right-sized instances (don’t over-provision)

  • Consider AMD instances (often cheaper than Intel)

2. Storage Optimization

  • Use S3 Intelligent Tiering for data

  • Delete temporary files and logs regularly

  • Use gp3 EBS volumes instead of gp2

3. Network Efficiency

  • Keep compute and storage in the same region, and instances that exchange data in the same AZ, to avoid data transfer costs

  • Minimize cross-region data transfer

4. Smart Scheduling

  • Use scheduled scaling for predictable workloads

  • Terminate instances when not in use

  • Use AWS Lambda for small, short-running tasks

5. Monitoring & Control

  • Set up cost alerts and budgets

  • Use AWS Cost Explorer to analyze spending

  • Monitor with CloudWatch to optimize resource usage
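
The arithmetic behind these recommendations is simple enough to check by hand before launching anything. The sketch below uses made-up hourly rates purely to illustrate the calculation; they are not real AWS prices, and the function names are hypothetical rather than part of the Clustrix cost API.

```python
def estimate_run_cost(hourly_rate, hours, instance_count=1):
    """Cost in USD of running instance_count instances for the given hours."""
    return hourly_rate * hours * instance_count


def savings_percent(on_demand_rate, spot_rate):
    """Percentage saved by paying spot_rate instead of on_demand_rate."""
    return (on_demand_rate - spot_rate) / on_demand_rate * 100


# Hypothetical rates chosen only to illustrate the arithmetic.
on_demand_rate, spot_rate = 2.00, 0.50

on_demand_total = estimate_run_cost(on_demand_rate, hours=10, instance_count=4)
spot_total = estimate_run_cost(spot_rate, hours=10, instance_count=4)
print(f"On-Demand: ${on_demand_total:.2f}, Spot: ${spot_total:.2f}")
print(f"Savings: {savings_percent(on_demand_rate, spot_rate):.0f}%")
```

With these illustrative rates, four instances for ten hours cost $80 on demand versus $20 on Spot, a 75% saving.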

Resource Cleanup

[ ]:
def cleanup_aws_resources(instance_ids=None, security_group_ids=None):
    """
    Clean up AWS resources to avoid ongoing charges.

    Args:
        instance_ids: List of EC2 instance IDs to terminate
        security_group_ids: List of security group IDs to delete
    """
    ec2 = boto3.client('ec2')

    try:
        # Terminate instances
        if instance_ids:
            response = ec2.terminate_instances(InstanceIds=instance_ids)
            print(f"⏳ Terminating instances: {instance_ids}")

            # Wait for termination
            waiter = ec2.get_waiter('instance_terminated')
            waiter.wait(InstanceIds=instance_ids)
            print("✓ Instances terminated.")

        # Delete security groups
        if security_group_ids:
            for sg_id in security_group_ids:
                try:
                    ec2.delete_security_group(GroupId=sg_id)
                    print(f"✓ Deleted security group: {sg_id}")
                except Exception as e:
                    print(f"✗ Could not delete security group {sg_id}: {e}")

        print("✅ Cleanup completed!")

    except Exception as e:
        print(f"✗ Error during cleanup: {e}")

# Helper function to list your running instances
def list_running_instances():
    """List running EC2 instances in the client's configured region."""
    ec2 = boto3.client('ec2')

    try:
        response = ec2.describe_instances(
            Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
        )

        instances = []
        for reservation in response['Reservations']:
            for instance in reservation['Instances']:
                name = next((tag['Value'] for tag in instance.get('Tags', []) if tag['Key'] == 'Name'), 'No Name')
                instances.append({
                    'InstanceId': instance['InstanceId'],
                    'Name': name,
                    'InstanceType': instance['InstanceType'],
                    'PublicIpAddress': instance.get('PublicIpAddress', 'No Public IP')
                })

        if instances:
            print("Running instances:")
            for inst in instances:
                print(f"  {inst['InstanceId']} ({inst['Name']}) - {inst['InstanceType']} - {inst['PublicIpAddress']}")
        else:
            print("No running instances found.")

        return instances

    except Exception as e:
        print(f"✗ Error listing instances: {e}")
        return []

# Example cleanup (uncomment and modify as needed)
# instances = list_running_instances()
# cleanup_aws_resources(
#     instance_ids=['i-1234567890abcdef0'],
#     security_group_ids=['sg-1234567890abcdef0']
# )

⚠️ Important: Clean Up Resources

Always remember to clean up AWS resources when you’re done to avoid ongoing charges! The cleanup function above helps automate this process.
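
Since launch_clustrix_ec2_instance tags every instance with Purpose=Clustrix-Tutorial, that tag can be used to find leftovers instead of copying instance IDs by hand. The function below is a sketch that assumes a boto3 EC2 client is passed in; find_tutorial_instance_ids is a hypothetical name. It makes a single describe_instances call, so accounts with many instances would need pagination.

```python
def find_tutorial_instance_ids(ec2_client, purpose="Clustrix-Tutorial"):
    """Return IDs of non-terminated instances tagged Purpose=<purpose>."""
    response = ec2_client.describe_instances(
        Filters=[
            {"Name": "tag:Purpose", "Values": [purpose]},
            # Skip instances that are already shutting down or terminated.
            {"Name": "instance-state-name",
             "Values": ["pending", "running", "stopping", "stopped"]},
        ]
    )
    return [
        instance["InstanceId"]
        for reservation in response["Reservations"]
        for instance in reservation["Instances"]
    ]


# Example usage (uncomment to terminate everything this tutorial launched):
# import boto3
# ids = find_tutorial_instance_ids(boto3.client("ec2"))
# if ids:
#     cleanup_aws_resources(instance_ids=ids)
```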

Advanced Example: Distributed Machine Learning

[ ]:
@cluster(cores=4, memory="8GB", time="00:30:00")
def distributed_model_training(data_params, model_params):
    """
    Train a machine learning model on AWS with data from S3.

    Args:
        data_params: Dictionary with S3 bucket and key information
        model_params: Dictionary with model hyperparameters

    Returns:
        Dictionary with training results and model location
    """
    import numpy as np
    import boto3
    import pickle
    import io
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split

    # Download training data from S3
    s3 = boto3.client('s3')
    response = s3.get_object(
        Bucket=data_params['bucket'],
        Key=data_params['training_data_key']
    )
    data = pickle.loads(response['Body'].read())

    X, y = data['features'], data['labels']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Train model
    model = RandomForestClassifier(**model_params)
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)

    # Save model to S3
    model_buffer = io.BytesIO()
    pickle.dump(model, model_buffer)
    model_buffer.seek(0)

    s3.put_object(
        Bucket=data_params['bucket'],
        Key=data_params['model_output_key'],
        Body=model_buffer.getvalue()
    )

    return {
        'accuracy': accuracy,
        'model_location': f"s3://{data_params['bucket']}/{data_params['model_output_key']}",
        'training_samples': len(X_train),
        'test_samples': len(X_test)
    }

# Example usage:
# data_config = {
#     'bucket': 'your-ml-bucket',
#     'training_data_key': 'datasets/training_data.pkl',
#     'model_output_key': 'models/random_forest_model.pkl'
# }
#
# model_config = {
#     'n_estimators': 100,
#     'max_depth': 10,
#     'random_state': 42,
#     'n_jobs': -1
# }
#
# result = distributed_model_training(data_config, model_config)
# print(f"✓ Model trained with accuracy: {result['accuracy']:.4f}")
# print(f"✓ Model saved to: {result['model_location']}")
# print(f"✓ Training samples: {result['training_samples']:,}")
# print(f"✓ Test samples: {result['test_samples']:,}")

Summary

This tutorial covered:

  1. Setup: AWS credentials and Clustrix installation

  2. EC2 Integration: Direct instance configuration

  3. AWS Batch: Managed job scheduling

  4. ParallelCluster: HPC-optimized clusters

  5. S3 Integration: Data storage and retrieval

  6. Security: Best practices for safe deployment

  7. Cost Optimization: Strategies to minimize expenses

  8. Resource Management: Proper cleanup procedures

Next Steps

  • Set up your AWS credentials and test the basic configuration

  • Start with a simple EC2 instance for initial testing

  • Consider ParallelCluster for production HPC workloads

  • Implement proper monitoring and cost controls

  • Explore AWS Spot instances for cost-effective batch processing

Resources

Remember: Always monitor your AWS costs and clean up resources when not in use!