stouputils.mlflow.process_metrics_monitor module#
- class ProcessMetricsMonitor(
- pid: int | None = None,
- children: bool = True,
- sampling_interval: float = 10.0,
- samples_before_logging: int = 1,
- prefix: str = 'process/',
- verbose: bool = False,
- max_memory_megabytes: float | None = None,
- max_cpu_count: float | None = None,
- )#
Bases: AbstractBothContextManager[ProcessMetricsMonitor]

Monitor that collects CPU, memory, I/O, and thread metrics for a specific process and (optionally) all its children, then logs them to MLflow.

This is the per-process counterpart of MLflow's built-in log_system_metrics=True, which only captures system-wide metrics. Here every metric is scoped to the process tree rooted at pid.

Metrics collected (all prefixed with process/):

- cpu_usage_percentage - cumulative CPU % (sum over the tree)
- memory_rss_megabytes - resident set size in MB
- memory_vms_megabytes - virtual memory size in MB
- memory_uss_megabytes - unique set size in MB (Linux only, falls back to RSS)
- memory_usage_percentage - RSS as % of total available RAM (see max_memory_megabytes)
- num_threads - total thread count across the tree
- num_fds - total open file descriptors (Linux only, 0 on other OS)
- io_read_megabytes - cumulative bytes read in MB (since process start)
- io_write_megabytes - cumulative bytes written in MB (since process start)
- Parameters:
  - pid (int) – PID of the root process to monitor. Defaults to the current process (os.getpid()).
  - children (bool) – Whether to include child processes (recursively) in the metrics. Defaults to True.
  - sampling_interval (float) – Seconds between each sample collection. Defaults to 10.
  - samples_before_logging (int) – Number of samples to average before logging. Defaults to 1.
  - prefix (str) – Metric name prefix. Defaults to "process/".
  - verbose (bool) – Whether to log verbose debug messages. Defaults to False.
  - max_memory_megabytes (float) – Override the total memory in MB used to compute memory_usage_percentage. Useful in containerized environments (e.g. Kubernetes pods) where psutil reports the host's total RAM instead of the container's limit. Defaults to None (use system total).
  - max_cpu_count (float) – Override the number of CPUs used to normalise cpu_usage_percentage. For example, set to 8.0 when a pod is limited to 8 cores on a 128-core host. Defaults to None (use os.cpu_count()).
Examples

>>> import mlflow
>>> from stouputils.mlflow.process_metrics_monitor import ProcessMetricsMonitor
>>> mlflow.set_experiment("my_experiment")
>>> with mlflow.start_run():
...     monitor = ProcessMetricsMonitor(pid=12345, children=True, sampling_interval=5)
...     monitor.start()
...     # ... do heavy work ...
...     monitor.finish()

Or as a context manager:

>>> import mlflow
>>> from stouputils.mlflow.process_metrics_monitor import ProcessMetricsMonitor
>>> mlflow.set_experiment("my_experiment")
>>> with mlflow.start_run():
...     with ProcessMetricsMonitor(pid=12345):
...         # ... do heavy work ...
...         pass
- pid: int#
PID of the root process to monitor.
- children: bool#
Whether to include child processes recursively.
- sampling_interval: float#
Seconds between each sample collection.
- samples_before_logging: int#
Number of samples to average before logging.
- prefix: str#
Metric name prefix.
- verbose: bool#
Whether to log verbose debug messages.
- max_memory_megabytes: float#
Total memory in MB used as the denominator for memory_usage_percentage.
- max_cpu_count: float#
Number of CPUs used to normalise cpu_usage_percentage (psutil returns per-core %).
- run_id: str | None#
MLflow run ID captured at start time; it ensures metrics are logged to the correct run from the daemon thread.
- shutdown_event: Event#
Event used to signal the monitoring thread to stop.
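Using an Event for shutdown lets the sampling thread sleep interruptibly: Event.wait(timeout) blocks for up to one sampling interval but returns immediately (and truthily) as soon as the event is set. A minimal, self-contained sketch of this pattern (sampling_loop and the placeholder sample dict are illustrative, not the library's actual code):

```python
import threading
import time

def sampling_loop(shutdown_event: threading.Event, interval: float, samples: list) -> None:
    # Collect one sample per interval until asked to stop.
    # Event.wait doubles as an interruptible sleep: it returns True
    # as soon as the event is set, so shutdown is prompt even with
    # a long sampling interval.
    while not shutdown_event.wait(timeout=interval):
        samples.append({"cpu_usage_percentage": 0.0})  # placeholder for collect_once()

shutdown = threading.Event()
samples: list = []
thread = threading.Thread(target=sampling_loop, args=(shutdown, 0.05, samples), daemon=True)
thread.start()
time.sleep(0.2)   # ... heavy work would happen here ...
shutdown.set()    # signal the monitor to stop
thread.join()     # returns promptly thanks to Event.wait
```

The daemon=True flag mirrors the attribute below: a daemon thread never blocks interpreter exit if the monitor is not shut down cleanly.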
- thread: Thread | None#
Reference to the monitoring daemon thread.
- step: int#
Current logging step counter.
- samples: list[dict[str, float]]#
Buffer of collected metric samples waiting to be aggregated.
- processes: dict[int, Process]#
Persistent cache of monitored psutil.Process objects keyed by PID. Keeping the same objects across calls is required so that cpu_percent() has a non-zero interval to measure against (first call always returns 0).
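The reason the cache matters: psutil's cpu_percent() measures CPU time elapsed since the previous call on the same Process object, so a freshly constructed object has no baseline and reports 0.0. A sketch of the pattern using a stand-in class (FakeProcess and get_process are hypothetical illustrations, not psutil or the library's code):

```python
import time

class FakeProcess:
    """Stand-in for psutil.Process: cpu_percent() measures usage since
    the PREVIOUS call on the SAME object, so the first call has no
    reference interval and returns 0.0."""
    def __init__(self, pid: int) -> None:
        self.pid = pid
        self._last: float | None = None

    def cpu_percent(self) -> float:
        now = time.monotonic()
        if self._last is None:
            self._last = now
            return 0.0          # no baseline yet
        self._last = now
        return 12.5             # a nonzero measurement, for illustration

def get_process(cache: dict[int, FakeProcess], pid: int) -> FakeProcess:
    # Persistent cache: reuse the same object across sampling rounds
    # so cpu_percent() has an interval to measure against. Creating a
    # fresh object every round would report 0.0 forever.
    if pid not in cache:
        cache[pid] = FakeProcess(pid)
    return cache[pid]

cache: dict[int, FakeProcess] = {}
first = get_process(cache, 4242).cpu_percent()   # 0.0 - no baseline
second = get_process(cache, 4242).cpu_percent()  # nonzero - same cached object
```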
- collect_once() → dict[str, float][source]#
Collect one snapshot of metrics for the process tree.
- Returns:
A dictionary of metric names to values.
- Return type:
dict[str, float]
- aggregate(
- samples: list[dict[str, float]],
- ) → dict[str, float][source]#
Average the collected samples.
- Parameters:
samples (list[dict[str, float]]) – List of metric dictionaries.
- Returns:
A dictionary of averaged metric values.
- Return type:
dict[str, float]
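A plain-Python sketch of what this averaging step might look like (an illustration of the documented behaviour, assuming all samples share the same metric keys, not the library's actual implementation):

```python
def aggregate(samples: list[dict[str, float]]) -> dict[str, float]:
    # Average each metric over all collected samples.
    # Keys are taken from the first sample; every sample is assumed
    # to contain the same metric names.
    if not samples:
        return {}
    return {
        key: sum(sample[key] for sample in samples) / len(samples)
        for key in samples[0]
    }

metrics = aggregate([
    {"process/cpu_usage_percentage": 50.0, "process/memory_rss_megabytes": 100.0},
    {"process/cpu_usage_percentage": 70.0, "process/memory_rss_megabytes": 140.0},
])
# metrics["process/cpu_usage_percentage"] == 60.0
```

With samples_before_logging=1 (the default) the buffer holds a single sample and averaging is a no-op; larger values trade logging granularity for smoother curves.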