stouputils.mlflow.process_metrics_monitor module#

class ProcessMetricsMonitor(
pid: int | None = None,
children: bool = True,
sampling_interval: float = 10.0,
samples_before_logging: int = 1,
prefix: str = 'process/',
verbose: bool = False,
max_memory_megabytes: float | None = None,
max_cpu_count: float | None = None,
)[source]#

Bases: AbstractBothContextManager[ProcessMetricsMonitor]

Monitor that collects CPU, memory, I/O, and thread metrics for a specific process and (optionally) all its children, then logs them to MLflow.

This is the per-process counterpart of MLflow’s built-in log_system_metrics=True, which captures only system-wide metrics. Here, every metric is scoped to the process tree rooted at pid.

Metrics collected (all prefixed with process/):

  • cpu_usage_percentage - cumulative CPU % (sum over the tree)

  • memory_rss_megabytes - resident set size in MB

  • memory_vms_megabytes - virtual memory size in MB

  • memory_uss_megabytes - unique set size in MB (Linux only, falls back to RSS)

  • memory_usage_percentage - RSS as % of total available RAM (see max_memory_megabytes)

  • num_threads - total thread count across the tree

  • num_fds - total open file descriptors (Linux only; 0 on other platforms)

  • io_read_megabytes - cumulative bytes read in MB (since process start)

  • io_write_megabytes - cumulative bytes written in MB (since process start)

Parameters:
  • pid (int | None) – PID of the root process to monitor. Defaults to the current process (os.getpid()).

  • children (bool) – Whether to include child processes (recursively) in the metrics. Defaults to True.

  • sampling_interval (float) – Seconds between each sample collection. Defaults to 10.

  • samples_before_logging (int) – Number of samples to average before logging. Defaults to 1.

  • prefix (str) – Metric name prefix. Defaults to "process/".

  • verbose (bool) – Whether to log verbose debug messages. Defaults to False.

  • max_memory_megabytes (float | None) – Override the total memory in MB used to compute memory_usage_percentage. Useful in containerized environments (e.g. Kubernetes pods) where psutil reports the host’s total RAM instead of the container’s limit. Defaults to None (use system total).

  • max_cpu_count (float | None) – Override the number of CPUs used to normalise cpu_usage_percentage. For example, set to 8.0 when a pod is limited to 8 cores on a 128-core host. Defaults to None (use os.cpu_count()).
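The two override parameters above adjust the denominators of the derived percentages. A minimal sketch of the assumed arithmetic (the helper names are illustrative, not part of the class API):

```python
def memory_usage_percentage(rss_megabytes: float, total_megabytes: float) -> float:
    """RSS expressed as a percentage of the configured memory budget."""
    return rss_megabytes / total_megabytes * 100.0

def normalised_cpu_percentage(per_core_percent_sum: float, cpu_count: float) -> float:
    """psutil reports per-core percentages, so a fully-busy 8-core tree
    sums to 800.0; dividing by the core count rescales to the 0-100 range."""
    return per_core_percent_sum / cpu_count

# In a pod limited to 8 cores and 4096 MB on a larger host:
print(memory_usage_percentage(1024.0, 4096.0))  # 25.0
print(normalised_cpu_percentage(800.0, 8.0))    # 100.0
```

Without the overrides, a pod using all 8 of its allotted cores on a 128-core host would report a misleadingly low CPU percentage.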

Examples

>>> import mlflow
>>> from stouputils.mlflow.process_metrics_monitor import ProcessMetricsMonitor
>>> mlflow.set_experiment("my_experiment")
>>> with mlflow.start_run():
...     monitor = ProcessMetricsMonitor(pid=12345, children=True, sampling_interval=5)
...     monitor.start()
...     # ... do heavy work ...
...     monitor.finish()

Or as a context manager:

>>> import mlflow
>>> from stouputils.mlflow.process_metrics_monitor import ProcessMetricsMonitor
>>> mlflow.set_experiment("my_experiment")
>>> with mlflow.start_run():
...     with ProcessMetricsMonitor(pid=12345):
...         # ... do heavy work ...
...         pass

pid: int#

PID of the root process to monitor.

children: bool#

Whether to include child processes recursively.

sampling_interval: float#

Seconds between each sample collection.

samples_before_logging: int#

Number of samples to average before logging.

prefix: str#

Metric name prefix.

verbose: bool#

Whether to log verbose debug messages.

max_memory_megabytes: float#

Total memory in MB used as the denominator for memory_usage_percentage.

max_cpu_count: float#

Number of CPUs used to normalise cpu_usage_percentage (psutil returns per-core %).

run_id: str | None#

MLflow run ID captured at start time; this ensures that metrics logged from the daemon thread reach the correct run.

shutdown_event: Event#

Event used to signal the monitoring thread to stop.

thread: Thread | None#

Reference to the monitoring daemon thread.

step: int#

Current logging step counter.

samples: list[dict[str, float]]#

Buffer of collected metric samples waiting to be aggregated.

processes: dict[int, Process]#

Persistent cache of monitored psutil.Process objects keyed by PID. Keeping the same objects across calls is required so that cpu_percent() has a non-zero interval to measure against (first call always returns 0).
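Why the cache must be persistent can be shown with a stub standing in for psutil.Process: cpu_percent() measures CPU time accrued since the previous call on the same object, so a freshly constructed object has no baseline and reports 0. This is a didactic sketch, not the actual implementation:

```python
class FakeProcess:
    """Stub mimicking the delta semantics of psutil.Process.cpu_percent()."""

    def __init__(self) -> None:
        self.cpu_time = 0.0               # would come from the OS via psutil
        self.last_cpu_time: float | None = None

    def cpu_percent(self) -> float:
        # No baseline on the first call -> 0.0, exactly like
        # psutil.Process.cpu_percent(interval=None).
        if self.last_cpu_time is None:
            self.last_cpu_time = self.cpu_time
            return 0.0
        delta = self.cpu_time - self.last_cpu_time
        self.last_cpu_time = self.cpu_time
        return delta

# Reusing the cached object across sampling rounds keeps the baseline alive:
cache: dict[int, FakeProcess] = {}
proc = cache.setdefault(1234, FakeProcess())
first = proc.cpu_percent()            # 0.0: no previous call to measure against
proc.cpu_time += 42.0                 # simulate CPU time accrued between samples
second = cache[1234].cpu_percent()    # same object -> meaningful reading
print(first, second)                  # 0.0 42.0
```

Constructing a new Process object on every sample would make every reading that useless first call.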

start() → None[source]#

Start the background monitoring thread.

finish() → None[source]#

Stop monitoring and flush remaining metrics to MLflow.

collect_once() → dict[str, float][source]#

Collect one snapshot of metrics for the process tree.

Returns:

A dictionary of metric names to values.

Return type:

dict[str, float]
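Conceptually, a snapshot sums per-process readings across the root and its children. A simplified sketch with plain dicts (the metric names mirror the list above; the helper itself is illustrative):

```python
def sum_tree(per_process: list[dict[str, float]]) -> dict[str, float]:
    """Sum each metric across the root process and all of its children."""
    snapshot: dict[str, float] = {}
    for reading in per_process:
        for name, value in reading.items():
            snapshot[name] = snapshot.get(name, 0.0) + value
    return snapshot

readings = [
    {"cpu_usage_percentage": 50.0, "num_threads": 4},  # root
    {"cpu_usage_percentage": 25.0, "num_threads": 2},  # child
]
print(sum_tree(readings))
# {'cpu_usage_percentage': 75.0, 'num_threads': 6.0}
```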

aggregate(
samples: list[dict[str, float]],
) → dict[str, float][source]#

Average the collected samples.

Parameters:

samples (list[dict[str, float]]) – List of metric dictionaries.

Returns:

A dictionary of averaged metric values.

Return type:

dict[str, float]
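With samples_before_logging greater than 1, several snapshots are buffered and averaged key-by-key before a single MLflow log call. A minimal sketch of that averaging (assuming all samples share the same keys; the function name is illustrative):

```python
def average_samples(samples: list[dict[str, float]]) -> dict[str, float]:
    """Average each metric key across the buffered samples."""
    if not samples:
        return {}
    keys = samples[0].keys()
    return {k: sum(s[k] for s in samples) / len(samples) for k in keys}

buffer = [
    {"memory_rss_megabytes": 100.0},
    {"memory_rss_megabytes": 300.0},
]
print(average_samples(buffer))  # {'memory_rss_megabytes': 200.0}
```

Averaging smooths out short spikes at the cost of resolution, which is the usual trade-off when raising samples_before_logging.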

publish(
metrics: dict[str, float],
) → None[source]#

Log the aggregated metrics to the active MLflow run.

Parameters:

metrics (dict[str, float]) – Aggregated metric values.

monitor_loop() → None[source]#

Main monitoring loop running in a daemon thread.
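A common shape for such a loop, consistent with the shutdown_event attribute above, uses Event.wait as an interruptible sleep so finish() stops the thread promptly instead of waiting out a full sampling interval. This is a standalone sketch with illustrative callback names, not the class’s actual implementation:

```python
import threading

def monitor_loop(shutdown: threading.Event, interval: float,
                 samples_before_logging: int, collect, publish) -> None:
    """Sample until `shutdown` is set; Event.wait doubles as an
    interruptible sleep, so setting the event stops the loop promptly."""
    samples: list[dict[str, float]] = []
    while not shutdown.wait(timeout=interval):
        samples.append(collect())
        if len(samples) >= samples_before_logging:
            publish(samples)
            samples = []

# Demo with an immediate timeout and canned callbacks:
shutdown = threading.Event()
logged: list[list[dict[str, float]]] = []
counter = iter(range(100))

def collect() -> dict[str, float]:
    value = float(next(counter))
    if value >= 3:            # stop after a few samples
        shutdown.set()
    return {"cpu_usage_percentage": value}

thread = threading.Thread(
    target=monitor_loop,
    args=(shutdown, 0.0, 2, collect, logged.append),
    daemon=True,
)
thread.start()
thread.join(timeout=5.0)
print(len(logged))  # 2 batches of 2 samples each were published
```

Running the thread as a daemon means a crashed main program never hangs on the monitor.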

flush_remaining() → None[source]#

Flush any buffered samples that haven’t been logged yet.