# Source code for roastcoffea.aggregation.fine_metrics
"""Parse Dask Spans fine-grained performance metrics.
Dask Spans provide detailed breakdown of task activity via cumulative_worker_metrics.
This module parses those metrics into a standardized format.
"""
from __future__ import annotations
from typing import Any
# [docs]
def parse_fine_metrics(
    cumulative_worker_metrics: dict[tuple[str, ...], Any],
    processor_name: str | None = None,
) -> dict[str, Any]:
    """Parse Dask Spans cumulative_worker_metrics into fine metrics.

    Parameters
    ----------
    cumulative_worker_metrics : dict
        Raw metrics from span.cumulative_worker_metrics with tuple keys like:
        ('execute', task_prefix, activity, unit) -> value
        Only keys whose first element is "execute" are parsed; keys with any
        other context (or fewer than 3 elements) are skipped, because Dask
        documents that other key layouts may appear. The trailing ``unit``
        element may be omitted, in which case it is treated as unknown.
        Recognised activities: thread-cpu, thread-noncpu, disk-read,
        disk-write, memory-read, compress, decompress, serialize, deserialize.
        Other activities are ignored.
    processor_name : str, optional
        Task prefix of the processor to attribute metrics to. If provided, only
        thread-cpu / thread-noncpu metrics whose task_prefix equals this name go
        into the processor_* fields, and those from other prefixes go into the
        overhead_* fields. If None, every prefix counts as processor work and
        the overhead_* fields stay 0.

    Returns
    -------
    dict
        Parsed fine metrics with keys:
        - processor_cpu_time_seconds: CPU time in processor
        - processor_io_wait_time_seconds: I/O and waiting time in processor (I/O, GIL, blocking)
        - processor_cpu_percent: CPU / (CPU + I/O wait) x 100 for processor (0.0 if no time)
        - processor_io_wait_percent: I/O wait / (CPU + I/O wait) x 100 for processor (0.0 if no time)
        - overhead_cpu_time_seconds: CPU time in Dask overhead (if processor_name given)
        - overhead_io_wait_time_seconds: I/O and waiting time in Dask overhead
        - total_bytes_memory_read: Bytes of in-memory data access tracked by Dask
        - disk_read_bytes: Bytes read from disk
        - disk_write_bytes: Bytes written to disk
        - decompression_time_seconds: Time spent decompressing
        - compression_time_seconds: Time spent compressing
        - total_compression_overhead_seconds: Sum of compress + decompress
        - deserialization_time_seconds: Time spent deserializing
        - serialization_time_seconds: Time spent serializing
        - total_serialization_overhead_seconds: Sum of serialize + deserialize

    Notes
    -----
    Byte totals only include entries whose unit is "bytes". Time totals only
    include entries whose unit is "seconds" or absent, so count- or
    bytes-valued entries never leak into a *_seconds field.
    """
    # Units accepted for time-valued activities; None covers keys without a
    # trailing unit element.
    time_units = (None, "seconds")

    processor_cpu = 0.0
    processor_io_wait = 0.0
    overhead_cpu = 0.0
    overhead_io_wait = 0.0

    # Byte-valued activities, summed only for unit == "bytes".
    byte_totals = {"disk-read": 0, "disk-write": 0, "memory-read": 0}
    # Pure time activities that are not split into processor/overhead.
    time_totals = {
        "decompress": 0.0,
        "compress": 0.0,
        "deserialize": 0.0,
        "serialize": 0.0,
    }

    for key, value in cumulative_worker_metrics.items():
        # Only ('execute', task_prefix, activity[, unit]) keys follow the layout
        # this parser understands; anything else would be misread.
        if len(key) < 3 or key[0] != "execute":
            continue

        task_prefix = key[1]
        activity = key[2]
        unit = key[3] if len(key) > 3 else None

        if activity in byte_totals:
            # Other units (e.g. counts) for these activities are not wanted.
            if unit == "bytes":
                byte_totals[activity] += value
            continue

        if unit not in time_units:
            continue

        # Processor work vs. Dask overhead: everything is processor work when
        # no processor_name filter is given.
        is_processor = processor_name is None or task_prefix == processor_name

        if activity == "thread-cpu":
            if is_processor:
                processor_cpu += value
            else:
                overhead_cpu += value
        elif activity == "thread-noncpu":
            if is_processor:
                processor_io_wait += value
            else:
                overhead_io_wait += value
        elif activity in time_totals:
            time_totals[activity] += value

    # Percentages are relative to the processor's own CPU + wait time; guard
    # against division by zero when nothing was recorded.
    processor_total = processor_cpu + processor_io_wait
    if processor_total > 0:
        processor_cpu_pct = processor_cpu / processor_total * 100
        processor_io_wait_pct = processor_io_wait / processor_total * 100
    else:
        processor_cpu_pct = 0.0
        processor_io_wait_pct = 0.0

    decompress_time = time_totals["decompress"]
    compress_time = time_totals["compress"]
    deserialize_time = time_totals["deserialize"]
    serialize_time = time_totals["serialize"]

    return {
        # Processor time breakdown
        "processor_cpu_time_seconds": processor_cpu,
        "processor_io_wait_time_seconds": processor_io_wait,
        "processor_cpu_percent": processor_cpu_pct,
        "processor_io_wait_percent": processor_io_wait_pct,
        # Dask overhead (only populated if processor_name given)
        "overhead_cpu_time_seconds": overhead_cpu,
        "overhead_io_wait_time_seconds": overhead_io_wait,
        # Data volume from Dask Spans
        "total_bytes_memory_read": byte_totals["memory-read"],  # In-memory data access tracked by Dask
        "disk_read_bytes": byte_totals["disk-read"],
        "disk_write_bytes": byte_totals["disk-write"],
        # Compression overhead
        "decompression_time_seconds": decompress_time,
        "compression_time_seconds": compress_time,
        "total_compression_overhead_seconds": compress_time + decompress_time,
        # Serialization overhead
        "deserialization_time_seconds": deserialize_time,
        "serialization_time_seconds": serialize_time,
        "total_serialization_overhead_seconds": serialize_time + deserialize_time,
    }