Architecture¶
roastcoffea has a modular design with three main components.
Components¶
1. Backends (roastcoffea.backends)¶
Backends handle metrics collection for specific executors:
DaskMetricsBackend: Dask distributed executor
    Worker tracking via scheduler sampling
    Dask Spans for fine metrics
    Span creation and retrieval
Additional backends can be added to support other executors (e.g., TaskVine) by implementing the backend interface.
Interface:
from typing import Any

class AbstractMetricsBackend:
    def start_tracking(self, interval: float) -> None: ...
    def stop_tracking(self) -> dict: ...
    def create_span(self, name: str) -> Any: ...
    def get_span_metrics(self, span_info: Any) -> dict: ...
    def supports_fine_metrics(self) -> bool: ...
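As an illustration of what implementing this interface might look like, here is a minimal sketch of a backend that records only elapsed wall-clock time. The class name WallClockBackend and the keys of the returned dict are hypothetical, and the base class is redefined locally so the example runs without roastcoffea installed.

```python
import time
from typing import Any


class AbstractMetricsBackend:
    """Local stand-in mirroring the interface above (not imported from roastcoffea)."""

    def start_tracking(self, interval: float) -> None: ...
    def stop_tracking(self) -> dict: ...
    def create_span(self, name: str) -> Any: ...
    def get_span_metrics(self, span_info: Any) -> dict: ...
    def supports_fine_metrics(self) -> bool: ...


class WallClockBackend(AbstractMetricsBackend):
    """Hypothetical backend that records only elapsed wall-clock time."""

    def __init__(self) -> None:
        self._interval = None
        self._started_at = None

    def start_tracking(self, interval: float) -> None:
        # A real backend would begin sampling workers every `interval` seconds.
        self._interval = interval
        self._started_at = time.perf_counter()

    def stop_tracking(self) -> dict:
        if self._started_at is None:
            raise RuntimeError("start_tracking() was never called")
        elapsed = time.perf_counter() - self._started_at
        self._started_at = None
        # Key names here are assumptions chosen for the example.
        return {"interval": self._interval, "elapsed_seconds": elapsed}

    def create_span(self, name: str) -> Any:
        # No span support: return a plain marker the caller can pass back.
        return {"name": name}

    def get_span_metrics(self, span_info: Any) -> dict:
        # Nothing fine-grained is collected, so there is nothing to report.
        return {}

    def supports_fine_metrics(self) -> bool:
        return False


backend = WallClockBackend()
backend.start_tracking(interval=1.0)
tracking = backend.stop_tracking()
```

A backend that returns False from supports_fine_metrics() signals to callers that span metrics will be empty, so they can skip the fine metrics step.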
2. Aggregators (roastcoffea.aggregation)¶
Aggregators combine metrics from multiple sources:
MetricsAggregator: Main entry point
Workflow aggregation: Coffea report + wall clock timing
Worker aggregation: Resource utilization time-series
Fine metrics parsing: Dask Spans activity breakdown
Chunk aggregation: Per-chunk statistics
Efficiency calculation: Derived metrics (ratios, percentages)
Data flow:
Coffea Report ──┐
Wall Clock ─────┤
Worker Tracking ├──> MetricsAggregator ──> Unified dict
Dask Spans ─────┤
Chunk Metrics ──┘
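The data flow above can be sketched as a single function that merges the five sources into one dict and derives a ratio from them. This is only an illustration: the function name aggregate_sketch, the section names, and every key (including "entries" in the report and "n_workers" in the samples) are assumptions, not the actual MetricsAggregator schema.

```python
def aggregate_sketch(coffea_report, wall_seconds, worker_samples, span_metrics, chunk_metrics):
    """Hypothetical merge of the five inputs into one unified dict."""
    total_events = coffea_report.get("entries", 0)  # assumed report key

    unified = {
        "workflow": {"total_events": total_events, "wall_seconds": wall_seconds},
        "workers": {
            "num_samples": len(worker_samples),
            # default=0 keeps max() from failing when no samples were taken
            "peak_workers": max((s["n_workers"] for s in worker_samples), default=0),
        },
        "fine": dict(span_metrics),
        "chunks": {"count": len(chunk_metrics)},
    }

    # Derived metric: guard against a zero-length run.
    unified["efficiency"] = {
        "events_per_second": total_events / wall_seconds if wall_seconds > 0 else None
    }
    return unified


unified = aggregate_sketch(
    coffea_report={"entries": 1000},
    wall_seconds=4.0,
    worker_samples=[{"n_workers": 2}, {"n_workers": 4}],
    span_metrics={"compute_seconds": 3.0},
    chunk_metrics=[{"num_events": 500}, {"num_events": 500}],
)
```

Keeping each source in its own section of the result means a missing source (for example, a backend without fine metrics) leaves one section empty rather than changing the shape of the whole dict.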
3. Exporters (roastcoffea.export)¶
Exporters handle output:
Reporter: Rich tables (format_*_table() functions)
Measurements: Save/load to disk (JSON + metadata)
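To make the "JSON + metadata" idea concrete, the following sketch writes a metrics dict alongside a small metadata record and reads it back. The function names save_measurement and load_measurement and the file layout are hypothetical and may differ from what roastcoffea.export actually does.

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def save_measurement(metrics: dict, path: Path, label: str) -> None:
    """Write metrics plus a metadata record to one JSON file (assumed layout)."""
    payload = {
        "metadata": {
            "label": label,
            "saved_at": datetime.now(timezone.utc).isoformat(),
        },
        "metrics": metrics,
    }
    path.write_text(json.dumps(payload, indent=2))


def load_measurement(path: Path):
    """Return (metrics, metadata) from a file written by save_measurement."""
    payload = json.loads(path.read_text())
    return payload["metrics"], payload["metadata"]


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "run.json"
    save_measurement({"wall_seconds": 4.0}, target, label="baseline")
    metrics, metadata = load_measurement(target)
```

Storing metadata next to the metrics makes it possible to compare saved runs later without relying on file names.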
Instrumentation¶
Decorator (@track_metrics)¶
Wraps process() to collect chunk-level metrics:
Check _roastcoffea_collect_metrics flag
Initialize _roastcoffea_current_chunk dict
Capture timing and memory
Extract chunk metadata from events
Inject metrics as list into output
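The five steps above can be sketched as a plain Python decorator. This is not the library's implementation: the name track_metrics_sketch, the use of tracemalloc for memory, the recorded field names, and the FakeEvents stand-in are all assumptions made so the example runs on its own. It also assumes process() returns a dict.

```python
import functools
import time
import tracemalloc

METRICS_KEY = "__roastcoffea_metrics__"


def track_metrics_sketch(process):
    """Illustrative stand-in for @track_metrics."""

    @functools.wraps(process)
    def wrapper(self, events):
        # Step 1: do nothing extra unless collection was switched on.
        if not getattr(self, "_roastcoffea_collect_metrics", False):
            return process(self, events)

        # Step 2: fresh per-chunk dict that context managers can write into.
        self._roastcoffea_current_chunk = {"timing": {}, "memory": {}}

        # Step 3: capture timing and peak memory around the user's process().
        tracemalloc.start()
        start = time.perf_counter()
        try:
            output = process(self, events)
        finally:
            duration = time.perf_counter() - start
            _, peak_bytes = tracemalloc.get_traced_memory()
            tracemalloc.stop()

        # Step 4: chunk metadata (assumes len(events) and an events.metadata dict).
        chunk = dict(self._roastcoffea_current_chunk)
        chunk["duration_seconds"] = duration
        chunk["peak_bytes"] = peak_bytes
        chunk["num_events"] = len(events)
        chunk["metadata"] = dict(getattr(events, "metadata", {}))

        # Step 5: inject as a one-element list so outputs concatenate under +.
        output[METRICS_KEY] = [chunk]
        return output

    return wrapper


class FakeEvents(list):
    """Stand-in for a Coffea events chunk."""
    metadata = {"dataset": "demo"}


class DemoProcessor:
    _roastcoffea_collect_metrics = True

    @track_metrics_sketch
    def process(self, events):
        return {"n": len(events)}


out = DemoProcessor().process(FakeEvents([1, 2, 3]))
```

With the flag unset, the wrapper calls process() directly and leaves the output unchanged.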
Context Managers¶
track_time() and track_memory() write to _roastcoffea_current_chunk:
with track_time(self, "section"):
    ...  # your code here
# Writes to self._roastcoffea_current_chunk["timing"]["section"]
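One plausible way to build such a context manager is with contextlib.contextmanager, as in the sketch below. The name track_time_sketch is hypothetical, and silently skipping the write when no chunk dict exists is an assumption rather than documented behavior.

```python
import time
from contextlib import contextmanager


@contextmanager
def track_time_sketch(processor, name):
    """Time the enclosed block and store the result on the processor's chunk dict."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Record the elapsed time even if the block raised.
        chunk = getattr(processor, "_roastcoffea_current_chunk", None)
        if chunk is not None:
            chunk.setdefault("timing", {})[name] = time.perf_counter() - start


class DemoProcessor:
    """Minimal object carrying the per-chunk dict."""

    def __init__(self):
        self._roastcoffea_current_chunk = {}


processor = DemoProcessor()
with track_time_sketch(processor, "section"):
    total = sum(range(1000))

elapsed = processor._roastcoffea_current_chunk["timing"]["section"]
```

Writing into a dict on the processor instance lets the decorator pick up the section timings once process() returns, without the user passing anything around.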
Distributed Mode¶
The list-based accumulator approach:
Each worker returns {..., "__roastcoffea_metrics__": [chunk_data]}
Coffea’s tree reduction concatenates: [a] + [b] = [a, b]
Final output has all chunks: [chunk1, chunk2, ..., chunkN]
MetricsCollector.extract_metrics_from_output() retrieves the list
This works because Coffea uses the + operator to aggregate outputs, and adding two lists concatenates them.
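The effect can be reproduced with a small stand-alone simulation. The merge and tree_reduce functions below are simplified stand-ins for Coffea's accumulation (they are not Coffea's code), and the per-worker payloads are invented for the example.

```python
METRICS_KEY = "__roastcoffea_metrics__"


def merge(left: dict, right: dict) -> dict:
    """Combine two outputs key by key using +, as a simplified accumulator."""
    merged = dict(left)
    for key, value in right.items():
        merged[key] = merged[key] + value if key in merged else value
    return merged


def tree_reduce(outputs):
    """Pairwise-merge neighbours until a single output remains."""
    outputs = list(outputs)
    if not outputs:
        raise ValueError("nothing to reduce")
    while len(outputs) > 1:
        paired = [
            merge(outputs[i], outputs[i + 1])
            for i in range(0, len(outputs) - 1, 2)
        ]
        if len(outputs) % 2:
            paired.append(outputs[-1])  # odd one out moves up a level unchanged
        outputs = paired
    return outputs[0]


# Five workers, each returning a count and a one-element metrics list.
worker_outputs = [
    {"n_events": 10, METRICS_KEY: [{"chunk": i}]} for i in range(5)
]
final = tree_reduce(worker_outputs)
chunk_ids = [c["chunk"] for c in final[METRICS_KEY]]
```

Numeric values sum while the metrics lists grow by concatenation, so every chunk's record survives the reduction regardless of how the outputs are paired.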
Extensibility¶
To add a new backend:
Implement AbstractMetricsBackend
Add backend name to get_parser() in aggregation/backends.py
Update MetricsCollector to support the backend
To add new metrics:
Collect raw data in backend or decorator
Add aggregation logic in aggregation/
Update reporter to display new metrics
Next steps¶
See Advanced Usage for custom backends and instrumentation.
Browse the full API documentation for implementation details.