Source code for roastcoffea.visualization.plots.cpu

"""CPU utilization plots.

Visualizations for CPU usage and worker task metrics.
"""

from __future__ import annotations

from pathlib import Path
from typing import Any

import matplotlib.pyplot as plt
import numpy as np

from roastcoffea.visualization.utils import (
    add_worker_count_annotation,
    finalize_timeline_plot,
    setup_timeline_axes,
    validate_tracking_data,
)


def plot_occupancy_timeline(
    tracking_data: dict[str, Any] | None,
    output_path: Path | None = None,
    figsize: tuple[int, int] = (12, 6),
    title: str = "Worker Occupancy Over Time",
    max_legend_entries: int = 5,
) -> tuple[plt.Figure, plt.Axes]:
    """Draw one occupancy line per worker against time.

    Occupancy is a Dask scheduler metric describing how saturated a worker
    is with tasks: 0.0 means idle, larger values mean more saturated.

    Parameters
    ----------
    tracking_data : dict or None
        Tracking data; the ``"worker_occupancy"`` entry is read from it and
        is expected to map each worker id to a sequence of
        ``(timestamp, value)`` pairs.
    output_path : Path, optional
        Save path, forwarded to ``finalize_timeline_plot``.
    figsize : tuple
        Figure size handed to ``plt.subplots``.
    title : str
        Plot title.
    max_legend_entries : int, optional
        Largest worker count for which a per-worker legend is drawn.
        Above this, a worker-count annotation is added instead.
        Default is 5.

    Returns
    -------
    fig, ax : Figure and Axes
        Matplotlib figure and axes

    Raises
    ------
    ValueError
        If tracking_data is None or missing occupancy data
    """
    occupancy_by_worker = validate_tracking_data(
        tracking_data, "worker_occupancy", "No worker occupancy data available"
    )

    figure, axes = plt.subplots(figsize=figsize)

    for name, samples in occupancy_by_worker.items():
        # Workers without any recorded samples contribute no line.
        if not samples:
            continue
        times = []
        levels = []
        for moment, level in samples:
            times.append(moment)
            levels.append(level)
        axes.plot(times, levels, label=name, alpha=0.7, linewidth=2)

    setup_timeline_axes(axes, ylabel="Occupancy (saturation)", title=title)

    # The count includes workers with empty timelines, not only plotted ones.
    worker_total = len(occupancy_by_worker)
    if worker_total > max_legend_entries:
        # Too many workers for a readable legend; summarise the count instead.
        add_worker_count_annotation(axes, worker_total)
    else:
        axes.legend(loc="upper left", bbox_to_anchor=(1.05, 1), fontsize=8)

    finalize_timeline_plot(figure, axes, output_path)
    return figure, axes
def plot_executing_tasks_timeline(
    tracking_data: dict[str, Any] | None,
    output_path: Path | None = None,
    figsize: tuple[int, int] = (12, 6),
    title: str = "Executing Tasks Per Worker Over Time",
    max_legend_entries: int = 5,
) -> tuple[plt.Figure, plt.Axes]:
    """Draw the executing-task count of each worker against time.

    Executing tasks are the tasks actually running (a subset of the
    active tasks).

    Parameters
    ----------
    tracking_data : dict or None
        Tracking data; the ``"worker_executing"`` entry is read from it and
        is expected to map each worker id to a sequence of
        ``(timestamp, value)`` pairs.
    output_path : Path, optional
        Save path, forwarded to ``finalize_timeline_plot``.
    figsize : tuple
        Figure size handed to ``plt.subplots``.
    title : str
        Plot title.
    max_legend_entries : int, optional
        Largest worker count for which a per-worker legend is drawn.
        Above this, a worker-count annotation is added instead.
        Default is 5.

    Returns
    -------
    fig, ax : Figure and Axes
        Matplotlib figure and axes

    Raises
    ------
    ValueError
        If tracking_data is None or missing executing data
    """
    executing_by_worker = validate_tracking_data(
        tracking_data, "worker_executing", "No worker executing tasks data available"
    )

    figure, axes = plt.subplots(figsize=figsize)

    for name, samples in executing_by_worker.items():
        # Workers without any recorded samples contribute no line.
        if not samples:
            continue
        times = []
        counts = []
        for moment, count in samples:
            times.append(moment)
            counts.append(count)
        axes.plot(times, counts, label=name, alpha=0.7, linewidth=2)

    setup_timeline_axes(axes, ylabel="Number of Executing Tasks", title=title)

    # The count includes workers with empty timelines, not only plotted ones.
    worker_total = len(executing_by_worker)
    if worker_total > max_legend_entries:
        # Too many workers for a readable legend; summarise the count instead.
        add_worker_count_annotation(axes, worker_total)
    else:
        axes.legend(loc="upper left", bbox_to_anchor=(1.05, 1), fontsize=8)

    finalize_timeline_plot(figure, axes, output_path)
    return figure, axes
