roastcoffea.decorator

Decorator for chunk-level metrics tracking.

Provides @track_metrics decorator for automatic chunk boundary detection and metrics collection in processor.process() methods.

Functions

track_metrics(func)

Decorator to track metrics for processor.process() method.

roastcoffea.decorator.track_metrics(func)[source]

Decorator to track metrics for processor.process() method.

Automatically captures chunk-level metrics including: - Wall time (start, end, duration) - Memory usage (before/after) - Bytes read from file source (if available) - Chunk metadata (dataset, file, entry range if available) - Per-chunk branch access metrics (accessed_branches, accessed_bytes, percentages) - Fine-grained timing/memory/bytes sections from context managers

The decorator works in distributed Dask mode by injecting metrics directly into the output dictionary as a list. Coffea’s tree reduction naturally concatenates these lists across chunks.

Usage:

from coffea import processor
from roastcoffea import track_metrics, track_time, track_memory, track_bytes

class MyProcessor(processor.ProcessorABC):
    @track_metrics
    def process(self, events):
        with track_time(self, "jet_selection"):
            jets = events.Jet[events.Jet.pt > 30]

        with track_memory(self, "histogram_filling"):
            # ... fill histograms

        with track_bytes(self, events, "muon_loading"):
            muons = events.Muon

        return {"sum": len(events)}

Note

The decorator requires an active MetricsCollector context. The collector sets _roastcoffea_collect_metrics = True on the processor instance to enable collection.

Note

Metrics are injected as: output[“__roastcoffea_metrics__”] = [chunk_metrics] The list format allows natural concatenation during Coffea’s tree reduction.

Parameters:

func (Callable)

Return type:

Callable