stouputils.parallel module#
This module provides utility functions for parallel processing, such as:
multiprocessing(): Execute a function in parallel using multiprocessing
multithreading(): Execute a function in parallel using multithreading
I highly encourage you to read the function docstrings to understand when to use each method.
- multiprocessing(
- func: Callable[[T], R],
- args: list[T],
- use_starmap: bool = False,
- chunksize: int = 1,
- desc: str = '',
- max_workers: int = 4,
- delay_first_calls: float = 0,
- color: str = '\x1b[95m',
- bar_format: str = '{l_bar}{bar}\x1b[95m| {n_fmt}/{total_fmt} [{rate_fmt}{postfix}, {elapsed}<{remaining}]\x1b[0m',
- ascii: bool = False,
Method to execute a function in parallel using multiprocessing, you should use it:
For CPU-bound operations where the GIL (Global Interpreter Lock) is a bottleneck.
When the task can be divided into smaller, independent sub-tasks that can be executed concurrently.
For computationally intensive tasks like scientific simulations, data analysis, or machine learning workloads.
- Parameters:
func (Callable) – Function to execute
args (list) – List of arguments to pass to the function
use_starmap (bool) – Whether to use starmap or not (Defaults to False): True means the function will be called like func(*args[i]) instead of func(args[i])
chunksize (int) – Number of arguments to process at a time (Defaults to 1 for proper progress bar display)
desc (str) – Description displayed in the progress bar (if not provided no progress bar will be displayed)
max_workers (int) – Number of workers to use (Defaults to CPU_COUNT)
delay_first_calls (float) – Apply i*delay_first_calls seconds delay to the first “max_workers” calls. For instance, the first process will be delayed by 0 seconds, the second by 1 second, etc. (Defaults to 0): This can be useful to avoid functions being called in the same second.
color (str) – Color of the progress bar (Defaults to MAGENTA)
bar_format (str) – Format of the progress bar (Defaults to BAR_FORMAT)
ascii (bool) – Whether to use ASCII or Unicode characters for the progress bar
- Returns:
Results of the function execution
- Return type:
list[object]
Examples
> multiprocessing(doctest_square, args=[1, 2, 3]) [1, 4, 9] > multiprocessing(int.__mul__, [(1,2), (3,4), (5,6)], use_starmap=True) [2, 12, 30] > # Will process in parallel with progress bar > multiprocessing(doctest_slow, range(10), desc="Processing") [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] > # Will process in parallel with progress bar and delay the first threads > multiprocessing( . doctest_slow, . range(10), . desc="Processing with delay", . max_workers=2, . delay_first_calls=0.6 . ) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
- multithreading(
- func: Callable[[T], R],
- args: list[T],
- use_starmap: bool = False,
- desc: str = '',
- max_workers: int = 4,
- delay_first_calls: float = 0,
- color: str = '\x1b[95m',
- bar_format: str = '{l_bar}{bar}\x1b[95m| {n_fmt}/{total_fmt} [{rate_fmt}{postfix}, {elapsed}<{remaining}]\x1b[0m',
- ascii: bool = False,
Method to execute a function in parallel using multithreading, you should use it:
For I/O-bound operations where the GIL is not a bottleneck, such as network requests or disk operations.
When the task involves waiting for external resources, such as network responses or user input.
For operations that involve a lot of waiting, such as GUI event handling or handling user input.
- Parameters:
func (Callable) – Function to execute
args (list) – List of arguments to pass to the function
use_starmap (bool) – Whether to use starmap or not (Defaults to False): True means the function will be called like func(*args[i]) instead of func(args[i])
desc (str) – Description displayed in the progress bar (if not provided no progress bar will be displayed)
max_workers (int) – Number of workers to use (Defaults to CPU_COUNT)
delay_first_calls (float) – Apply i*delay_first_calls seconds delay to the first “max_workers” calls. For instance with value to 1, the first thread will be delayed by 0 seconds, the second by 1 second, etc. (Defaults to 0): This can be useful to avoid functions being called in the same second.
color (str) – Color of the progress bar (Defaults to MAGENTA)
bar_format (str) – Format of the progress bar (Defaults to BAR_FORMAT)
ascii (bool) – Whether to use ASCII or Unicode characters for the progress bar
- Returns:
Results of the function execution
- Return type:
list[object]
Examples
> multithreading(doctest_square, args=[1, 2, 3]) [1, 4, 9] > multithreading(int.__mul__, [(1,2), (3,4), (5,6)], use_starmap=True) [2, 12, 30] > # Will process in parallel with progress bar > multithreading(doctest_slow, range(10), desc="Threading") [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] > # Will process in parallel with progress bar and delay the first threads > multithreading( . doctest_slow, . range(10), . desc="Threading with delay", . max_workers=2, . delay_first_calls=0.6 . ) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
- _starmap(
- args: tuple[Callable[[T], R], list[T]],
Private function to use starmap using args[0](*args[1])
- Parameters:
args (tuple) – Tuple containing the function and the arguments list to pass to the function
- Returns:
Result of the function execution
- Return type:
object
- _delayed_call(
- args: tuple[Callable[[T], R], float, T],
Private function to apply delay before calling the target function
- Parameters:
args (tuple) – Tuple containing the function, delay in seconds, and the argument to pass to the function
- Returns:
Result of the function execution
- Return type:
object
- _handle_parameters(
- func: Callable[[T], R],
- args: list[T],
- use_starmap: bool,
- delay_first_calls: float,
- max_workers: int,
- desc: str,
- color: str,
Private function to handle the parameters for multiprocessing or multithreading functions
- Parameters:
func (Callable) – Function to execute
args (list) – List of arguments to pass to the function
use_starmap (bool) – Whether to use starmap or not (Defaults to False): True means the function will be called like func(*args[i]) instead of func(args[i])
delay_first_calls (int) – Apply i*delay_first_calls seconds delay to the first “max_workers” calls. For instance, the first process will be delayed by 0 seconds, the second by 1 second, etc. (Defaults to 0): This can be useful to avoid functions being called in the same second.
max_workers (int) – Number of workers to use (Defaults to CPU_COUNT)
desc (str) – Description of the function execution displayed in the progress bar
color (str) – Color of the progress bar
- Returns:
Tuple containing the description, function, and arguments
- Return type:
tuple[str, Callable[[T], R], list[T]]