Local Executor API
The Local Executor enables multi-core parallel execution on the local machine using multiprocessing or threading.
Local parallel execution using multiprocessing and threading.
- class clustrix.local_executor.LocalExecutor(max_workers=None, use_threads=False)
Bases: object
Execute functions locally using multiprocessing or threading.
- execute_parallel(func, work_chunks, timeout=None)
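Execute func in parallel, once per work chunk. Each work chunk supplies the arguments for one call; in the usage examples below, chunks are dicts of the form {'args': (...), 'kwargs': {...}}.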
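The work-chunk format used in the usage examples below (dicts with 'args' and 'kwargs' keys) can be mimicked with the standard library. The following is a minimal sketch built on concurrent.futures, not clustrix's implementation; the helper name run_work_chunks is hypothetical.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, Callable, Dict, List


def run_work_chunks(executor: Executor, func: Callable[..., Any],
                    work_chunks: List[Dict[str, Any]]) -> List[Any]:
    """Hypothetical helper: call func once per work chunk, keeping chunk order."""
    # Each chunk supplies the positional and keyword arguments for one call.
    futures = [
        executor.submit(func, *chunk.get("args", ()), **chunk.get("kwargs", {}))
        for chunk in work_chunks
    ]
    # Reading results in submission order keeps them aligned with work_chunks;
    # future.result() re-raises any exception raised inside func.
    return [future.result() for future in futures]


def compute_square(x):
    return x ** 2


work_chunks = [{"args": (i,), "kwargs": {}} for i in range(10)]
with ThreadPoolExecutor(max_workers=4) as pool:
    results = run_work_chunks(pool, compute_square, work_chunks)
print(results)
```

Collecting futures in submission order, rather than with as_completed, is what keeps the result list aligned with the input chunks.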
- execute_loop_parallel(func, loop_var, iterable, func_args=(), func_kwargs=None, chunk_size=None)
Execute a function in parallel over an iterable by splitting it into chunks.
This method distributes the iterable's items across workers in chunks, calls the function once per item (not once per chunk), and returns the results in the original iteration order.
Algorithm:
1. Chunking: Splits the iterable into approximately equal chunks based on the worker count and the optional chunk_size parameter.
2. Chunk Processing: Creates a wrapper function that processes each chunk by iterating over its individual items and calling the original function for each one.
3. Result Flattening: Collects the results from all chunks and flattens them while preserving the original iteration order.
Key Point: The chunk-processor wrapper lets chunk-based parallel execution call functions that expect individual items rather than chunks.
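The three steps above (chunk, process each chunk item by item, flatten) can be sketched with the standard library as follows. This is an illustration under stated assumptions, not clustrix's code: the names make_chunks, process_chunk, and loop_parallel_sketch are hypothetical, it uses a thread pool for simplicity, and the max(1, ...) guard on the default chunk size is an added assumption.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence


def make_chunks(items: Sequence[Any], chunk_size: int) -> List[List[Any]]:
    """Step 1: split items into consecutive chunks of at most chunk_size elements."""
    return [list(items[i:i + chunk_size]) for i in range(0, len(items), chunk_size)]


def process_chunk(func: Callable[..., Any], loop_var: str, func_args: tuple,
                  func_kwargs: Dict[str, Any], chunk: List[Any]) -> List[Any]:
    """Step 2: call func once per item, passing the item as the keyword loop_var."""
    # The loop variable is merged last, so it overrides any same-named entry.
    return [func(*func_args, **{**func_kwargs, loop_var: item}) for item in chunk]


def loop_parallel_sketch(func: Callable[..., Any], loop_var: str, iterable: Iterable[Any],
                         func_args: tuple = (), func_kwargs: Optional[Dict[str, Any]] = None,
                         chunk_size: Optional[int] = None, max_workers: int = 4) -> List[Any]:
    items = list(iterable)
    if not items:
        return []
    if chunk_size is None:
        # Documented default is len(items) // max_workers; max(1, ...) is an added guard.
        chunk_size = max(1, len(items) // max_workers)
    chunks = make_chunks(items, chunk_size)
    # partial() over a module-level function stays picklable as long as func and
    # its arguments are, so the same wrapper could be handed to ProcessPoolExecutor.
    worker = partial(process_chunk, func, loop_var, func_args, func_kwargs or {})
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in input order, so chunk order is preserved.
        chunk_results = list(pool.map(worker, chunks))
    # Step 3: flatten the per-chunk lists back into one list in iteration order.
    return [result for chunk in chunk_results for result in chunk]


def square(x):
    return x ** 2


results = loop_parallel_sketch(square, "x", range(10), chunk_size=3)
print(results)
```

The wrapper is built with functools.partial rather than a nested closure because closures cannot be pickled, which would rule out process pools.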
Example Workflow:
range(5) with chunk_size=2 → chunks: [[0, 1], [2, 3], [4]]
Each chunk processed item by item: func(*func_args, **{loop_var: 0}), func(*func_args, **{loop_var: 1}), etc.
Results flattened: [result0, result1, result2, result3, result4]
- Parameters:
- func (Callable) – Function to execute for each item. It is called with func_args as positional arguments and the current item passed as the keyword argument named by loop_var.
- loop_var (str) – Name of the keyword argument that receives each item from the iterable.
- iterable (Union[range, List, tuple]) – Iterable to process in parallel (range, list, tuple, etc.).
- func_args (tuple) – Additional positional arguments passed to func on every call.
- func_kwargs (Optional[Dict[Any, Any]]) – Additional keyword arguments passed to func on every call. Note: loop_var is added to these kwargs, overriding any existing entry with the same name.
- chunk_size (Optional[int]) – Size of each work chunk. If None, it is calculated automatically as len(items) // max_workers.
- Returns:
- Results from function execution in the same order as the original iterable. Each element corresponds to func() called with the respective item from the iterable.
- Return type:
List[Any]
Examples
>>> executor = LocalExecutor(max_workers=4)
>>> def square(x):
...     return x ** 2
>>>
>>> # Parallel computation over range
>>> results = executor.execute_loop_parallel(
...     square, 'x', range(10), chunk_size=3
... )
>>> # Results: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

>>> # With additional arguments
>>> def power(base, x, exp=2):
...     return (base + x) ** exp
>>> results = executor.execute_loop_parallel(
...     power, 'x', [1, 2, 3], func_args=(10,), func_kwargs={'exp': 3}
... )
>>> # Results: [11**3, 12**3, 13**3] = [1331, 1728, 2197]
- Raises:
Exception – Any exception raised by the function during execution.
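With the standard concurrent.futures executors, an exception raised inside a worker is stored on its future and re-raised in the caller when that result is retrieved. The sketch below shows this mechanism, assuming (this page does not confirm it) that LocalExecutor surfaces worker errors in a similar way; fails_on_three and run_and_report are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Tuple


def fails_on_three(x: int) -> int:
    # Hypothetical worker that raises for one specific input.
    if x == 3:
        raise ValueError(f"bad input: {x}")
    return x * 10


def run_and_report(values: List[int]) -> Tuple[Optional[List[int]], Optional[str]]:
    """Return (results, None) on success, or (None, message) if any call raised."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        try:
            # Executor.map re-raises a worker's exception when that result is reached.
            return list(pool.map(fails_on_three, values)), None
        except ValueError as exc:
            return None, str(exc)


print(run_and_report([0, 1, 2]))     # ([0, 10, 20], None)
print(run_and_report([1, 2, 3, 4]))  # (None, 'bad input: 3')
```

Because the error only appears when the failing result is read, partial results from earlier items are discarded in this pattern; callers that need them would have to collect futures individually.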
Note
- Empty iterables return an empty list immediately.
- Single-item iterables are executed directly (no parallelization overhead).
- Chunk size is automatically bounded by the number of available workers.
- The method preserves the original order of results.
- clustrix.local_executor.choose_executor_type(func, args, kwargs)
Choose between ThreadPoolExecutor and ProcessPoolExecutor based on function characteristics.
This function determines the execution model for parallel function execution. It checks pickling constraints first, then analyzes the function to distinguish I/O-bound from CPU-bound workloads.
Decision Logic (in priority order):
1. Pickling Check (highest priority): For multiprocessing, the function and its arguments must be picklable. If any of them fails to pickle, threads are used.
2. I/O Detection: The function's source code is scanned for common I/O patterns. I/O-bound work benefits from threads because the GIL is released during blocking I/O.
3. Default: Everything else is treated as CPU-bound and uses processes for true parallelism.
Key Insight: inspect.getsource(lambda x: x) succeeds but pickle.dumps(lambda x: x) fails, so pickling must be checked before source analysis.
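- Parameters:
- func – Function to be executed.
- args – Positional arguments for func (e.g. a tuple such as (5,)).
- kwargs – Keyword arguments for func (e.g. a dict).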
- Returns:
- True to use ThreadPoolExecutor (for I/O-bound or unpicklable functions), False to use ProcessPoolExecutor (for CPU-bound, picklable functions)
- Return type:
bool
Examples
>>> # Lambda function (unpicklable) -> threads
>>> choose_executor_type(lambda x: x*2, (5,), {})
True

>>> # I/O function -> threads
>>> def io_func(filename):
...     with open(filename, 'r') as f:
...         return f.read()
>>> choose_executor_type(io_func, ("file.txt",), {})
True

>>> # CPU function -> processes
>>> def cpu_func(n):
...     return sum(i**2 for i in range(n))
>>> choose_executor_type(cpu_func, (1000,), {})
False
Note
This function performs static analysis and may not catch all edge cases. The analysis is designed to be conservative: when in doubt, it defaults to the safer option for the detected pattern.
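The decision order described for choose_executor_type (pickling first, then I/O detection, then a CPU-bound default) can be sketched as below. This is an illustration, not clustrix's code: choose_threads_sketch, looks_io_bound, and both hint lists are hypothetical, and the fallback to the bytecode's referenced names (func.__code__.co_names) when no source is available is an added technique, not something this page documents.

```python
import inspect
import pickle
from typing import Any, Callable, Dict, Tuple

# Hypothetical hint lists; clustrix's real patterns are not documented here.
IO_SOURCE_HINTS = ("open(", "requests.", "urllib", "socket", ".read(", ".write(", "time.sleep(")
IO_NAME_HINTS = {"open", "requests", "urlopen", "socket", "read", "write", "sleep"}


def is_picklable(obj: Any) -> bool:
    try:
        pickle.dumps(obj)
        return True
    except Exception:  # PicklingError, AttributeError, TypeError, ...
        return False


def looks_io_bound(func: Callable[..., Any]) -> bool:
    try:
        # Preferred: scan the function's source text for I/O patterns.
        source = inspect.getsource(func)
        return any(hint in source for hint in IO_SOURCE_HINTS)
    except (OSError, TypeError):
        # No source available (builtins, interactive sessions): use bytecode names.
        code = getattr(func, "__code__", None)
        return code is not None and bool(IO_NAME_HINTS & set(code.co_names))


def choose_threads_sketch(func: Callable[..., Any], args: Tuple[Any, ...],
                          kwargs: Dict[str, Any]) -> bool:
    """Return True to use threads, False to use processes."""
    # 1. Pickling check first: a lambda has source code but cannot be pickled.
    if not all(is_picklable(obj) for obj in (func, args, kwargs)):
        return True
    # 2. I/O detection.
    if looks_io_bound(func):
        return True
    # 3. Default: treat the work as CPU-bound and use processes.
    return False


def io_func(filename):
    with open(filename, "r") as f:
        return f.read()


def cpu_func(n):
    return sum(i ** 2 for i in range(n))


lambda_choice = choose_threads_sketch(lambda x: x * 2, (5,), {})
io_choice = choose_threads_sketch(io_func, ("file.txt",), {})
cpu_choice = choose_threads_sketch(cpu_func, (1000,), {})
print(lambda_choice, io_choice, cpu_choice)
```

Substring matching like this is deliberately crude; it illustrates why the note above warns that static analysis can miss edge cases.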
- clustrix.local_executor.create_local_executor(max_workers=None, use_threads=None, func=None, args=(), kwargs=None)
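Create a LocalExecutor with appropriate settings. When use_threads is None, the executor type can be detected automatically from func, args, and kwargs (see choose_executor_type).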
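- Parameters:
- max_workers – Maximum number of workers, as for LocalExecutor.
- use_threads – True for threads, False for processes, or None to auto-detect.
- func, args, kwargs – Function and arguments used for auto-detection.
- Returns:
Configured LocalExecutor
- Return type:
LocalExecutor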
Usage Examples
Basic Local Execution
from clustrix.local_executor import LocalExecutor

def compute_square(x):
    return x ** 2

# The __main__ guard is required for process pools on platforms that
# start workers with "spawn" (Windows, macOS).
if __name__ == "__main__":
    with LocalExecutor(max_workers=4) as executor:
        # Execute single function
        result = executor.execute_single(compute_square, (5,), {})
        print(result)  # 25

        # Execute multiple work chunks
        work_chunks = [
            {'args': (i,), 'kwargs': {}}
            for i in range(10)
        ]
        results = executor.execute_parallel(compute_square, work_chunks)
        print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Auto-Detection of Executor Type
from clustrix.local_executor import create_local_executor

# I/O-bound function: expected to run on threads
def download_file(url):
    import requests
    return requests.get(url).content

# CPU-bound function: expected to run on processes
def compute_heavy(n):
    return sum(i**2 for i in range(n))

# Auto-detect the appropriate executor type from each function
io_executor = create_local_executor(func=download_file)
cpu_executor = create_local_executor(func=compute_heavy)
Loop Parallelization
from clustrix.local_executor import LocalExecutor

def double(x):
    return x * 2

data = list(range(100))

if __name__ == "__main__":
    with LocalExecutor(max_workers=4) as executor:
        # double() receives one item per call as the keyword argument x;
        # chunk_size only controls how items are grouped across workers.
        results = executor.execute_loop_parallel(
            func=double,
            loop_var='x',
            iterable=data,
            chunk_size=25,
        )
    print(len(results))  # 100 (one result per item)
Choosing Threads vs Processes
The auto-detection (choose_executor_type, used via create_local_executor) chooses between threads and processes based on:
Use Threads When:
- The function performs I/O operations (file, network, database)
- Objects cannot be pickled (closures, lambdas)
- Shared memory access is needed
Use Processes When:
- The function is CPU-intensive
- True parallelism is required (separate processes are not limited by the GIL)
- Objects can be pickled safely
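The pickling rule behind these guidelines can be checked directly with Python's standard pickle module: module-level functions pickle by reference, while lambdas and nested closures do not. A common workaround, shown here as a general Python technique rather than anything clustrix-specific, is to replace a closure with functools.partial over a module-level function. The names scale, make_scaler, and pickles are hypothetical.

```python
import pickle
from functools import partial


def scale(x, factor):
    return x * factor


def make_scaler(factor):
    # Closure: the inner function is local, so pickle cannot find it by name.
    def scaler(x):
        return x * factor
    return scaler


def pickles(obj) -> bool:
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


closure_ok = pickles(make_scaler(3))            # False: suited to threads only
lambda_ok = pickles(lambda x: x * 3)            # False: suited to threads only
partial_ok = pickles(partial(scale, factor=3))  # True: can be sent to processes
tripled = partial(scale, factor=3)(5)           # 15
print(closure_ok, lambda_ok, partial_ok, tripled)
```

The partial object behaves like the closure but pickles as a reference to scale plus its bound arguments, so the same function can run in either pool type.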
You can override the auto-detection:
# Force thread usage
thread_executor = LocalExecutor(use_threads=True)
# Force process usage
process_executor = LocalExecutor(use_threads=False)
Performance Considerations
Process Pool Overhead:
- Higher memory usage (each worker runs a separate Python interpreter)
- Serialization (pickling) overhead for arguments and results
- Slower startup time
Thread Pool Characteristics:
- Shared memory (lower memory usage)
- GIL limits speedup for CPU-bound tasks
- Faster startup time
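Process pools pickle every argument and result, so large inputs carry a real cost. A rough way to gauge that overhead for your own data is to time pickle round-trips; the helper serialization_cost below is hypothetical, and the timings it prints vary by machine and Python version.

```python
import pickle
import time
from typing import Any, Tuple


def serialization_cost(obj: Any, repeats: int = 5) -> Tuple[int, float]:
    """Return (pickled size in bytes, average seconds per dumps+loads round-trip)."""
    payload = pickle.dumps(obj)
    start = time.perf_counter()
    for _ in range(repeats):
        pickle.loads(pickle.dumps(obj))
    elapsed = (time.perf_counter() - start) / repeats
    return len(payload), elapsed


small_size, small_time = serialization_cost(list(range(10)))
large_size, large_time = serialization_cost(list(range(1_000_000)))
print(f"small: {small_size} bytes, {small_time:.6f} s per round-trip")
print(f"large: {large_size} bytes, {large_time:.6f} s per round-trip")
```

If the round-trip time is comparable to the work done per call, a thread pool or larger chunks are likely to perform better than a process pool.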
Optimal Worker Count:
- CPU-bound: os.cpu_count()
- I/O-bound: os.cpu_count() * 2 to os.cpu_count() * 4
- Custom: Based on your specific workload
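The rules of thumb above can be wrapped in a small helper. This is a sketch: suggested_workers is a hypothetical name, and clamping the I/O multiplier to the 2 to 4 range is a choice made here to match the guidance, not a clustrix default.

```python
import os


def suggested_workers(io_bound: bool, io_multiplier: int = 2) -> int:
    """Rule-of-thumb worker count following the guidance above."""
    # os.cpu_count() can return None when the count is undetermined; fall back to 1.
    cpus = os.cpu_count() or 1
    if io_bound:
        # I/O-bound: between 2x and 4x the CPU count (multiplier clamped to 2..4).
        return cpus * max(2, min(io_multiplier, 4))
    # CPU-bound: one worker per CPU.
    return cpus


print(suggested_workers(io_bound=False), suggested_workers(io_bound=True))
```

The result can then be passed as max_workers, e.g. LocalExecutor(max_workers=suggested_workers(io_bound=True), use_threads=True).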