roastcoffea.decorator
Decorator for chunk-level metrics tracking.
Provides @track_metrics decorator for automatic chunk boundary detection and metrics collection in processor.process() methods.
Functions
track_metrics(func): Decorator to track metrics for the processor.process() method.
- roastcoffea.decorator.track_metrics(func)
Decorator to track metrics for processor.process() method.
Automatically captures chunk-level metrics including:
- Wall time (start, end, duration)
- Memory usage (before/after)
- Bytes read from file source (if available)
- Chunk metadata (dataset, file, entry range if available)
- Per-chunk branch access metrics (accessed_branches, accessed_bytes, percentages)
- Fine-grained timing/memory/bytes sections from context managers
The decorator works in distributed Dask mode by injecting metrics directly into the output dictionary as a list. Coffea’s tree reduction naturally concatenates these lists across chunks.
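The concatenation can be illustrated without coffea or Dask. The sketch below is a simplified stand-in for the reduction step, not coffea's actual accumulator code. The `merge_outputs` helper is hypothetical: it adds values key by key, so list-valued entries concatenate while numeric entries sum. The metric field names (`chunk`, `wall_time_s`) are placeholders, not the fields roastcoffea records.

```python
from functools import reduce

METRICS_KEY = "__roastcoffea_metrics__"


def merge_outputs(left, right):
    """Merge two per-chunk output dicts by adding values key by key.

    Hypothetical helper that mimics an additive reduction:
    numbers sum, lists concatenate.
    """
    merged = dict(left)
    for key, value in right.items():
        merged[key] = merged[key] + value if key in merged else value
    return merged


# Three per-chunk outputs, each carrying a one-element metrics list.
chunk_outputs = [
    {"sum": 100, METRICS_KEY: [{"chunk": 0, "wall_time_s": 1.2}]},
    {"sum": 250, METRICS_KEY: [{"chunk": 1, "wall_time_s": 0.9}]},
    {"sum": 75, METRICS_KEY: [{"chunk": 2, "wall_time_s": 1.5}]},
]

# Pairwise reduction; a tree reduction applies the same merge in a
# different grouping and ends with the same sum and metrics entries.
combined = reduce(merge_outputs, chunk_outputs)
```

After the reduction, `combined[METRICS_KEY]` holds one entry per chunk, which is why each chunk wraps its metrics in a list rather than returning a bare dict.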
Usage:
    from coffea import processor
    from roastcoffea import track_metrics, track_time, track_memory, track_bytes

    class MyProcessor(processor.ProcessorABC):
        @track_metrics
        def process(self, events):
            with track_time(self, "jet_selection"):
                jets = events.Jet[events.Jet.pt > 30]

            with track_memory(self, "histogram_filling"):
                # ... fill histograms
                pass  # placeholder so the block is valid Python

            with track_bytes(self, events, "muon_loading"):
                muons = events.Muon

            return {"sum": len(events)}
Note
The decorator requires an active MetricsCollector context. The collector sets _roastcoffea_collect_metrics = True on the processor instance to enable collection.
Note
Metrics are injected as: output["__roastcoffea_metrics__"] = [chunk_metrics]. The list format allows natural concatenation during Coffea’s tree reduction.
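To make the two notes concrete, here is a minimal sketch of a decorator that follows the same contract. It is not roastcoffea's implementation: `track_metrics_sketch`, `DemoProcessor`, and the metric field names (`duration_s`, `n_events`) are hypothetical. Only the `_roastcoffea_collect_metrics` flag and the `__roastcoffea_metrics__` key come from the notes above.

```python
import functools
import time

METRICS_KEY = "__roastcoffea_metrics__"


def track_metrics_sketch(func):
    """Hypothetical stand-in for a chunk-metrics decorator."""

    @functools.wraps(func)
    def wrapper(self, events):
        # Without the flag set by a collector, run process() untouched.
        if not getattr(self, "_roastcoffea_collect_metrics", False):
            return func(self, events)

        start = time.perf_counter()
        output = func(self, events)
        duration = time.perf_counter() - start

        # Inject a one-element list so per-chunk results concatenate.
        if isinstance(output, dict):
            chunk_metrics = {"duration_s": duration, "n_events": len(events)}
            output[METRICS_KEY] = [chunk_metrics]
        return output

    return wrapper


class DemoProcessor:
    """Plain class used for illustration; not a coffea ProcessorABC."""

    @track_metrics_sketch
    def process(self, events):
        return {"sum": len(events)}


proc = DemoProcessor()
plain = proc.process([1, 2, 3])  # flag unset: no metrics key added

proc._roastcoffea_collect_metrics = True  # what a collector would set
tracked = proc.process([1, 2, 3])
```

With the flag unset the wrapper adds nothing to the output, so a decorated processor of this kind behaves as before when no collector is active.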