stouputils.parallel module#

This module provides utility functions for parallel processing, such as:

  • multiprocessing(): Execute a function in parallel using multiprocessing

  • multithreading(): Execute a function in parallel using multithreading

  • run_in_subprocess(): Execute a function in a subprocess with args and kwargs

I highly encourage you to read the function docstrings to understand when to use each method.

stouputils parallel examples
multiprocessing(
func: Callable[[...], R] | list[Callable[[...], R]],
args: list[T],
use_starmap: bool = False,
chunksize: int = 1,
desc: str = '',
max_workers: int = 4,
delay_first_calls: float = 0,
color: str = '\x1b[95m',
bar_format: str = '{l_bar}{bar}\x1b[95m| {n_fmt}/{total_fmt} [{rate_fmt}{postfix}, {elapsed}<{remaining}]\x1b[0m',
ascii: bool = False,
) list[R][source]#

Method to execute a function in parallel using multiprocessing

  • For CPU-bound operations where the GIL (Global Interpreter Lock) is a bottleneck.

  • When the task can be divided into smaller, independent sub-tasks that can be executed concurrently.

  • For computationally intensive tasks like scientific simulations, data analysis, or machine learning workloads.

Parameters:
  • func (Callable | list[Callable]) – Function to execute, or list of functions (one per argument)

  • args (list) – List of arguments to pass to the function(s)

  • use_starmap (bool) – Whether to use starmap or not (Defaults to False): True means the function will be called like func(*args[i]) instead of func(args[i])

  • chunksize (int) – Number of arguments to process at a time (Defaults to 1 for proper progress bar display)

  • desc (str) – Description displayed in the progress bar (if not provided no progress bar will be displayed)

  • max_workers (int) – Number of workers to use (Defaults to CPU_COUNT), -1 means CPU_COUNT

  • delay_first_calls (float) – Apply i*delay_first_calls seconds delay to the first “max_workers” calls. For instance, the first process will be delayed by 0 seconds, the second by 1 second, etc. (Defaults to 0): This can be useful to avoid functions being called in the same second.

  • color (str) – Color of the progress bar (Defaults to MAGENTA)

  • bar_format (str) – Format of the progress bar (Defaults to BAR_FORMAT)

  • ascii (bool) – Whether to use ASCII or Unicode characters for the progress bar

Returns:

Results of the function execution

Return type:

list[object]

Examples

> multiprocessing(doctest_square, args=[1, 2, 3])
[1, 4, 9]

> multiprocessing(int.__mul__, [(1,2), (3,4), (5,6)], use_starmap=True)
[2, 12, 30]

> # Using a list of functions (one per argument)
> multiprocessing([doctest_square, doctest_square, doctest_square], [1, 2, 3])
[1, 4, 9]

> # Will process in parallel with progress bar
> multiprocessing(doctest_slow, range(10), desc="Processing")
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

> # Will process in parallel with progress bar and delay the first threads
> multiprocessing(
.     doctest_slow,
.     range(10),
.     desc="Processing with delay",
.     max_workers=2,
.     delay_first_calls=0.6
. )
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
multithreading(
func: Callable[[...], R] | list[Callable[[...], R]],
args: list[T],
use_starmap: bool = False,
desc: str = '',
max_workers: int = 4,
delay_first_calls: float = 0,
color: str = '\x1b[95m',
bar_format: str = '{l_bar}{bar}\x1b[95m| {n_fmt}/{total_fmt} [{rate_fmt}{postfix}, {elapsed}<{remaining}]\x1b[0m',
ascii: bool = False,
) list[R][source]#

Method to execute a function in parallel using multithreading, you should use it:

  • For I/O-bound operations where the GIL is not a bottleneck, such as network requests or disk operations.

  • When the task involves waiting for external resources, such as network responses or user input.

  • For operations that involve a lot of waiting, such as GUI event handling or handling user input.

Parameters:
  • func (Callable | list[Callable]) – Function to execute, or list of functions (one per argument)

  • args (list) – List of arguments to pass to the function(s)

  • use_starmap (bool) – Whether to use starmap or not (Defaults to False): True means the function will be called like func(*args[i]) instead of func(args[i])

  • desc (str) – Description displayed in the progress bar (if not provided no progress bar will be displayed)

  • max_workers (int) – Number of workers to use (Defaults to CPU_COUNT), -1 means CPU_COUNT

  • delay_first_calls (float) – Apply i*delay_first_calls seconds delay to the first “max_workers” calls. For instance with value to 1, the first thread will be delayed by 0 seconds, the second by 1 second, etc. (Defaults to 0): This can be useful to avoid functions being called in the same second.

  • color (str) – Color of the progress bar (Defaults to MAGENTA)

  • bar_format (str) – Format of the progress bar (Defaults to BAR_FORMAT)

  • ascii (bool) – Whether to use ASCII or Unicode characters for the progress bar

