Quickstart

Installation

pip install roastcoffea

Or with pixi:

pixi add roastcoffea

Basic Usage

Wrap your Coffea workflow with MetricsCollector:

from coffea import processor
from coffea.nanoevents import NanoAODSchema
from dask.distributed import Client
from roastcoffea import MetricsCollector


# Your processor
class MyProcessor(processor.ProcessorABC):
    def process(self, events):
        jets = events.Jet
        selected = jets[jets.pt > 30]
        return {"sum": len(events), "njets": len(selected)}

    def postprocess(self, accumulator):
        return accumulator


# Setup
client = Client()
my_processor = MyProcessor()

# Collect metrics
with MetricsCollector(client, processor_instance=my_processor) as collector:
    executor = processor.DaskExecutor(client=client)
    runner = processor.Runner(
        executor=executor,
        schema=NanoAODSchema,
        chunksize=100_000,
        savemetrics=True,
    )

    output, report = runner(fileset, processor_instance=my_processor)
    collector.set_coffea_report(report)

# View results
collector.print_summary()

The summary includes throughput, resource usage, timing, and CPU/IO breakdown.

Next steps

📖 Tutorial

Step through Tutorial for chunk tracking and fine-grained profiling examples.

💡 Concepts

Read Core Concepts to understand what each metric means and how they’re calculated.

📓 Examples

Check the example notebooks for complete workflows.