roastcoffea.collector

Metrics collector context manager.

Main entry point for comprehensive metrics collection.

Classes

MetricsCollector(client[, backend, ...])

Context manager for collecting workflow metrics.

class roastcoffea.collector.MetricsCollector(client, backend='dask', track_workers=True, worker_tracking_interval=1.0, processor_instance=None)

Bases: object

Context manager for collecting workflow metrics.

This is the main user-facing API for metrics collection.

Parameters:
  • client (distributed.Client) – Dask distributed client

  • backend (str, optional) – Backend name (default: “dask”)

  • track_workers (bool, optional) – Enable worker tracking (default: True)

  • worker_tracking_interval (float, optional) – Sampling interval in seconds (default: 1.0)

  • processor_instance (coffea.processor.ProcessorABC, optional) – Coffea processor instance. If provided, the fine-grained (Dask fine performance) metrics separate processor work from Dask overhead. Without it, all activities, including Dask internals, are aggregated together.
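The track_workers and worker_tracking_interval parameters describe periodic sampling of worker state while the context manager is active. The sketch below is a generic illustration of that pattern, not roastcoffea's actual implementation: a background thread calls a function every `interval` seconds between `__enter__` and `__exit__`. The class name PeriodicSampler and the `sample_fn` callable are hypothetical.

```python
import threading
import time
from typing import Any, Callable


class PeriodicSampler:
    """Hypothetical sketch: call sample_fn every `interval` seconds while active."""

    def __init__(self, sample_fn: Callable[[], Any], interval: float = 1.0) -> None:
        self.sample_fn = sample_fn
        self.interval = interval
        self.samples: list[tuple[float, Any]] = []
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        # Take a sample immediately, then wait `interval` seconds or until stopped.
        while True:
            self.samples.append((time.time(), self.sample_fn()))
            if self._stop.wait(self.interval):
                break

    def __enter__(self) -> "PeriodicSampler":
        self._thread.start()
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Signal the sampling thread and wait for it to finish.
        self._stop.set()
        self._thread.join()


# Usage: sample a stand-in "worker count" every 0.05 s for about 0.2 s.
with PeriodicSampler(lambda: 4, interval=0.05) as sampler:
    time.sleep(0.2)
```

Using threading.Event.wait as the sleep lets `__exit__` stop the sampler promptly instead of waiting out a full interval.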

Examples

>>> from dask.distributed import Client
>>> from roastcoffea.collector import MetricsCollector
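>>>
>>> # Here `runner` is a coffea.processor.Runner created with savemetrics=True,
>>> # so calling it returns (output, report).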
>>> # Basic usage (aggregates all activities)
>>> client = Client()
>>> with MetricsCollector(client) as collector:
...     output, report = runner(...)
...     collector.set_coffea_report(report)
>>>
>>> # Recommended: separate processor from overhead
>>> processor = MyProcessor()
>>> with MetricsCollector(client, processor_instance=processor) as collector:
...     output, report = runner(fileset, processor_instance=processor)
...     collector.set_coffea_report(report)
>>>
>>> metrics = collector.get_metrics()
>>> print(f"Throughput: {metrics['data_rate_gbps']:.2f} Gbps")
>>> print(f"Processor CPU: {metrics['processor_cpu_time_seconds']:.2f}s")
>>> print(f"Dask overhead: {metrics['overhead_cpu_time_seconds']:.2f}s")

record_chunk_metrics(chunk_data)

Record metrics for a single chunk.

Called by the @track_metrics decorator.

Parameters:

chunk_data (dict) – Chunk metrics including timing, memory, metadata

Return type:

None
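To illustrate how a decorator might feed record_chunk_metrics, the sketch below wraps a processor-style process() method, times each call, and passes a dict to a collector. The decorator name track_chunk, the RecordingCollector stub, and the chunk_data keys (wall_time_seconds, n_events) are hypothetical; the real @track_metrics decorator may collect different fields and locate the collector differently.

```python
import functools
import time
from typing import Any, Callable


class RecordingCollector:
    """Hypothetical stand-in exposing record_chunk_metrics(chunk_data)."""

    def __init__(self) -> None:
        self.chunks: list[dict[str, Any]] = []

    def record_chunk_metrics(self, chunk_data: dict[str, Any]) -> None:
        self.chunks.append(chunk_data)


def track_chunk(collector: RecordingCollector) -> Callable:
    """Hypothetical decorator: time each process() call and report it."""

    def decorator(process: Callable) -> Callable:
        @functools.wraps(process)
        def wrapper(self: Any, events: Any) -> Any:
            start = time.perf_counter()
            result = process(self, events)
            collector.record_chunk_metrics(
                {
                    "wall_time_seconds": time.perf_counter() - start,
                    "n_events": len(events),
                }
            )
            return result

        return wrapper

    return decorator


collector = RecordingCollector()


class ToyProcessor:
    @track_chunk(collector)
    def process(self, events: list[int]) -> dict[str, int]:
        return {"sum": sum(events)}


ToyProcessor().process([1, 2, 3])
ToyProcessor().process([4, 5])
print([c["n_events"] for c in collector.chunks])  # [3, 2]
```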

record_section_metrics(section_data)

Record metrics for a tracked code section or a memory-tracking block.

Called by the track_section() and track_memory() context managers.

Parameters:

section_data (dict) – Section metrics including timing, memory, metadata

Return type:

None
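A context manager such as track_section() could report to record_section_metrics roughly as in the sketch below, which times a named block and forwards a dict when the block exits. The section_timer name, the SectionCollector stub, and the dict keys are hypothetical, and memory measurement (as done by track_memory()) is omitted.

```python
import time
from contextlib import contextmanager
from typing import Any, Iterator


class SectionCollector:
    """Hypothetical stand-in exposing record_section_metrics(section_data)."""

    def __init__(self) -> None:
        self.sections: list[dict[str, Any]] = []

    def record_section_metrics(self, section_data: dict[str, Any]) -> None:
        self.sections.append(section_data)


@contextmanager
def section_timer(collector: SectionCollector, name: str) -> Iterator[None]:
    """Hypothetical sketch: time the enclosed block and report it on exit."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Report even if the block raised, so failed sections are still recorded.
        collector.record_section_metrics(
            {"name": name, "duration_seconds": time.perf_counter() - start}
        )


collector = SectionCollector()
with section_timer(collector, "jet_selection"):
    selected = [x for x in range(1000) if x % 2 == 0]
print(collector.sections[0]["name"])  # jet_selection
```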

extract_metrics_from_output(output)

Extract chunk metrics from Coffea output.

The @track_metrics decorator injects metrics into the output as a list: output["__roastcoffea_metrics__"] = [chunk_metrics]

When Coffea’s tree reduction merges chunk outputs, these lists are concatenated across chunks. This method extracts and stores the concatenated list.

Parameters:

output (dict) – Output dictionary from Coffea workflow

Return type:

None
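The list-concatenation behaviour described for extract_metrics_from_output can be imitated in plain Python. The sketch below uses a simplified, hypothetical merge function in place of Coffea's own accumulator logic: values are merged key by key with +, so lists concatenate and counts add. It folds linearly with functools.reduce rather than as a tree, but the resulting list holds one entry per chunk either way. The chunk values are toy inputs.

```python
from functools import reduce
from typing import Any

METRICS_KEY = "__roastcoffea_metrics__"


def merge(a: dict[str, Any], b: dict[str, Any]) -> dict[str, Any]:
    """Simplified stand-in for an accumulator merge: add values key by key."""
    out = dict(a)
    for key, value in b.items():
        out[key] = out[key] + value if key in out else value
    return out


# Toy per-chunk outputs, each carrying a one-element list of its own metrics.
chunk_outputs = [
    {"n_events": 100, METRICS_KEY: [{"chunk": 0}]},
    {"n_events": 150, METRICS_KEY: [{"chunk": 1}]},
    {"n_events": 120, METRICS_KEY: [{"chunk": 2}]},
]

merged = reduce(merge, chunk_outputs)
print(merged["n_events"])  # 370
print([m["chunk"] for m in merged[METRICS_KEY]])  # [0, 1, 2]
```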

set_coffea_report(report, custom_metrics=None)

Set the Coffea report and, optionally, aggregate metrics immediately.

Parameters:
  • report (dict) – Coffea report returned by the Runner

  • custom_metrics (dict, optional) – Per-dataset custom metrics

Return type:

None

get_metrics()

Get aggregated metrics.

Returns:

Aggregated metrics dictionary

Return type:

dict

Raises:

RuntimeError – If metrics aggregation failed

save_measurement(output_dir, measurement_name=None)

Save the measurement to disk.

Parameters:
  • output_dir (Path) – Output directory

  • measurement_name (str, optional) – Measurement name

Returns:

Path to measurement directory

Return type:

Path

print_summary()

Print a summary of the metrics as a Rich table.

Return type:

None