Returns:

Results of the function execution

Return type:

list[object]

Examples

> multithreading(doctest_square, args=[1, 2, 3])
[1, 4, 9]

> multithreading(int.__mul__, [(1,2), (3,4), (5,6)], use_starmap=True)
[2, 12, 30]

> # Using a list of functions (one per argument)
> multithreading([doctest_square, doctest_square, doctest_square], [1, 2, 3])
[1, 4, 9]

> # Will process in parallel with progress bar
> multithreading(doctest_slow, range(10), desc="Threading")
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

> # Will process in parallel with progress bar and delay the first threads
> multithreading(
.     doctest_slow,
.     range(10),
.     desc="Threading with delay",
.     max_workers=2,
.     delay_first_calls=0.6
. )
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
run_in_subprocess(
func: Callable[[...], R],
*args: Any,
timeout: float | None = None,
**kwargs: Any,
) R[source]#

Execute a function in a subprocess with positional and keyword arguments.

This is useful when you need to run a function in isolation to avoid memory leaks, resource conflicts, or to ensure a clean execution environment. The subprocess will be created, run the function with the provided arguments, and return the result.

Parameters:
  • func (Callable) – The function to execute in a subprocess. (SHOULD BE A TOP-LEVEL FUNCTION TO BE PICKLABLE)

  • *args (Any) – Positional arguments to pass to the function.

  • timeout (float | None) – Maximum time in seconds to wait for the subprocess. If None, wait indefinitely. If the subprocess exceeds this time, it will be terminated.

  • **kwargs (Any) – Keyword arguments to pass to the function.

Returns:

The return value of the function.

Return type:

R

Raises:
  • RuntimeError – If the subprocess exits with a non-zero exit code or times out.

  • TimeoutError – If the subprocess exceeds the specified timeout.

Examples

> # Simple function execution
> run_in_subprocess(doctest_square, 5)
25

> # Function with multiple arguments
> def add(a: int, b: int) -> int:
.     return a + b
> run_in_subprocess(add, 10, 20)
30

> # Function with keyword arguments
> def greet(name: str, greeting: str = "Hello") -> str:
.     return f"{greeting}, {name}!"
> run_in_subprocess(greet, "World", greeting="Hi")
'Hi, World!'

> # With timeout to prevent hanging
> run_in_subprocess(some_gpu_func, data, timeout=300.0)
_subprocess_wrapper(
result_queue: Any,
func: Callable[[...], R],
args: tuple[Any, ...],
kwargs: dict[str, Any],
) None[source]#

Wrapper function to execute the target function and store the result in the queue.

Must be at module level to be pickable on Windows (spawn context).

Parameters:
  • result_queue (multiprocessing.Queue) – Queue to store the result or exception.

  • func (Callable) – The target function to execute.

  • args (tuple) – Positional arguments for the function.

  • kwargs (dict) – Keyword arguments for the function.

_starmap(
args: tuple[Callable[[T], R], list[T]],
) R[source]#

Private function to use starmap using args[0](*args[1])

Parameters:

args (tuple) – Tuple containing the function and the arguments list to pass to the function

Returns:

Result of the function execution

Return type:

object

_delayed_call(
args: tuple[Callable[[T], R], float, T],
) R[source]#

Private function to apply delay before calling the target function

Parameters:

args (tuple) – Tuple containing the function, delay in seconds, and the argument to pass to the function

Returns:

Result of the function execution

Return type:

object

_handle_parameters(
func: Callable[[T], R] | list[Callable[[T], R]],
args: list[T],
use_starmap: bool,
delay_first_calls: float,
max_workers: int,
desc: str,
color: str,
) tuple[str, Callable[[T], R], list[T]][source]#

Private function to handle the parameters for multiprocessing or multithreading functions

Parameters:
  • func (Callable | list[Callable]) – Function to execute, or list of functions (one per argument)

  • args (list) – List of arguments to pass to the function(s)

  • use_starmap (bool) – Whether to use starmap or not (Defaults to False): True means the function will be called like func(*args[i]) instead of func(args[i])

  • delay_first_calls (int) – Apply i*delay_first_calls seconds delay to the first “max_workers” calls. For instance, the first process will be delayed by 0 seconds, the second by 1 second, etc. (Defaults to 0): This can be useful to avoid functions being called in the same second.

  • max_workers (int) – Number of workers to use (Defaults to CPU_COUNT)

  • desc (str) – Description of the function execution displayed in the progress bar

  • color (str) – Color of the progress bar

Returns:

Tuple containing the description, function, and arguments

Return type:

tuple[str, Callable[[T], R], list[T]]