Source code for roastcoffea.aggregation.chunk

"""Chunk-level metrics aggregation."""

from __future__ import annotations

from typing import Any, cast


def _spread_stats(values: list[float]) -> tuple[float, float, float, float]:
    """Return ``(mean, min, max, sample_std)`` for a non-empty list of numbers."""
    count = len(values)
    mean = sum(values) / count
    if count > 1:
        # Sample standard deviation (n - 1 denominator).
        variance = sum((value - mean) ** 2 for value in values) / (count - 1)
        std = variance**0.5
    else:
        # A single observation has no spread.
        std = 0.0
    return mean, min(values), max(values), std


def _aggregate_per_dataset(
    successful_chunks: list[dict[str, Any]],
) -> dict[Any, dict[str, Any]]:
    """Group successful chunks by their ``"dataset"`` key and total them up."""
    datasets: dict[Any, dict[str, Any]] = {}
    for chunk in successful_chunks:
        dataset = chunk.get("dataset", "unknown")
        entry = datasets.get(dataset)
        if entry is None:
            entry = datasets[dataset] = {
                "num_chunks": 0,
                "total_duration": 0.0,
                "total_events": 0,
            }
        entry["num_chunks"] += 1
        entry["total_duration"] += chunk["duration"]
        if "num_events" in chunk:
            entry["total_events"] += chunk["num_events"]

    for entry in datasets.values():
        # Every entry was created by at least one chunk, so num_chunks >= 1.
        entry["mean_duration"] = entry["total_duration"] / entry["num_chunks"]
        if entry["total_events"] > 0:
            entry["mean_events_per_chunk"] = (
                entry["total_events"] / entry["num_chunks"]
            )
    return datasets


def _aggregate_sections(
    section_metrics: list[dict[str, Any]],
) -> dict[Any, dict[str, Any]]:
    """Group section records by ``"name"`` and compute per-section averages."""
    sections: dict[Any, dict[str, Any]] = {}
    # Raw memory deltas are kept outside the result entries so no temporary
    # key has to be inserted into (and later deleted from) the output.
    mem_deltas_by_name: dict[Any, list[float]] = {}

    for section in section_metrics:
        name = section.get("name", "unknown")
        entry = sections.get(name)
        if entry is None:
            # The recorded type is the one of the first record with this name.
            entry = sections[name] = {
                "count": 0,
                "total_duration": 0.0,
                "type": section.get("type", "section"),
            }
        entry["count"] += 1
        entry["total_duration"] += section.get("duration", 0.0)

        if section.get("type") == "memory" and "mem_delta_mb" in section:
            mem_deltas_by_name.setdefault(name, []).append(section["mem_delta_mb"])

    for name, entry in sections.items():
        entry["mean_duration"] = entry["total_duration"] / entry["count"]
        deltas = mem_deltas_by_name.get(name)
        if deltas:
            entry["mean_mem_delta_mb"] = sum(deltas) / len(deltas)
            entry["max_mem_delta_mb"] = max(deltas)
            entry["min_mem_delta_mb"] = min(deltas)
    return sections


