# Source code for roastcoffea.aggregation.efficiency
"""Efficiency metrics calculation.
Calculates core efficiency, speedup factors, and per-core event rates
from workflow and worker metrics.
"""
from __future__ import annotations
from typing import Any
# [docs]
def calculate_efficiency_metrics(
    workflow_metrics: dict[str, Any],
    worker_metrics: dict[str, Any],
) -> dict[str, Any]:
    """Calculate efficiency metrics from workflow and worker data.

    Parameters
    ----------
    workflow_metrics : dict
        Workflow metrics from aggregate_workflow_metrics(). The keys
        ``elapsed_time_seconds``, ``total_cpu_time`` and ``total_events``
        are read; each one defaults to 0 when absent.
    worker_metrics : dict
        Worker metrics from parse_tracking_data(). Only ``total_cores`` is
        read; a missing key (or an explicit ``None``) means the core count
        is unknown.

    Returns
    -------
    dict
        Efficiency metrics with the keys:

        ``core_efficiency``
            ``total_cpu_time / (total_cores * elapsed_time_seconds)``.
            ``None`` when the core count is unknown, ``0.0`` when the
            available core-time is not positive.
        ``speedup_factor``
            ``total_cpu_time / elapsed_time_seconds``, or ``0.0`` when the
            elapsed time is not positive.
        ``event_rate_core_khz``
            ``total_events / (total_cores * elapsed_time_seconds) / 1000``.
            ``None`` when the core count is unknown, ``0.0`` when the
            available core-time is not positive.
    """
    elapsed_time_seconds = workflow_metrics.get("elapsed_time_seconds", 0)
    total_cpu_time = workflow_metrics.get("total_cpu_time", 0)
    total_events = workflow_metrics.get("total_events", 0)
    total_cores = worker_metrics.get("total_cores")

    has_elapsed_time = elapsed_time_seconds > 0

    # Speedup does not depend on the core count, so it is always reported.
    speedup_factor = (
        total_cpu_time / elapsed_time_seconds if has_elapsed_time else 0.0
    )

    # Per-core metrics stay None when the core count is unknown.
    core_efficiency = None
    event_rate_core_khz = None
    if total_cores is not None:
        # Core-seconds available to the workflow; shared denominator of both
        # per-core metrics.
        total_available_time = (
            total_cores * elapsed_time_seconds if has_elapsed_time else 0
        )
        if total_available_time > 0:
            core_efficiency = total_cpu_time / total_available_time
            # Events per core-second, scaled from Hz to kHz.
            event_rate_core_khz = (total_events / total_available_time) / 1000
        else:
            # A zero (or otherwise non-positive) budget, e.g. total_cores == 0,
            # must not divide: report 0.0 for both metrics instead of raising
            # ZeroDivisionError from the event-rate calculation.
            core_efficiency = 0.0
            event_rate_core_khz = 0.0

    return {
        "core_efficiency": core_efficiency,
        "speedup_factor": speedup_factor,
        "event_rate_core_khz": event_rate_core_khz,
    }