Source code for roastcoffea.aggregation.backends.dask
"""Dask-specific aggregation parsers.
Parses Dask scheduler tracking data and (v0.2+) fine metrics
into standardized worker metrics dictionaries.
"""
from __future__ import annotations
import datetime
from typing import Any
import numpy as np
from roastcoffea.aggregation.backends.base import AbstractTrackingDataParser
[docs]
class DaskTrackingDataParser(AbstractTrackingDataParser):
"""Parser for Dask scheduler tracking data."""
[docs]
def parse_tracking_data(self, tracking_data: dict[str, Any]) -> dict[str, Any]:
"""Parse Dask scheduler tracking data into aggregated metrics.
Parameters
----------
tracking_data : dict
Raw tracking data from DaskMetricsBackend.stop_tracking()
Returns
-------
dict
Aggregated worker metrics
"""
worker_counts = tracking_data.get("worker_counts", {})
worker_memory = tracking_data.get("worker_memory", {})
worker_cores = tracking_data.get("worker_cores", {})
# Calculate worker metrics
avg_workers = calculate_time_averaged_workers(worker_counts)
peak_workers = max(worker_counts.values()) if worker_counts else 0
# Calculate total cores and cores per worker from per-worker core tracking
total_cores = None
cores_per_worker = None
if worker_cores:
# Sum cores across all workers (use latest value for each worker)
cores_sum = 0
core_counts = []
for _worker_id, timeline in worker_cores.items():
if timeline:
# Use the latest (or any) core count for this worker
# Cores don't change over time, so any value is fine
worker_core_count = timeline[-1][1]
cores_sum += worker_core_count
core_counts.append(worker_core_count)
if cores_sum > 0:
total_cores = float(cores_sum)
# Calculate average cores per worker
cores_per_worker = float(np.mean(core_counts)) if core_counts else None
# Calculate memory metrics
peak_memory_bytes = calculate_peak_memory(worker_memory)
avg_memory_per_worker_bytes = calculate_average_memory_per_worker(worker_memory)
return {
"avg_workers": avg_workers,
"peak_workers": peak_workers,
"total_cores": total_cores,
"cores_per_worker": cores_per_worker,
"peak_memory_bytes": peak_memory_bytes,
"avg_memory_per_worker_bytes": avg_memory_per_worker_bytes,
}
[docs]
def calculate_time_averaged_workers(
worker_counts: dict[datetime.datetime, int],
) -> float:
"""Calculate time-weighted average worker count.
Uses trapezoidal integration to compute the average number of workers
weighted by the time each count was active.
Parameters
----------
worker_counts : dict
Mapping from datetime to worker count
Returns
-------
float
Time-averaged worker count
"""
if not worker_counts:
return 0.0
if len(worker_counts) < 2:
return float(next(iter(worker_counts.values())))
# Sort by timestamp
sorted_items = sorted(worker_counts.items())
timestamps = [t for t, _ in sorted_items]
counts = [c for _, c in sorted_items]
# Convert to seconds since first sample
t0 = timestamps[0]
times = np.array([(t - t0).total_seconds() for t in timestamps])
worker_array = np.array(counts, dtype=float)
# Calculate time intervals
delta_t = np.diff(times)
# Trapezoidal integration: area = (y1 + y2) / 2 * delta_t
workers_times_time = [
(worker_array[i] + worker_array[i + 1]) / 2 * delta_t[i]
for i in range(len(delta_t))
]
# Time-weighted average
total_time = times[-1] - times[0]
return sum(workers_times_time) / total_time
[docs]
def calculate_peak_memory(worker_memory: dict[str, list[tuple]]) -> float:
"""Calculate peak memory usage across all workers.
Parameters
----------
worker_memory : dict
Dictionary from tracking data: worker_id -> [(timestamp, memory_bytes), ...]
Returns
-------
float
Maximum memory usage observed
"""
if not worker_memory:
return 0.0
all_memory_values = []
for _worker_id, timeline in worker_memory.items():
for _timestamp, memory_bytes in timeline:
all_memory_values.append(memory_bytes)
return max(all_memory_values) if all_memory_values else 0.0
[docs]
def calculate_average_memory_per_worker(
worker_memory: dict[str, list[tuple]],
) -> float:
"""Calculate time-weighted average memory per worker.
Computes time-weighted average for each worker, then averages across workers.
Parameters
----------
worker_memory : dict
Dictionary from tracking data: worker_id -> [(timestamp, memory_bytes), ...]
Returns
-------
float
Average memory per worker
"""
if not worker_memory:
return 0.0
worker_averages = []
for _worker_id, timeline in worker_memory.items():
if len(timeline) < 2:
if timeline:
worker_averages.append(timeline[0][1])
continue
# Sort by timestamp
sorted_timeline = sorted(timeline, key=lambda x: x[0])
# Extract timestamps and memory values
timestamps = [t for t, m in sorted_timeline]
memory_values = [m for t, m in sorted_timeline]
# Convert to seconds since first sample
t0 = timestamps[0]
times = np.array([(t - t0).total_seconds() for t in timestamps])
memory = np.array(memory_values, dtype=float)
# Calculate time intervals
delta_t = np.diff(times)
# Trapezoidal integration
memory_times_time = [
(memory[i] + memory[i + 1]) / 2 * delta_t[i] for i in range(len(delta_t))
]
# Time-weighted average for this worker
total_time = times[-1] - times[0]
worker_avg = sum(memory_times_time) / total_time
worker_averages.append(worker_avg)
# Average across all workers
return float(np.mean(worker_averages)) if worker_averages else 0.0