def aggregate_chunk_metrics(
    chunk_metrics: list[dict[str, Any]] | None,
    section_metrics: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Aggregate chunk-level metrics.

    Parameters
    ----------
    chunk_metrics : list of dict, optional
        List of per-chunk metrics from @track_metrics decorator. A chunk
        counts as failed when it contains an ``"error"`` key; every other
        chunk must provide ``"duration"`` and may provide ``"mem_delta_mb"``,
        ``"num_events"`` and ``"dataset"``.
    section_metrics : list of dict, optional
        List of section metrics from track_section() and track_memory().
        Only aggregated when at least one chunk succeeded.

    Returns
    -------
    dict
        Aggregated chunk metrics. ``"num_chunks"`` is always present. When
        ``chunk_metrics`` is non-empty, ``"num_successful_chunks"`` and
        ``"num_failed_chunks"`` are added. When at least one chunk succeeded
        the result also includes:

        - Timing statistics ``chunk_duration_{mean,min,max,std}``
        - Memory statistics ``chunk_mem_delta_{mean,min,max,std}_mb``
          (only if some chunk has ``"mem_delta_mb"``)
        - Event statistics ``total_events_from_chunks`` and
          ``chunk_events_{mean,min,max}`` (only if some chunk has
          ``"num_events"``)
        - Per-dataset breakdown under ``"per_dataset"``
        - Section timing breakdown under ``"sections"`` (only if
          ``section_metrics`` is non-empty)

        Standard deviations use the sample (n - 1) formula and are ``0.0``
        for a single observation.

    Raises
    ------
    KeyError
        If a chunk without an ``"error"`` key has no ``"duration"`` entry.
    """
    result: dict[str, Any] = {}

    if not chunk_metrics:
        result["num_chunks"] = 0
        return result

    result["num_chunks"] = len(chunk_metrics)

    # A chunk is considered failed as soon as it carries an "error" key.
    successful_chunks = [c for c in chunk_metrics if "error" not in c]
    result["num_successful_chunks"] = len(successful_chunks)
    result["num_failed_chunks"] = len(chunk_metrics) - len(successful_chunks)

    if not successful_chunks:
        return result

    # Timing statistics
    durations = [c["duration"] for c in successful_chunks]
    (
        result["chunk_duration_mean"],
        result["chunk_duration_min"],
        result["chunk_duration_max"],
        result["chunk_duration_std"],
    ) = _spread_stats(durations)

    # Memory statistics (if available)
    mem_deltas = [c["mem_delta_mb"] for c in successful_chunks if "mem_delta_mb" in c]
    if mem_deltas:
        (
            result["chunk_mem_delta_mean_mb"],
            result["chunk_mem_delta_min_mb"],
            result["chunk_mem_delta_max_mb"],
            result["chunk_mem_delta_std_mb"],
        ) = _spread_stats(mem_deltas)

    # Event statistics (if available)
    event_counts = [c["num_events"] for c in successful_chunks if "num_events" in c]
    if event_counts:
        result["total_events_from_chunks"] = sum(event_counts)
        result["chunk_events_mean"] = sum(event_counts) / len(event_counts)
        result["chunk_events_min"] = min(event_counts)
        result["chunk_events_max"] = max(event_counts)

    # Per-dataset breakdown
    result["per_dataset"] = _aggregate_per_dataset(successful_chunks)

    # Section timing breakdown (if available)
    if section_metrics:
        result["sections"] = _aggregate_sections(section_metrics)

    return result


def build_chunk_info(chunk_metrics: list[dict[str, Any]]) -> dict[tuple, tuple]:
    """Build chunk_info dict from chunk metrics for throughput plotting.

    Transforms chunk-level metrics collected by @track_metrics into the
    format expected by plot_throughput_timeline().

    Parameters
    ----------
    chunk_metrics : list of dict
        List of chunk metrics dicts from @track_metrics decorator.
        Each dict contains: file, entry_start, entry_stop, t_start, t_end,
        bytes_read

    Returns
    -------
    dict
        Dictionary mapping chunk keys to timing/bytes data:
        {(filename, entry_start, entry_stop): (t_start, t_end, bytes_read)}

    Examples
    --------
    >>> chunk_metrics = [
    ...     {"file": "data.root", "entry_start": 0, "entry_stop": 1000,
    ...      "t_start": 1.0, "t_end": 2.5, "bytes_read": 50000},
    ... ]
    >>> chunk_info = build_chunk_info(chunk_metrics)
    >>> chunk_info
    {('data.root', 0, 1000): (1.0, 2.5, 50000)}

    Notes
    -----
    - Chunks whose file, entry_start, entry_stop, t_start or t_end is
      missing or ``None`` are skipped.
    - Chunks whose bytes_read is missing or ``None`` default to 0 bytes.
    - If several chunks share the same (file, entry_start, entry_stop) key,
      the last one in ``chunk_metrics`` wins.
    """
    chunk_info: dict[tuple, tuple] = {}

    for chunk in chunk_metrics:
        filename = chunk.get("file")
        entry_start = chunk.get("entry_start")
        entry_stop = chunk.get("entry_stop")
        t_start = chunk.get("t_start")
        t_end = chunk.get("t_end")

        # Identity checks (not truthiness): 0 and 0.0 are valid offsets/times.
        required = (filename, entry_start, entry_stop, t_start, t_end)
        if any(field is None for field in required):
            continue

        # An explicit None is treated like a missing value, matching how the
        # required fields above are handled.
        bytes_read = chunk.get("bytes_read")
        if bytes_read is None:
            bytes_read = 0

        chunk_info[(filename, entry_start, entry_stop)] = (t_start, t_end, bytes_read)

    return chunk_info