Source code for roastcoffea.instrumentation

"""Instrumentation context managers for fine-grained tracking.

Provides track_time(), track_memory(), and track_bytes() context managers for
detailed profiling within processor methods.
"""

from __future__ import annotations

import time
from collections.abc import Generator
from contextlib import contextmanager

import awkward as ak
from coffea.processor import ProcessorABC


[docs] @contextmanager def track_time( processor_self: ProcessorABC, section_name: str ) -> Generator[None, None, None]: """Context manager to track timing for a named operation. Measures wall time for a specific operation within processor.process(). Useful for identifying bottlenecks and understanding where time is spent. Works in distributed Dask mode by writing directly to the processor instance's metrics container, which is then injected into the output by the @track_metrics decorator. Args: processor_self: The processor instance (self) section_name: Name of the operation (e.g., "jet_selection", "histogram_filling") Yields: None Usage:: from roastcoffea import track_metrics, track_time class MyProcessor(processor.ProcessorABC): @track_metrics def process(self, events): with track_time(self, "jet_selection"): jets = events.Jet[events.Jet.pt > 30] with track_time(self, "event_selection"): selected = events[ak.num(jets) >= 2] return {"sum": len(events)} Note: Timing metrics are automatically attached to the current chunk if used within a @track_metrics decorated function. If no collection is active, this context manager is a no-op. """ if processor_self and hasattr(processor_self, "_roastcoffea_current_chunk"): start_time = time.time() try: yield finally: duration = time.time() - start_time if "timing" not in processor_self._roastcoffea_current_chunk: processor_self._roastcoffea_current_chunk["timing"] = {} processor_self._roastcoffea_current_chunk["timing"][section_name] = duration else: # No collection active, just yield yield
[docs] @contextmanager def track_memory( processor_self: ProcessorABC, section_name: str ) -> Generator[None, None, None]: """Context manager to track memory usage for a named operation. Measures memory delta (before/after) for a specific operation. Useful for identifying memory-intensive operations. Works in distributed Dask mode by writing directly to the processor instance's metrics container, which is then injected into the output by the @track_metrics decorator. Args: processor_self: The processor instance (self) section_name: Name of the operation (e.g., "load_jets", "apply_corrections") Yields: None Usage:: from roastcoffea import track_metrics, track_memory class MyProcessor(processor.ProcessorABC): @track_metrics def process(self, events): with track_memory(self, "load_all_branches"): jets = events.Jet electrons = events.Electron muons = events.Muon return {"sum": len(events)} Note: Requires psutil package. If not available, memory tracking will be skipped gracefully (returns 0.0 for measurements). Note: Memory metrics are automatically attached to the current chunk if used within a @track_metrics decorated function. If no collection is active, this context manager is a no-op. """ if processor_self and hasattr(processor_self, "_roastcoffea_current_chunk"): try: import psutil process = psutil.Process() mem_before = process.memory_info().rss / 1024 / 1024 # MB except ImportError: mem_before = None try: yield finally: if mem_before is not None: try: import psutil process = psutil.Process() mem_after = process.memory_info().rss / 1024 / 1024 # MB delta_mb = mem_after - mem_before except Exception: delta_mb = 0.0 else: delta_mb = 0.0 if "memory" not in processor_self._roastcoffea_current_chunk: processor_self._roastcoffea_current_chunk["memory"] = {} processor_self._roastcoffea_current_chunk["memory"][section_name] = delta_mb else: # No collection active, just yield yield
[docs] @contextmanager def track_bytes( processor_self: ProcessorABC, events: ak.Array, section_name: str, ) -> Generator[None, None, None]: """Context manager to track bytes read from filehandle for a named operation. Measures the number of bytes read from the file source during a specific operation. Useful for identifying I/O-intensive operations and understanding data access patterns. Works in distributed Dask mode by writing directly to the processor instance's metrics container, which is then injected into the output by the @track_metrics decorator. Args: processor_self: The processor instance (self) events: Events object with metadata containing filehandle section_name: Name of the operation (e.g., "load_jets", "read_systematics") Yields: None Usage:: from roastcoffea import track_metrics, track_bytes class MyProcessor(processor.ProcessorABC): @track_metrics def process(self, events): with track_bytes(self, events, "jet_loading"): jets = events.Jet # Lazy loading triggers file reads with track_bytes(self, events, "muon_loading"): muons = events.Muon return {"sum": len(events)} Note: Requires the file_handle to be available via events.attrs["@events_factory"].file_handle with access to file_handle.file.source.num_requested_bytes. This is available when using the modified coffea version with file handle exposure. Note: Byte metrics are automatically attached to the current chunk if used within a @track_metrics decorated function. If no collection is active or no filehandle is available, this context manager is a no-op. """ if processor_self and hasattr(processor_self, "_roastcoffea_current_chunk"): # Check if file_handle is available for byte tracking (once) source = None try: factory = events.attrs.get("@events_factory") if factory and hasattr(factory, "file_handle"): file_handle = factory.file_handle if file_handle and hasattr(file_handle, "file"): source = file_handle.file.source if not hasattr(source, "num_requested_bytes"): source = None except Exception: source = None # Capture bytes at start if filehandle available bytes_before = 0 if source: try: bytes_before = source.num_requested_bytes except Exception: pass try: yield finally: # Capture bytes at end if filehandle available bytes_after = 0 if source: try: bytes_after = source.num_requested_bytes except Exception: pass bytes_delta = bytes_after - bytes_before if "bytes" not in processor_self._roastcoffea_current_chunk: processor_self._roastcoffea_current_chunk["bytes"] = {} processor_self._roastcoffea_current_chunk["bytes"][section_name] = ( bytes_delta ) else: # No collection active, just yield yield