roastcoffea.collector
Metrics collector context manager.
Main entry point for comprehensive metrics collection.
Classes
MetricsCollector | Context manager for collecting workflow metrics.
- class roastcoffea.collector.MetricsCollector(client, backend='dask', track_workers=True, worker_tracking_interval=1.0, processor_instance=None)
Bases: object
Context manager for collecting workflow metrics.
This is the main user-facing API for metrics collection.
- Parameters:
client (distributed.Client) – Dask distributed client
backend (str, optional) – Backend name (default: “dask”)
track_workers (bool, optional) – Enable worker tracking (default: True)
worker_tracking_interval (float, optional) – Sampling interval in seconds (default: 1.0)
processor_instance (coffea.processor.ProcessorABC, optional) – Coffea processor instance. If provided, fine metrics will separate processor work from Dask overhead. Without this, all activities (including Dask internals) are aggregated together.
Examples
>>> from dask.distributed import Client
>>> from roastcoffea.collector import MetricsCollector
>>>
>>> # Basic usage (aggregates all activities)
>>> client = Client()
>>> with MetricsCollector(client) as collector:
...     output, report = runner(...)
...     collector.set_coffea_report(report)
>>>
>>> # Recommended: separate processor from overhead
>>> processor = MyProcessor()
>>> with MetricsCollector(client, processor_instance=processor) as collector:
...     output, report = runner(fileset, processor_instance=processor)
...     collector.set_coffea_report(report)
>>>
>>> metrics = collector.metrics
>>> print(f"Throughput: {metrics['data_rate_gbps']:.2f} Gbps")
>>> print(f"Processor CPU: {metrics['processor_cpu_time_seconds']:.2f}s")
>>> print(f"Dask overhead: {metrics['overhead_cpu_time_seconds']:.2f}s")
- record_chunk_metrics(chunk_data)
Record metrics for a single chunk.
Called by @track_metrics decorator.
- Parameters:
chunk_data (dict) – Chunk metrics including timing, memory, metadata
- Return type:
None
- record_section_metrics(section_data)
Record metrics for a section or memory tracking.
Called by track_section() and track_memory() context managers.
- Parameters:
section_data (dict) – Section metrics including timing, memory, metadata
- Return type:
None
- extract_metrics_from_output(output)
Extract chunk metrics from Coffea output.
The @track_metrics decorator injects metrics as a list into the output: output["__roastcoffea_metrics__"] = [chunk_metrics]
Coffea’s tree reduction naturally concatenates these lists across chunks. This method extracts and stores the concatenated list.
- Parameters:
output (dict) – Output dictionary from Coffea workflow
- Return type:
None
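The list concatenation described above can be illustrated without Coffea. The sketch below is a simplified stand-in, not the library's code: merge_outputs is a hypothetical helper that adds values key by key, and the per-chunk field names (chunk, duration_s, n_events) are invented for illustration. Only the "__roastcoffea_metrics__" key comes from the description above.

```python
from functools import reduce

METRICS_KEY = "__roastcoffea_metrics__"


def merge_outputs(a, b):
    """Hypothetical stand-in for an accumulator merge: add values key by key."""
    merged = dict(a)
    for key, value in b.items():
        # Numbers are summed; lists are concatenated by the same "+" operator.
        merged[key] = merged[key] + value if key in merged else value
    return merged


# Illustrative per-chunk outputs; each carries a one-element metrics list.
chunk_outputs = [
    {"n_events": 100, METRICS_KEY: [{"chunk": 0, "duration_s": 1.2}]},
    {"n_events": 250, METRICS_KEY: [{"chunk": 1, "duration_s": 0.9}]},
    {"n_events": 175, METRICS_KEY: [{"chunk": 2, "duration_s": 1.5}]},
]

# Reducing the outputs leaves one concatenated metrics list in the result.
final_output = reduce(merge_outputs, chunk_outputs)
chunk_metrics = final_output.get(METRICS_KEY, [])
```

Because each chunk contributes a list rather than a bare dict, the merge never overwrites an earlier chunk's entry; the final list has one element per chunk.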
- set_coffea_report(report, custom_metrics=None)
Set Coffea report and optionally aggregate immediately.
- get_metrics()
Get aggregated metrics.
- Returns:
Aggregated metrics dictionary
- Return type:
dict
- Raises:
RuntimeError – If metrics aggregation failed