Source code for stouputils.parallel.common


# Imports
import os
import time
from collections.abc import Callable
from typing import cast

# Constants
# os.cpu_count() returns None when the number of CPUs cannot be determined.
# A typing cast does not convert that None, so fall back to 1 to guarantee a usable int.
CPU_COUNT: int = os.cpu_count() or 1


# "Private" function to wrap function execution with nice priority (must be at module level for pickling)
[docs] def nice_wrapper[T, R](args: tuple[int, Callable[[T], R], T]) -> R: """ Wrapper that applies nice priority then executes the function. Args: args (tuple): Tuple containing (nice_value, func, arg) Returns: R: Result of the function execution """ nice_value, func, arg = args set_process_priority(nice_value) return func(arg)
# "Private" function to set process priority (must be at module level for pickling on Windows)
def set_process_priority(nice_value: int) -> None:
    """ Best-effort adjustment of the current process priority.

    On Windows the Unix-style value is translated to a priority class and applied
    with ``kernel32.SetPriorityClass``; on every other platform the value is passed
    to ``os.nice``. Any exception raised along the way is silently ignored.

    Args:
        nice_value (int): Unix-style priority value (-20 to 19)
    """
    try:
        import sys

        # Unix-like systems: hand the value straight to os.nice and stop here
        if sys.platform != "win32":
            os.nice(nice_value)
            return

        import ctypes

        # Map Unix nice values to Windows priority classes
        # -20 to -10: HIGH, -9 to -1: ABOVE_NORMAL, 0: NORMAL, 1-9: BELOW_NORMAL, 10-19: IDLE
        if nice_value <= -10:
            priority_class = 0x00000080  # HIGH_PRIORITY_CLASS
        elif nice_value < 0:
            priority_class = 0x00008000  # ABOVE_NORMAL_PRIORITY_CLASS
        elif nice_value == 0:
            priority_class = 0x00000020  # NORMAL_PRIORITY_CLASS
        elif nice_value < 10:
            priority_class = 0x00004000  # BELOW_NORMAL_PRIORITY_CLASS
        else:
            priority_class = 0x00000040  # IDLE_PRIORITY_CLASS

        win_api = ctypes.windll.kernel32
        current_process = win_api.GetCurrentProcess()
        win_api.SetPriorityClass(current_process, priority_class)
    except Exception:
        # Silently ignore if we can't set priority
        pass
# "Private" function to use starmap using args[0](*args[1])
[docs] def starmap[T, R](args: tuple[Callable[[T], R], list[T]]) -> R: r""" Private function to use starmap using args[0](\*args[1]) Args: args (tuple): Tuple containing the function and the arguments list to pass to the function Returns: object: Result of the function execution """ func, arguments = args return func(*arguments)
# "Private" function to apply delay before calling the target function
[docs] def delayed_call[T, R](args: tuple[Callable[[T], R], float, T]) -> R: """ Private function to apply delay before calling the target function Args: args (tuple): Tuple containing the function, delay in seconds, and the argument to pass to the function Returns: object: Result of the function execution """ func, delay, arg = args time.sleep(delay) return func(arg)
# "Private" function to handle parameters for multiprocessing or multithreading functions
[docs] def handle_parameters[T, R]( func: Callable[[T], R] | list[Callable[[T], R]], args: list[T], use_starmap: bool, delay_first_calls: float, max_workers: int, desc: str, color: str ) -> tuple[str, Callable[[T], R], list[T]]: r""" Private function to handle the parameters for multiprocessing or multithreading functions Args: func (Callable | list[Callable]): Function to execute, or list of functions (one per argument) args (list): List of arguments to pass to the function(s) use_starmap (bool): Whether to use starmap or not (Defaults to False): True means the function will be called like func(\*args[i]) instead of func(args[i]) delay_first_calls (int): Apply i*delay_first_calls seconds delay to the first "max_workers" calls. For instance, the first process will be delayed by 0 seconds, the second by 1 second, etc. (Defaults to 0): This can be useful to avoid functions being called in the same second. max_workers (int): Number of workers to use desc (str): Description of the function execution displayed in the progress bar color (str): Color of the progress bar Returns: tuple[str, Callable[[T], R], list[T]]: Tuple containing the description, function, and arguments """ desc = color + desc # Handle list of functions: validate and convert to starmap format if isinstance(func, list): func = cast(list[Callable[[T], R]], func) assert len(func) == len(args), f"Length mismatch: {len(func)} functions but {len(args)} arguments" args = [(f, arg if use_starmap else (arg,)) for f, arg in zip(func, args, strict=False)] # type: ignore func = starmap # type: ignore # If use_starmap is True, we use the _starmap function elif use_starmap: args = [(func, arg) for arg in args] # type: ignore func = starmap # type: ignore # Prepare delayed function calls if delay_first_calls is set if delay_first_calls > 0: args = [ (func, i * delay_first_calls if i < max_workers else 0, arg) # type: ignore for i, arg in enumerate(args) ] func = delayed_call # type: ignore return desc, func, args # 
type: ignore