Source code for roastcoffea.aggregation.core

"""Core metrics aggregator combining all aggregation modules."""

from __future__ import annotations

from typing import Any

from roastcoffea.aggregation.backends import get_parser
from roastcoffea.aggregation.branch_coverage import aggregate_branch_coverage
from roastcoffea.aggregation.chunk import aggregate_chunk_metrics, build_chunk_info
from roastcoffea.aggregation.efficiency import calculate_efficiency_metrics
from roastcoffea.aggregation.fine_metrics import parse_fine_metrics
from roastcoffea.aggregation.workflow import aggregate_workflow_metrics


class MetricsAggregator:
    """Top-level aggregator that merges the output of every aggregation module.

    An instance is bound to one backend; the backend-specific parser is
    looked up once at construction and reused by :meth:`aggregate`.
    """

    def __init__(self, backend: str) -> None:
        """Bind the aggregator to a backend and fetch its parser.

        Parameters
        ----------
        backend : str
            Backend name ("dask", "taskvine", etc.)

        Raises
        ------
        ValueError
            If backend is not supported
        """
        self.backend = backend
        self.parser = get_parser(backend)

    def aggregate(
        self,
        coffea_report: dict[str, Any],
        tracking_data: dict[str, Any] | None,
        t_start: float,
        t_end: float,
        custom_metrics: dict[str, Any] | None = None,
        span_metrics: dict[tuple[str, ...], Any] | None = None,
        processor_name: str | None = None,
        chunk_metrics: list[dict[str, Any]] | None = None,
        section_metrics: list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """Build one flat metrics dictionary for a workflow run.

        Parameters
        ----------
        coffea_report : dict
            Coffea report
        tracking_data : dict, optional
            Backend tracking data; handed to the backend parser unless None
        t_start : float
            Start time
        t_end : float
            End time
        custom_metrics : dict, optional
            Per-dataset metrics
        span_metrics : dict, optional
            Dask Spans cumulative_worker_metrics
        processor_name : str, optional
            Name of processor class for filtering fine metrics
        chunk_metrics : list of dict, optional
            Per-chunk metrics from @track_metrics decorator
        section_metrics : list of dict, optional
            Section metrics from track_section() and track_memory()

        Returns
        -------
        dict
            Combined metrics. Also carries the raw ``tracking_data`` (always)
            and the raw chunk/section/span inputs (only when non-empty).
        """
        # Workflow-level numbers are always computed.
        workflow_part = aggregate_workflow_metrics(
            coffea_report=coffea_report,
            t_start=t_start,
            t_end=t_end,
            custom_metrics=custom_metrics,
        )

        # Worker metrics need backend tracking data; an empty dict stands in
        # when none was collected.
        worker_part = (
            self.parser.parse_tracking_data(tracking_data)
            if tracking_data is not None
            else {}
        )

        # Fine-grained metrics exist only when Spans data was supplied.
        # NOTE: no compression ratio is derived here on purpose. Coffea's
        # bytesread counts compressed bytes read from file, while Dask's
        # memory-read is an incomplete record of in-memory access; the two
        # do not measure the same thing, so a ratio would not be valid.
        fine_part = (
            parse_fine_metrics(span_metrics, processor_name=processor_name)
            if span_metrics
            else {}
        )

        # Chunk-level aggregation, plus the chunk_info structure in the
        # format expected by plot_throughput_timeline().
        chunk_part = {}
        if chunk_metrics:
            chunk_part = aggregate_chunk_metrics(
                chunk_metrics=chunk_metrics,
                section_metrics=section_metrics,
            )
            timeline_info = build_chunk_info(chunk_metrics)
            if timeline_info:
                # Stored with the chunk aggregates so coffea_report is left
                # unmodified.
                chunk_part["chunk_info"] = timeline_info

        # Branch coverage and data access metrics.
        coverage_part = aggregate_branch_coverage(
            chunk_metrics=chunk_metrics,
            coffea_report=coffea_report,
        )

        # Efficiency figures are derived from the workflow and worker parts.
        efficiency_part = calculate_efficiency_metrics(
            workflow_metrics=workflow_part,
            worker_metrics=worker_part,
        )

        # Merge in a fixed order; on a key clash the later part wins.
        merged = {}
        for part in (
            workflow_part,
            worker_part,
            efficiency_part,
            fine_part,
            chunk_part,
            coverage_part,
        ):
            merged.update(part)

        # Raw tracking data is always kept (even when None) for visualization.
        merged["tracking_data"] = tracking_data

        # Raw inputs are kept for detailed analysis only when non-empty.
        for raw_key, raw_value in (
            ("raw_chunk_metrics", chunk_metrics),
            ("raw_section_metrics", section_metrics),
            ("raw_span_metrics", span_metrics),
        ):
            if raw_value:
                merged[raw_key] = raw_value

        return merged