def plot_cpu_utilization_per_worker_timeline(
    tracking_data: dict[str, Any] | None,
    output_path: Path | None = None,
    figsize: tuple[int, int] = (12, 6),
    title: str = "CPU Utilization Per Worker Over Time",
    max_legend_entries: int = 5,
) -> tuple[plt.Figure, plt.Axes]:
    """Draw the CPU utilization percentage of each worker against time.

    Each worker gets its own line; the y-axis is fixed to the 0-100 range.

    Parameters
    ----------
    tracking_data : dict or None
        Tracking data; the ``"worker_cpu"`` entry is read from it and is
        expected to map each worker id to a sequence of
        ``(timestamp, value)`` pairs.
    output_path : Path, optional
        Save path, forwarded to ``finalize_timeline_plot``.
    figsize : tuple
        Figure size handed to ``plt.subplots``.
    title : str
        Plot title.
    max_legend_entries : int, optional
        Largest worker count for which a per-worker legend is drawn.
        Above this, a worker-count annotation is added instead.
        Default is 5.

    Returns
    -------
    fig, ax : Figure and Axes
        Matplotlib figure and axes

    Raises
    ------
    ValueError
        If tracking_data is None or missing CPU data
    """
    cpu_by_worker = validate_tracking_data(
        tracking_data, "worker_cpu", "No worker CPU data available"
    )

    figure, axes = plt.subplots(figsize=figsize)

    for name, samples in cpu_by_worker.items():
        # Workers without any recorded samples contribute no line.
        if not samples:
            continue
        times = []
        percents = []
        for moment, percent in samples:
            times.append(moment)
            percents.append(percent)
        axes.plot(times, percents, label=name, alpha=0.7, linewidth=2)

    setup_timeline_axes(
        axes, ylabel="CPU Utilization (%)", title=title, ylim=(0, 100)
    )

    # The count includes workers with empty timelines, not only plotted ones.
    worker_total = len(cpu_by_worker)
    if worker_total > max_legend_entries:
        # Too many workers for a readable legend; summarise the count instead.
        add_worker_count_annotation(axes, worker_total)
    else:
        axes.legend(loc="upper left", bbox_to_anchor=(1.05, 1), fontsize=8)

    finalize_timeline_plot(figure, axes, output_path)
    return figure, axes
def plot_cpu_utilization_mean_timeline(
    tracking_data: dict[str, Any] | None,
    output_path: Path | None = None,
    figsize: tuple[int, int] = (10, 4),
    title: str = "CPU Utilization Over Time",
) -> tuple[plt.Figure, plt.Axes]:
    """Plot mean CPU utilization percentage over time with min-max band.

    Shows aggregated CPU usage across all workers: a line for the mean and
    a shaded region spanning the minimum and maximum at each timestamp.
    Only workers that have a sample at a given timestamp contribute to the
    statistics for that timestamp.

    Parameters
    ----------
    tracking_data : dict or None
        Tracking data; the ``"worker_cpu"`` entry is read from it and is
        expected to map each worker id to a sequence of
        ``(timestamp, value)`` pairs.
    output_path : Path, optional
        Save path, forwarded to ``finalize_timeline_plot``.
    figsize : tuple
        Figure size handed to ``plt.subplots``.
    title : str
        Plot title.

    Returns
    -------
    fig, ax : Figure and Axes
        Matplotlib figure and axes

    Raises
    ------
    ValueError
        If tracking_data is None or missing CPU data
    """
    worker_cpu = validate_tracking_data(
        tracking_data, "worker_cpu", "No worker CPU data available"
    )

    # Bucket samples by timestamp in a single pass over the data instead of
    # rescanning every worker's timeline for every unique timestamp.
    samples_by_timestamp: dict[Any, list[Any]] = {}
    for timeline in worker_cpu.values():
        # A worker contributes at most one value per timestamp: the first
        # one it recorded. Later duplicates within the same worker are
        # ignored.
        seen_timestamps: set[Any] = set()
        for timestamp, cpu_value in timeline:
            if timestamp in seen_timestamps:
                continue
            seen_timestamps.add(timestamp)
            samples_by_timestamp.setdefault(timestamp, []).append(cpu_value)

    sorted_timestamps = sorted(samples_by_timestamp)

    # Every bucket holds at least one value by construction, so no
    # empty-bucket fallback is needed.
    cpu_mean = [np.mean(samples_by_timestamp[t]) for t in sorted_timestamps]
    cpu_min = [np.min(samples_by_timestamp[t]) for t in sorted_timestamps]
    cpu_max = [np.max(samples_by_timestamp[t]) for t in sorted_timestamps]

    fig, ax = plt.subplots(figsize=figsize)

    ax.plot(sorted_timestamps, cpu_mean, linewidth=2, label="Mean", color="C0")
    ax.fill_between(
        sorted_timestamps,
        cpu_min,
        cpu_max,
        alpha=0.3,
        label="Min-Max Range",
        color="C0",
    )

    setup_timeline_axes(ax, ylabel="CPU Utilization (%)", title=title, ylim=(0, 100))
    ax.legend()

    finalize_timeline_plot(fig, ax, output_path)
    return fig, ax