Source code for roastcoffea.aggregation.workflow
"""Workflow-level metrics aggregation.
Calculates throughput, data rates, event rates, compression ratios,
and overall timing metrics from coffea reports and tracking data.
"""
from __future__ import annotations
from typing import Any
[docs]
def aggregate_workflow_metrics(
coffea_report: dict[str, Any],
t_start: float,
t_end: float,
custom_metrics: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Calculate workflow metrics from Coffea report.
Parameters
----------
coffea_report : dict
Coffea report from Runner
t_start : float
Start time
t_end : float
End time
custom_metrics : dict, optional
Per-dataset custom metrics
Returns
-------
dict
Workflow metrics
"""
# Calculate elapsed time
elapsed_time_seconds = t_end - t_start
# Extract number of chunks from coffea report
num_chunks = coffea_report.get("chunks", 0)
# Combine coffea report and custom metrics into unified structure
# If custom_metrics provided, use those; otherwise use coffea report as "total"
combined_report = {}
if custom_metrics:
# Custom metrics provide the detailed breakdown
combined_report.update(custom_metrics)
elif "bytesread" in coffea_report:
# Convert coffea report to "total" dataset if no custom metrics
combined_report["total"] = {
"entries": coffea_report.get("entries", 0),
"duration": coffea_report.get("processtime", elapsed_time_seconds),
"performance_counters": {
"num_requested_bytes": coffea_report.get("bytesread", 0)
},
}
# Extract and aggregate metrics from combined report
total_bytes_read_coffea = 0
total_events = 0
total_cpu_time = 0
for _dataset_name, dataset_data in combined_report.items():
# Skip non-dataset entries
if not isinstance(dataset_data, dict):
continue
# Get performance counters
perf_counters = dataset_data.get("performance_counters", {})
total_bytes_read_coffea += perf_counters.get("num_requested_bytes", 0)
# Get events and duration
total_events += dataset_data.get("entries", 0)
total_cpu_time += dataset_data.get("duration", 0)
# Calculate throughput metrics (based on Coffea bytesread)
data_rate_gbps = (
(total_bytes_read_coffea * 8 / 1e9) / elapsed_time_seconds
if elapsed_time_seconds > 0
else 0
)
# Calculate event rate metrics
event_rate_elapsed_khz = (
(total_events / elapsed_time_seconds) / 1000 if elapsed_time_seconds > 0 else 0
)
event_rate_cpu_total_khz = (
(total_events / total_cpu_time) / 1000 if total_cpu_time > 0 else 0
)
# Calculate chunk-level metrics
avg_cpu_time_per_chunk = total_cpu_time / num_chunks if num_chunks > 0 else 0
# Build metrics dictionary
return {
# Throughput metrics
"data_rate_gbps": data_rate_gbps,
# Event processing metrics
"total_events": total_events,
"event_rate_elapsed_khz": event_rate_elapsed_khz,
"event_rate_cpu_total_khz": event_rate_cpu_total_khz,
# Timing metrics
"elapsed_time_seconds": elapsed_time_seconds,
"total_cpu_time": total_cpu_time,
"num_chunks": num_chunks,
"avg_cpu_time_per_chunk": avg_cpu_time_per_chunk,
# Data volume metrics
"total_bytes_read": total_bytes_read_coffea, # From Coffea's bytesread
}