Local Executor API

The Local Executor enables multi-core parallel execution on the local machine using multiprocessing or threading.

class clustrix.local_executor.LocalExecutor(max_workers=None, use_threads=False)

Bases: object

Execute functions locally using multiprocessing or threading.

__init__(max_workers=None, use_threads=False)

Initialize local executor.

Parameters:
  • max_workers (Optional[int]) – Maximum number of worker processes/threads

  • use_threads (bool) – If True, use ThreadPoolExecutor, else ProcessPoolExecutor

__enter__()

Context manager entry.

__exit__(exc_type, exc_val, exc_tb)

Context manager exit.

execute_single(func, args, kwargs)

Execute a single function call locally.

Parameters:
  • func (Callable) – Function to execute

  • args (tuple) – Positional arguments

  • kwargs (dict) – Keyword arguments

Return type:

Any

Returns:

Function result

execute_parallel(func, work_chunks, timeout=None)

Execute function in parallel with different work chunks.

Parameters:
  • func (Callable) – Function to execute

  • work_chunks (List[Dict[str, Any]]) – List of work chunks, each containing args and kwargs

  • timeout (Optional[float]) – Timeout for each task in seconds

Return type:

List[Any]

Returns:

List of results in order of work_chunks
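The shape of work_chunks and the ordering guarantee can be illustrated with a small standard-library sketch. The helper name run_chunks and its max_workers default are hypothetical, and this is only an assumption about how such a method might behave, not the library's implementation.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional


def run_chunks(
    func: Callable[..., Any],
    work_chunks: List[Dict[str, Any]],
    timeout: Optional[float] = None,
    max_workers: int = 4,
) -> List[Any]:
    """Hypothetical stand-in for execute_parallel, built on the standard library."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit every chunk up front so the tasks can run concurrently.
        futures = [
            pool.submit(func, *chunk.get("args", ()), **chunk.get("kwargs", {}))
            for chunk in work_chunks
        ]
        # Read results in submission order, so output order matches work_chunks.
        return [future.result(timeout=timeout) for future in futures]


def scale(x, factor=1):
    return x * factor


chunks = [{"args": (i,), "kwargs": {"factor": 10}} for i in range(5)]
results = run_chunks(scale, chunks, timeout=5.0)
print(results)  # [0, 10, 20, 30, 40]
```

In this sketch the timeout bounds each wait on a result rather than the task's total running time; whether LocalExecutor measures it the same way is not stated above.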

execute_loop_parallel(func, loop_var, iterable, func_args=(), func_kwargs=None, chunk_size=None)

Execute a function in parallel over an iterable using intelligent chunking.

This method splits the iterable into chunks, distributes the chunks across workers, and returns results in the original order. The function is still called once per item, not once per chunk.

Algorithm:

  1. Chunking: Splits the iterable into approximately equal chunks based on worker count and optional chunk_size parameter.

  2. Chunk Processing: Creates a wrapper function that processes each chunk by iterating over individual items and calling the original function.

  3. Result Flattening: Collects results from all chunks and flattens them while preserving the original iteration order.

Key Innovation: The chunk processor wrapper bridges the gap between chunk-based parallel execution and functions that expect individual items.

Example Workflow:

    range(5) with chunk_size=2
    chunks: [[0,1], [2,3], [4]]
    Each chunk processed: func(args, loop_var=0), func(args, loop_var=1), etc.
    Results flattened: [result0, result1, result2, result3, result4]
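The three steps can be sketched with the standard library alone. The function loop_parallel below is hypothetical and uses a thread pool for simplicity. The fallback chunk size of at least 1 is an assumption, and nothing here is taken from the library's source.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional, Sequence


def loop_parallel(
    func: Callable[..., Any],
    loop_var: str,
    iterable: Sequence[Any],
    func_args: tuple = (),
    func_kwargs: Optional[Dict[str, Any]] = None,
    chunk_size: Optional[int] = None,
    max_workers: int = 4,
) -> List[Any]:
    """Hypothetical sketch of chunk, process, flatten; not the library's code."""
    items = list(iterable)
    if not items:
        return []
    base_kwargs = dict(func_kwargs or {})
    # Assumed default: spread items evenly over workers, never below 1.
    size = chunk_size or max(1, len(items) // max_workers)

    # Step 1: split into consecutive chunks.
    chunks = [items[i:i + size] for i in range(0, len(items), size)]

    # Step 2: the wrapper calls func once per item, passing it as loop_var.
    def process_chunk(chunk: List[Any]) -> List[Any]:
        return [func(*func_args, **{**base_kwargs, loop_var: item}) for item in chunk]

    # Step 3: map() yields chunk results in input order; flatten them.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return [
            result
            for chunk_result in pool.map(process_chunk, chunks)
            for result in chunk_result
        ]


def square(x):
    return x ** 2


print(loop_parallel(square, "x", range(5), chunk_size=2))  # [0, 1, 4, 9, 16]
```

The nested process_chunk closure works with threads, but it could not be pickled for a process pool; a process-based version would need a module-level wrapper instead.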

Parameters:
  • func (Callable) – Function to execute for each item. The function will be called with func_args + the item as the loop_var keyword argument.

  • loop_var (str) – Name of the keyword argument that will receive each item from the iterable.

  • iterable (Union[range, List, tuple]) – Iterable to process in parallel (range, list, tuple, etc.).

  • func_args (tuple) – Additional positional arguments passed to func for each call.

  • func_kwargs (Optional[Dict[Any, Any]]) – Additional keyword arguments passed to func for each call. Note: loop_var will be added/overridden in these kwargs.

  • chunk_size (Optional[int]) – Optional size of each work chunk. If None, automatically calculated as len(items) // max_workers.

Returns:

Results from function execution in the same order as the original iterable. Each element corresponds to func() called with the respective item from the iterable.

Return type:

List[Any]

Examples

>>> executor = LocalExecutor(max_workers=4)
>>> def square(x):
...     return x ** 2
>>>
>>> # Parallel computation over range
>>> results = executor.execute_loop_parallel(
...     square, 'x', range(10), chunk_size=3
... )
>>> # Results: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> # With additional arguments
>>> def power(base, x, exp=2):
...     return (base + x) ** exp
>>> results = executor.execute_loop_parallel(
...     power, 'x', [1, 2, 3], func_args=(10,), func_kwargs={'exp': 3}
... )
>>> # Results: [11**3, 12**3, 13**3] = [1331, 1728, 2197]

Raises:

Exception – Any exception raised by the function during execution.

Note

  • Empty iterables return empty lists immediately

  • Single-item iterables are executed directly (no parallelization overhead)

  • Chunk size is automatically bounded by the number of available workers

  • The method preserves the original order of results

clustrix.local_executor.choose_executor_type(func, args, kwargs)

Intelligently choose between ThreadPoolExecutor and ProcessPoolExecutor based on function characteristics.

This function decides which execution model suits a given call. It checks pickling constraints first, then inspects the function's source code to distinguish I/O-bound from CPU-bound workloads.

Decision Logic (in priority order):

  1. Pickling Check (Highest Priority): Functions and arguments must be picklable for multiprocessing. If any component fails pickling, threads are used.

  2. I/O Detection: Source code analysis looks for common I/O patterns that benefit from threads due to GIL release during I/O operations.

  3. Default: CPU-bound tasks default to processes for true parallelism.

Key Insight: inspect.getsource(lambda x: x) succeeds but pickle.dumps(lambda x: x) fails, so pickling must be checked before source analysis.
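The priority order can be sketched as follows. Both prefer_threads and looks_io_bound are hypothetical names, and the IO_MARKERS list is an assumed set of illustrative patterns; the patterns the library actually scans for are not listed here.

```python
import inspect
import pickle
from typing import Any, Callable, Dict, Tuple

# Assumed, illustrative markers; the library's actual pattern list is not shown here.
IO_MARKERS = ("open(", "requests.", "urllib", "socket", "time.sleep")


def looks_io_bound(source: str) -> bool:
    """Return True if the source text contains any assumed I/O marker."""
    return any(marker in source for marker in IO_MARKERS)


def prefer_threads(
    func: Callable[..., Any], args: Tuple[Any, ...], kwargs: Dict[str, Any]
) -> bool:
    """Hypothetical re-creation of the documented priority order."""
    # 1. Pickling check: anything a process pool cannot serialize forces threads.
    try:
        pickle.dumps(func)
        pickle.dumps(args)
        pickle.dumps(kwargs)
    except Exception:
        return True

    # 2. I/O detection on the source text, when the source is available.
    try:
        source = inspect.getsource(func)
    except (OSError, TypeError):
        source = ""
    if looks_io_bound(source):
        return True

    # 3. Default: treat the function as CPU-bound and use processes.
    return False


print(prefer_threads(lambda x: x * 2, (5,), {}))  # True: lambdas cannot be pickled
print(prefer_threads(len, ([1, 2, 3],), {}))      # False: picklable, no source to scan
```

Running the pickling check first means an unpicklable function never reaches the source scan, which matches the ordering described above.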

Parameters:
  • func (Callable) – Function to analyze for execution characteristics

  • args (tuple) – Positional arguments that will be passed to the function

  • kwargs (dict) – Keyword arguments that will be passed to the function

Returns:

True to use ThreadPoolExecutor (for I/O-bound or unpicklable functions), False to use ProcessPoolExecutor (for CPU-bound, picklable functions)

Return type:

bool

Examples

>>> # Lambda function (unpicklable) -> threads
>>> choose_executor_type(lambda x: x*2, (5,), {})
True
>>> # I/O function -> threads
>>> def io_func(filename):
...     with open(filename, 'r') as f:
...         return f.read()
>>> choose_executor_type(io_func, ("file.txt",), {})
True
>>> # CPU function -> processes
>>> def cpu_func(n):
...     return sum(i**2 for i in range(n))
>>> choose_executor_type(cpu_func, (1000,), {})
False

Note

This function performs static analysis and may not catch all edge cases. The analysis is designed to be conservative - when in doubt, it defaults to the safer option for the detected pattern.

clustrix.local_executor.create_local_executor(max_workers=None, use_threads=None, func=None, args=(), kwargs=None)

Create a LocalExecutor with appropriate settings.

Parameters:
  • max_workers (Optional[int]) – Maximum number of workers

  • use_threads (Optional[bool]) – Force thread or process usage

  • func (Optional[Callable]) – Function to analyze for executor type selection

  • args (tuple) – Function arguments for analysis

  • kwargs (Optional[Dict[Any, Any]]) – Function keyword arguments for analysis

Return type:

LocalExecutor

Returns:

Configured LocalExecutor

Usage Examples

Basic Local Execution

from clustrix.local_executor import LocalExecutor

def compute_square(x):
    return x ** 2

with LocalExecutor(max_workers=4) as executor:
    # Execute single function
    result = executor.execute_single(compute_square, (5,), {})
    print(result)  # 25

    # Execute multiple work chunks
    work_chunks = [
        {'args': (i,), 'kwargs': {}}
        for i in range(10)
    ]
    results = executor.execute_parallel(compute_square, work_chunks)
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Auto-Detection of Executor Type

from clustrix.local_executor import create_local_executor

# I/O-bound function - will use threads
def download_file(url):
    import requests
    return requests.get(url).content

# CPU-bound function - will use processes
def compute_heavy(n):
    return sum(i**2 for i in range(n))

# Auto-detect appropriate executor
io_executor = create_local_executor(func=download_file)
cpu_executor = create_local_executor(func=compute_heavy)

Loop Parallelization

from clustrix.local_executor import LocalExecutor

def process_item(item):
    return item * 2

data = list(range(100))

with LocalExecutor(max_workers=4) as executor:
    # func is called once per item, with the item passed as the loop_var keyword
    results = executor.execute_loop_parallel(
        func=process_item,
        loop_var='item',
        iterable=data,
        chunk_size=25
    )
    print(len(results))  # 100

Choosing Threads vs Processes

create_local_executor chooses between threads and processes automatically when use_threads is left unset. The guidelines below summarize the trade-off:

Use Threads When:

  • Function performs I/O operations (file, network, database)

  • Objects cannot be pickled (closures, lambdas)

  • Shared memory access is needed

Use Processes When:

  • Function is CPU-intensive

  • True parallelism is required

  • Objects can be pickled safely

You can override the auto-detection:

# Force thread usage
thread_executor = LocalExecutor(use_threads=True)

# Force process usage
process_executor = LocalExecutor(use_threads=False)

Performance Considerations

Process Pool Overhead:

  • Higher memory usage (separate Python interpreters)

  • Serialization overhead for arguments and results

  • Slower startup time

Thread Pool Overhead:

  • Shared memory (lower memory usage)

  • GIL limitations for CPU-bound tasks

  • Faster startup time
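The serialization cost listed for process pools can be estimated before committing to one, because Python's process pools send arguments and results between processes with pickle. The sketch below measures one round trip for an arbitrary stand-in payload. The payload and its size are assumptions chosen only for illustration.

```python
import pickle
import time

payload = list(range(200_000))  # stand-in for a large task argument

start = time.perf_counter()
blob = pickle.dumps(payload)    # done in the parent before the task is sent
restored = pickle.loads(blob)   # done in the worker before the task runs
elapsed = time.perf_counter() - start

print(f"{len(blob)} bytes, round trip {elapsed * 1000:.2f} ms")
```

If the round trip takes a noticeable share of the task's own runtime, larger chunks or a thread pool may be worth considering.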

Optimal Worker Count:

  • CPU-bound: os.cpu_count()

  • I/O-bound: 2 to 4 times os.cpu_count()

  • Custom: Based on your specific workload
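These rules of thumb can be wrapped in a small helper. The function suggest_workers and its io_multiplier parameter are hypothetical and not part of clustrix; the result would be passed as max_workers.

```python
import os


def suggest_workers(workload: str, io_multiplier: int = 2) -> int:
    """Hypothetical helper applying the rules of thumb above."""
    cores = os.cpu_count() or 1  # os.cpu_count() may return None
    if workload == "cpu":
        return cores
    if workload == "io":
        # Clamp the multiplier to the suggested 2x to 4x range.
        return cores * min(max(io_multiplier, 2), 4)
    raise ValueError(f"unknown workload type: {workload!r}")


print(suggest_workers("cpu"), suggest_workers("io", io_multiplier=4))
```

Treat the output as a starting point and measure with the real workload, since the best count depends on how long each task spends waiting.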