Source code for roastcoffea.export.reporter
"""Rich table formatting for metrics reporting."""
from __future__ import annotations
from typing import Any
from rich.table import Table
def _format_bytes(num_bytes: float) -> str:
"""Format bytes in human-readable units."""
for unit in ["B", "KB", "MB", "GB", "TB"]:
if abs(num_bytes) < 1024.0:
return f"{num_bytes:.2f} {unit}"
num_bytes /= 1024.0
return f"{num_bytes:.2f} PB"
def _format_time(seconds: float) -> str:
"""Format time in human-readable units."""
if seconds < 60:
return f"{seconds:.1f}s"
minutes = int(seconds // 60)
remaining_seconds = int(seconds % 60)
if minutes < 60:
return f"{minutes}m {remaining_seconds}s"
hours = minutes // 60
remaining_minutes = minutes % 60
return f"{hours}h {remaining_minutes}m {remaining_seconds}s"
[docs]
def format_throughput_table(metrics: dict[str, Any]) -> Table:
"""Format throughput metrics as Rich table.
Parameters
----------
metrics : dict
Metrics dictionary
Returns
-------
Table
Rich table
"""
table = Table(
title="Throughput Metrics", show_header=True, header_style="bold cyan"
)
table.add_column("Metric", style="cyan", no_wrap=True)
table.add_column("Value", style="magenta")
# Data rate
data_rate_gbps = metrics.get("data_rate_gbps", 0)
data_rate_mbps = (data_rate_gbps * 1000) / 8 # Convert Gbps to MB/s
table.add_row(
"Data Rate",
f"{data_rate_gbps:.2f} Gbps ({data_rate_mbps:.1f} MB/s)",
)
# Data volume from Coffea
bytes_coffea = metrics.get("total_bytes_read", 0)
if bytes_coffea:
table.add_row(
"Total Bytes Read (Coffea)",
f"{_format_bytes(bytes_coffea)}",
)
# Data volume from Dask Spans (if available)
bytes_dask = metrics.get("total_bytes_memory_read", 0)
if bytes_dask:
table.add_row(
"Memory Read (Dask Spans)",
f"{_format_bytes(bytes_dask)}",
)
return table
[docs]
def format_event_processing_table(metrics: dict[str, Any]) -> Table:
"""Format event processing metrics as Rich table.
Parameters
----------
metrics : dict
Metrics dictionary
Returns
-------
Table
Rich table
"""
table = Table(
title="Event Processing Metrics", show_header=True, header_style="bold cyan"
)
table.add_column("Metric", style="cyan", no_wrap=True)
table.add_column("Value", style="magenta")
# Total events
total_events = metrics.get("total_events", 0)
table.add_row("Total Events", f"{total_events:,}")
# Event rates
elapsed_khz = metrics.get("event_rate_elapsed_khz", 0)
table.add_row("Event Rate (Elapsed Time)", f"{elapsed_khz:.1f} kHz")
cpu_total_khz = metrics.get("event_rate_cpu_total_khz", 0)
table.add_row("Event Rate (Total CPU)", f"{cpu_total_khz:.1f} kHz")
# Core-averaged rate (may be None if no worker data)
core_khz = metrics.get("event_rate_core_khz")
if core_khz is not None:
table.add_row("Event Rate (Core-Averaged)", f"{core_khz:.1f} kHz/core")
else:
table.add_row("Event Rate (Core-Averaged)", "[dim]N/A (no worker data)[/dim]")
# Efficiency ratio
if elapsed_khz and cpu_total_khz:
efficiency_ratio = elapsed_khz / cpu_total_khz
table.add_row("Efficiency Ratio", f"{efficiency_ratio:.1%}")
return table
[docs]
def format_resources_table(metrics: dict[str, Any]) -> Table:
"""Format resource utilization metrics as Rich table.
Parameters
----------
metrics : dict
Metrics dictionary
Returns
-------
Table
Rich table
"""
table = Table(
title="Resource Utilization", show_header=True, header_style="bold cyan"
)
table.add_column("Metric", style="cyan", no_wrap=True)
table.add_column("Value", style="magenta")
# Worker metrics
avg_workers = metrics.get("avg_workers")
if avg_workers is not None:
table.add_row("Workers (Time-Averaged)", f"{avg_workers:.1f}")
else:
table.add_row("Workers (Time-Averaged)", "[dim]N/A (no worker tracking)[/dim]")
peak_workers = metrics.get("peak_workers")
if peak_workers is not None:
table.add_row("Peak Workers", f"{peak_workers}")
else:
table.add_row("Peak Workers", "[dim]N/A (no worker tracking)[/dim]")
# Core metrics
cores_per_worker = metrics.get("cores_per_worker")
if cores_per_worker is not None:
table.add_row("Cores per Worker", f"{cores_per_worker:.1f}")
else:
table.add_row("Cores per Worker", "[dim]N/A (no worker tracking)[/dim]")
total_cores = metrics.get("total_cores")
if total_cores is not None:
table.add_row("Total Cores", f"{total_cores:.0f}")
else:
table.add_row("Total Cores", "[dim]N/A (no worker tracking)[/dim]")
# Efficiency
core_efficiency = metrics.get("core_efficiency")
if core_efficiency is not None:
table.add_row("Core Efficiency", f"{core_efficiency:.1%}")
else:
table.add_row("Core Efficiency", "[dim]N/A (no worker tracking)[/dim]")
# Speedup
speedup = metrics.get("speedup_factor")
if speedup is not None:
table.add_row("Speedup Factor", f"{speedup:.1f}x")
else:
table.add_row("Speedup Factor", "[dim]N/A (no worker tracking)[/dim]")
# Memory metrics
peak_memory = metrics.get("peak_memory_bytes")
if peak_memory is not None:
table.add_row("Peak Memory (per worker)", _format_bytes(peak_memory))
else:
table.add_row("Peak Memory (per worker)", "[dim]N/A (no worker tracking)[/dim]")
avg_memory = metrics.get("avg_memory_per_worker_bytes")
if avg_memory is not None:
table.add_row("Avg Memory (per worker)", _format_bytes(avg_memory))
else:
table.add_row("Avg Memory (per worker)", "[dim]N/A (no worker tracking)[/dim]")
return table
[docs]
def format_timing_table(metrics: dict[str, Any]) -> Table:
"""Format timing metrics as Rich table.
Parameters
----------
metrics : dict
Metrics dictionary
Returns
-------
Table
Rich table
"""
table = Table(title="Timing Breakdown", show_header=True, header_style="bold cyan")
table.add_column("Metric", style="cyan", no_wrap=True)
table.add_column("Value", style="magenta")
# Elapsed time
elapsed_time = metrics.get("elapsed_time_seconds", 0)
table.add_row("Elapsed Time", _format_time(elapsed_time))
# CPU time
cpu_time = metrics.get("total_cpu_time", 0)
table.add_row("Total CPU Time", _format_time(cpu_time))
# Chunk metrics
num_chunks = metrics.get("num_chunks", 0)
if num_chunks > 0:
table.add_row("Number of Chunks", f"{num_chunks:,}")
avg_cpu_per_chunk = metrics.get("avg_cpu_time_per_chunk", 0)
table.add_row("Avg CPU Time/Chunk", _format_time(avg_cpu_per_chunk))
return table
[docs]
def format_fine_metrics_table(metrics: dict[str, Any]) -> Table | None:
"""Format fine-grained metrics from Dask Spans as Rich table.
Parameters
----------
metrics : dict
Metrics dictionary
Returns
-------
Table or None
Rich table if fine metrics available, None otherwise
"""
# Check if any fine metrics are available
processor_cpu = metrics.get("processor_cpu_time_seconds")
processor_io_wait = metrics.get("processor_io_wait_time_seconds")
overhead_cpu = metrics.get("overhead_cpu_time_seconds")
overhead_io_wait = metrics.get("overhead_io_wait_time_seconds")
if processor_cpu is None and processor_io_wait is None:
return None
table = Table(
title="Fine Metrics (from Dask Spans)",
show_header=True,
header_style="bold cyan",
)
table.add_column("Metric", style="cyan", no_wrap=True)
table.add_column("Value", style="magenta")
# Processor CPU vs I/O wait breakdown
if processor_cpu is not None:
table.add_row("Processor CPU Time", _format_time(processor_cpu))
if processor_io_wait is not None:
table.add_row("Processor I/O & Waiting Time", _format_time(processor_io_wait))
processor_cpu_pct = metrics.get("processor_cpu_percent")
processor_io_wait_pct = metrics.get("processor_io_wait_percent")
if processor_cpu_pct is not None and processor_io_wait_pct is not None:
table.add_row(" CPU %", f"{processor_cpu_pct:.1f}%")
table.add_row(" I/O & Wait %", f"{processor_io_wait_pct:.1f}%")
# Dask overhead (if separated)
if overhead_cpu is not None and overhead_cpu > 0:
table.add_row("Dask Overhead CPU Time", _format_time(overhead_cpu))
if overhead_io_wait is not None and overhead_io_wait > 0:
table.add_row(
"Dask Overhead I/O & Waiting Time", _format_time(overhead_io_wait)
)
# Disk I/O
disk_read = metrics.get("disk_read_bytes")
disk_write = metrics.get("disk_write_bytes")
if disk_read is not None and disk_read > 0:
table.add_row("Disk Read", _format_bytes(disk_read))
if disk_write is not None and disk_write > 0:
table.add_row("Disk Write", _format_bytes(disk_write))
# Compression overhead
compress_time = metrics.get("compression_time_seconds")
decompress_time = metrics.get("decompression_time_seconds")
total_compression = metrics.get("total_compression_overhead_seconds")
if total_compression is not None and total_compression > 0:
table.add_row("Compression Overhead", _format_time(total_compression))
if compress_time is not None and compress_time > 0:
table.add_row(" • Compress", _format_time(compress_time))
if decompress_time is not None and decompress_time > 0:
table.add_row(" • Decompress", _format_time(decompress_time))
# Serialization overhead
serialize_time = metrics.get("serialization_time_seconds")
deserialize_time = metrics.get("deserialization_time_seconds")
total_serialization = metrics.get("total_serialization_overhead_seconds")
if total_serialization is not None and total_serialization > 0:
table.add_row("Serialization Overhead", _format_time(total_serialization))
if serialize_time is not None and serialize_time > 0:
table.add_row(" • Serialize", _format_time(serialize_time))
if deserialize_time is not None and deserialize_time > 0:
table.add_row(" • Deserialize", _format_time(deserialize_time))
return table
[docs]
def format_chunk_metrics_table(metrics: dict[str, Any]) -> Table | None:
"""Format chunk-level metrics as Rich table.
Parameters
----------
metrics : dict
Metrics dictionary
Returns
-------
Table or None
Rich table, or None if no chunk metrics available
"""
num_chunks = metrics.get("num_chunks", 0)
if num_chunks == 0:
return None
table = Table(
title="Chunk Metrics",
show_header=True,
header_style="bold cyan",
)
table.add_column("Metric", style="cyan", no_wrap=True)
table.add_column("Value", style="magenta")
# Basic stats
table.add_row("Total Chunks", str(num_chunks))
num_successful = metrics.get("num_successful_chunks", num_chunks)
num_failed = metrics.get("num_failed_chunks", 0)
if num_failed > 0:
table.add_row(" • Successful", str(num_successful))
table.add_row(" • Failed", str(num_failed), style="red")
# Timing statistics
mean_duration = metrics.get("chunk_duration_mean")
if mean_duration is not None:
table.add_row("Mean Chunk Time", _format_time(mean_duration))
min_duration = metrics.get("chunk_duration_min")
max_duration = metrics.get("chunk_duration_max")
std_duration = metrics.get("chunk_duration_std")
if min_duration is not None:
table.add_row(" • Min", _format_time(min_duration))
if max_duration is not None:
table.add_row(" • Max", _format_time(max_duration))
if std_duration is not None and std_duration > 0:
table.add_row(" • Std Dev", _format_time(std_duration))
# Memory statistics
mean_mem = metrics.get("chunk_mem_delta_mean_mb")
if mean_mem is not None:
table.add_row("Mean Memory Delta", f"{mean_mem:.1f} MB")
max_mem = metrics.get("chunk_mem_delta_max_mb")
min_mem = metrics.get("chunk_mem_delta_min_mb")
if min_mem is not None:
table.add_row(" • Min", f"{min_mem:.1f} MB")
if max_mem is not None:
table.add_row(" • Max", f"{max_mem:.1f} MB")
# Event statistics
chunk_events_mean = metrics.get("chunk_events_mean")
if chunk_events_mean is not None:
table.add_row("Mean Events/Chunk", f"{chunk_events_mean:.0f}")
min_events = metrics.get("chunk_events_min")
max_events = metrics.get("chunk_events_max")
if min_events is not None:
table.add_row(" • Min", f"{min_events:.0f}")
if max_events is not None:
table.add_row(" • Max", f"{max_events:.0f}")
# Per-dataset breakdown
per_dataset = metrics.get("per_dataset")
if per_dataset and len(per_dataset) > 1:
table.add_row("", "") # Spacer
table.add_row("Per-Dataset Breakdown", "", style="bold")
for dataset, data in per_dataset.items():
num_dataset_chunks = data.get("num_chunks", 0)
mean_time = data.get("mean_duration", 0)
table.add_row(
f" {dataset}",
f"{num_dataset_chunks} chunks, {_format_time(mean_time)} avg",
)
# Section timing breakdown
sections = metrics.get("sections")
if sections:
table.add_row("", "") # Spacer
table.add_row("Section Timing", "", style="bold")
# Sort by total duration (most expensive first)
sorted_sections = sorted(
sections.items(),
key=lambda x: x[1].get("total_duration", 0),
reverse=True,
)
for name, data in sorted_sections[:5]: # Top 5 sections
mean_time = data.get("mean_duration", 0)
count = data.get("count", 0)
section_type = data.get("type", "section")
if section_type == "memory":
mean_mem = data.get("mean_mem_delta_mb", 0)
table.add_row(
f" {name}",
f"{_format_time(mean_time)} ({count}x), {mean_mem:.1f} MB avg",
)
else:
table.add_row(f" {name}", f"{_format_time(mean_time)} ({count}x)")
return table