roastcoffea.instrumentation

Instrumentation context managers for fine-grained tracking.

Provides track_time(), track_memory(), and track_bytes() context managers for detailed profiling within processor methods.

Functions

track_bytes(processor_self, events, section_name)

Context manager to track bytes read from the file handle for a named operation.

track_memory(processor_self, section_name)

Context manager to track memory usage for a named operation.

track_time(processor_self, section_name)

Context manager to track timing for a named operation.

roastcoffea.instrumentation.track_time(processor_self, section_name)

Context manager to track timing for a named operation.

Measures wall time for a specific operation within processor.process(). Useful for identifying bottlenecks and understanding where time is spent.

Works in distributed Dask mode by writing directly to the processor instance’s metrics container, which is then injected into the output by the @track_metrics decorator.

Parameters:
  • processor_self (ProcessorABC) – The processor instance (self)

  • section_name (str) – Name of the operation (e.g., “jet_selection”, “histogram_filling”)

Yields:

None

Return type:

Generator[None, None, None]

Usage:

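import awkward as ak
from coffea import processor

from roastcoffea import track_metrics, track_time

class MyProcessor(processor.ProcessorABC):
    @track_metrics
    def process(self, events):
        # Time the jet pT cut separately from the event-level cut.
        with track_time(self, "jet_selection"):
            jets = events.Jet[events.Jet.pt > 30]

        with track_time(self, "event_selection"):
            selected = events[ak.num(jets) >= 2]

        return {"sum": len(events)}

    # ProcessorABC also requires postprocess; pass the output through.
    def postprocess(self, accumulator):
        return accumulator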

Note

Timing metrics are automatically attached to the current chunk if used within a @track_metrics decorated function. If no collection is active, this context manager is a no-op.
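
The mechanism described above (time a block, then write the result onto the processor instance) can be sketched with the standard library alone. This is not roastcoffea's implementation: the names sketch_track_time and _sketch_metrics, and the rule that a missing container means "no active collection", are assumptions made only for illustration.

```python
import time
from contextlib import contextmanager


@contextmanager
def sketch_track_time(processor_self, section_name):
    """Hypothetical stand-in for track_time; not the library's code."""
    # Assumption: an active collection is signalled by a list attribute
    # on the instance. The real container name is not documented here.
    container = getattr(processor_self, "_sketch_metrics", None)
    if container is None:
        yield  # no active collection: behave as a no-op
        return
    start = time.perf_counter()
    try:
        yield
    finally:
        # Record the wall time even if the block raises.
        container.append(
            {"name": section_name, "wall_time_s": time.perf_counter() - start}
        )


class DummyProcessor:
    """Plain object standing in for a ProcessorABC instance."""


# Active collection: one record is appended to the instance.
active = DummyProcessor()
active._sketch_metrics = []
with sketch_track_time(active, "jet_selection"):
    sum(range(100_000))

# No container on the instance: the block runs, nothing is recorded.
inactive = DummyProcessor()
with sketch_track_time(inactive, "jet_selection"):
    pass
```

Storing the measurement on the instance rather than in module-level state means the record travels with whatever object the worker runs, which is the property the description above relies on for distributed execution.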

roastcoffea.instrumentation.track_memory(processor_self, section_name)

Context manager to track memory usage for a named operation.

Measures memory delta (before/after) for a specific operation. Useful for identifying memory-intensive operations.

Works in distributed Dask mode by writing directly to the processor instance’s metrics container, which is then injected into the output by the @track_metrics decorator.

Parameters:
  • processor_self (ProcessorABC) – The processor instance (self)

  • section_name (str) – Name of the operation (e.g., “load_jets”, “apply_corrections”)

Yields:

None

Return type:

Generator[None, None, None]

Usage:

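from coffea import processor

from roastcoffea import track_metrics, track_memory

class MyProcessor(processor.ProcessorABC):
    @track_metrics
    def process(self, events):
        # Measure the memory delta of touching three collections at once.
        with track_memory(self, "load_all_branches"):
            jets = events.Jet
            electrons = events.Electron
            muons = events.Muon

        return {"sum": len(events)}

    # ProcessorABC also requires postprocess; pass the output through.
    def postprocess(self, accumulator):
        return accumulator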

Note

Requires the psutil package. If psutil is not installed, memory tracking is skipped gracefully and measurements are reported as 0.0.

Note

Memory metrics are automatically attached to the current chunk if used within a @track_metrics decorated function. If no collection is active, this context manager is a no-op.
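
A before/after memory delta with a graceful psutil fallback might look like the sketch below. It is an illustration under assumptions, not the library's code: the helper names (rss_mb, sketch_track_memory), the record keys, the plain list used as storage, and the choice of resident set size as the measured quantity are all hypothetical.

```python
import os
from contextlib import contextmanager

try:
    import psutil
except ImportError:  # psutil is optional in this sketch
    psutil = None


def rss_mb():
    """Resident set size of this process in MB, or 0.0 without psutil."""
    if psutil is None:
        return 0.0
    return psutil.Process(os.getpid()).memory_info().rss / 1024**2


@contextmanager
def sketch_track_memory(records, section_name):
    """Hypothetical stand-in for track_memory; appends one dict per block."""
    before = rss_mb()
    try:
        yield
    finally:
        after = rss_mb()
        records.append(
            {
                "name": section_name,
                "start_mb": before,
                "end_mb": after,
                "delta_mb": after - before,
            }
        )


records = []
with sketch_track_memory(records, "load_all_branches"):
    buffer = bytearray(50 * 1024 * 1024)  # allocate something measurable
```

Resident set size is a process-wide figure, so a delta measured this way can be zero or negative when the allocator reuses or releases pages during the block; it is best read as a rough indicator rather than an exact allocation count.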

roastcoffea.instrumentation.track_bytes(processor_self, events, section_name)

Context manager to track bytes read from the file handle for a named operation.

Measures the number of bytes read from the file source during a specific operation. Useful for identifying I/O-intensive operations and understanding data access patterns.

Works in distributed Dask mode by writing directly to the processor instance’s metrics container, which is then injected into the output by the @track_metrics decorator.

Parameters:
  • processor_self (ProcessorABC) – The processor instance (self)

  • events (Array) – Events object whose metadata exposes the file handle

  • section_name (str) – Name of the operation (e.g., “load_jets”, “read_systematics”)

Yields:

None

Return type:

Generator[None, None, None]

Usage:

from coffea import processor

from roastcoffea import track_metrics, track_bytes

class MyProcessor(processor.ProcessorABC):
    @track_metrics
    def process(self, events):
        with track_bytes(self, events, "jet_loading"):
            jets = events.Jet  # Lazy loading triggers file reads

        with track_bytes(self, events, "muon_loading"):
            muons = events.Muon

        return {"sum": len(events)}

    # ProcessorABC also requires postprocess; pass the output through.
    def postprocess(self, accumulator):
        return accumulator

Note

Requires the file handle to be available as events.attrs["@events_factory"].file_handle, with the byte counter readable at file_handle.file.source.num_requested_bytes. This is available when using the modified coffea version that exposes the file handle.

Note

Byte metrics are automatically attached to the current chunk if used within a @track_metrics decorated function. If no collection is active or no file handle is available, this context manager is a no-op.
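
The byte count can be thought of as the difference between two readings of the source's num_requested_bytes counter, taken on entry and exit. The sketch below follows the attribute chain named in the note above but runs against fake objects built with SimpleNamespace; the helper names, the record layout, and the list used as storage are hypothetical, and this is not the library's implementation.

```python
from contextlib import contextmanager
from types import SimpleNamespace


def requested_bytes(events):
    """Return the source's byte counter, or None if the chain is missing."""
    try:
        factory = events.attrs["@events_factory"]
        return factory.file_handle.file.source.num_requested_bytes
    except (AttributeError, KeyError, TypeError):
        return None


@contextmanager
def sketch_track_bytes(records, events, section_name):
    """Hypothetical stand-in for track_bytes; records a counter delta."""
    before = requested_bytes(events)
    if before is None:
        yield  # no file handle available: behave as a no-op
        return
    try:
        yield
    finally:
        after = requested_bytes(events)
        if after is not None:
            records.append({"name": section_name, "bytes_read": after - before})


# Fake objects mimicking the attribute chain; real events come from coffea.
source = SimpleNamespace(num_requested_bytes=1_000)
factory = SimpleNamespace(
    file_handle=SimpleNamespace(file=SimpleNamespace(source=source))
)
events = SimpleNamespace(attrs={"@events_factory": factory})

records = []
with sketch_track_bytes(records, events, "jet_loading"):
    source.num_requested_bytes += 4_096  # simulate a read during the block

# Events without the factory entry: nothing is recorded.
no_handle = SimpleNamespace(attrs={})
with sketch_track_bytes(records, no_handle, "muon_loading"):
    pass
```

Because the counter is cumulative, only reads issued while the block is running contribute to the delta; data already cached before entry would show up as zero bytes for that section.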