Advanced Usage
Custom Backends
To support a new executor (e.g., TaskVine), implement AbstractMetricsBackend:
from typing import Any

from roastcoffea.backends.base import AbstractMetricsBackend


class TaskVineMetricsBackend(AbstractMetricsBackend):
    def __init__(self, taskvine_manager):
        self.manager = taskvine_manager

    def start_tracking(self, interval: float) -> None:
        # Start collecting TaskVine metrics
        pass

    def stop_tracking(self) -> dict:
        # Return collected metrics
        return {"taskvine_metrics": ...}

    def create_span(self, name: str) -> Any:
        # Optional: return None if spans are not supported
        return None

    def get_span_metrics(self, span_info: Any) -> dict:
        return {}

    def supports_fine_metrics(self) -> bool:
        return False
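As a rough illustration of what start_tracking and stop_tracking might do, the sketch below polls a manager on a background thread. It is not roastcoffea's implementation: PollingBackendSketch, FakeManager and worker_count() are hypothetical stand-ins so the example runs without roastcoffea or TaskVine installed. The worker_counts key mirrors the one shown under Accessing Raw Data; whether a custom backend has to return that key is an assumption.

```python
import threading
import time


class FakeManager:
    """Hypothetical stand-in for a TaskVine manager object."""

    def worker_count(self) -> int:
        return 4


class PollingBackendSketch:
    """Illustrative only; a real backend would subclass AbstractMetricsBackend."""

    def __init__(self, manager):
        self.manager = manager
        self._samples = {}  # {timestamp: worker count}
        self._stop = threading.Event()
        self._thread = None

    def _poll(self, interval: float) -> None:
        # Sample once immediately, then every `interval` seconds until stopped.
        while True:
            self._samples[time.time()] = self.manager.worker_count()
            if self._stop.wait(interval):
                break

    def start_tracking(self, interval: float) -> None:
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._poll, args=(interval,), daemon=True
        )
        self._thread.start()

    def stop_tracking(self) -> dict:
        # Signal the polling thread and wait for it to finish.
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
        return {"worker_counts": dict(self._samples)}


backend = PollingBackendSketch(FakeManager())
backend.start_tracking(interval=0.01)
time.sleep(0.05)
tracking_data = backend.stop_tracking()
```

Waiting on a threading.Event rather than calling time.sleep lets stop_tracking interrupt the wait immediately, so stopping does not block for a full interval.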
Then register a parser for the new backend in aggregation/backends.py:
def get_parser(backend: str):
    if backend == "dask":
        return DaskMetricsParser()
    elif backend == "taskvine":
        return TaskVineMetricsParser()
    # ...
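If the if/elif chain grows, a dictionary lookup is one alternative. This is a swapped-in pattern, not how aggregation/backends.py is written: both parser classes below are empty stand-ins, _PARSERS is a hypothetical name, and raising ValueError for unknown backend names is an assumption about the desired behaviour.

```python
class DaskMetricsParser:
    """Empty stand-in for the real Dask parser class."""


class TaskVineMetricsParser:
    """Hypothetical parser for the new backend."""


# Map backend names to parser classes (hypothetical registry).
_PARSERS = {
    "dask": DaskMetricsParser,
    "taskvine": TaskVineMetricsParser,
}


def get_parser(backend: str):
    try:
        parser_cls = _PARSERS[backend]
    except KeyError:
        known = ", ".join(sorted(_PARSERS))
        raise ValueError(
            f"Unknown backend {backend!r}; expected one of: {known}"
        ) from None
    return parser_cls()
```

With this layout, adding a backend means adding one dictionary entry, and a misspelled name fails with a message listing the valid choices.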
Custom Instrumentation
Beyond track_time() and track_memory(), you can add custom metrics:
from coffea import processor
from roastcoffea import track_metrics


class CustomProcessor(processor.ProcessorABC):
    @track_metrics
    def process(self, events):
        # Record a custom metric on the current chunk, if one is being tracked
        if hasattr(self, "_roastcoffea_current_chunk"):
            self._roastcoffea_current_chunk["custom_count"] = 42
        return {"sum": len(events)}

    def postprocess(self, accumulator):
        return accumulator
The custom field will appear in collector.chunk_metrics.
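Once the run finishes, the custom field can be summarised like any other per-chunk value. The sketch below uses a hand-written list in place of collector.chunk_metrics; the numbers are made up, and the assumption that some chunks may lack the custom field is only there to show a tolerant lookup.

```python
# Stand-in for collector.chunk_metrics: one dict per processed chunk.
chunk_metrics = [
    {"duration": 1.2, "num_events": 1000, "custom_count": 42},
    {"duration": 0.8, "num_events": 800, "custom_count": 42},
    {"duration": 1.0, "num_events": 900},  # chunk recorded without the custom field
]

# .get() tolerates chunks where the field was never set.
total_custom = sum(chunk.get("custom_count", 0) for chunk in chunk_metrics)

# Count how many chunks actually carry the field.
chunks_with_field = sum("custom_count" in chunk for chunk in chunk_metrics)

print(total_custom, chunks_with_field)
```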
Disabling Metrics Collection
Control collection with the processor's _roastcoffea_collect_metrics flag:
processor._roastcoffea_collect_metrics = False  # Disable
processor._roastcoffea_collect_metrics = True  # Enable
MetricsCollector sets this automatically in __enter__ and __exit__.
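The enter/exit behaviour can be pictured with a small context manager. This is a sketch of the pattern only, assuming the flag is switched on when the block is entered and off when it is left; FlagTogglingCollector and DummyProcessor are hypothetical names, not part of roastcoffea.

```python
class FlagTogglingCollector:
    """Hypothetical stand-in showing the enter/exit pattern only."""

    def __init__(self, processor_instance):
        self.processor_instance = processor_instance

    def __enter__(self):
        self.processor_instance._roastcoffea_collect_metrics = True
        return self

    def __exit__(self, exc_type, exc, tb):
        # Runs even if the body raises, so the flag is always reset.
        self.processor_instance._roastcoffea_collect_metrics = False
        return False  # do not swallow exceptions


class DummyProcessor:
    _roastcoffea_collect_metrics = False


proc = DummyProcessor()
with FlagTogglingCollector(proc):
    inside = proc._roastcoffea_collect_metrics  # flag while collecting
after = proc._roastcoffea_collect_metrics  # flag once the block has exited
```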
Worker Tracking Interval
Adjust the sampling rate with worker_tracking_interval:
with MetricsCollector(client, worker_tracking_interval=0.5) as collector:
    # Sample every 0.5 seconds instead of 1.0
    ...
Shorter intervals give finer time resolution at the cost of slightly higher overhead.
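To get a feel for the trade-off, the number of samples grows inversely with the interval. The helper below is a back-of-the-envelope estimate, assuming one sample per interval for the whole run; expected_samples is a hypothetical name, not a roastcoffea function.

```python
import math


def expected_samples(runtime_s: float, interval_s: float) -> int:
    """Approximate number of tracking samples taken during a run."""
    if interval_s <= 0:
        raise ValueError("interval_s must be positive")
    return math.floor(runtime_s / interval_s)


# A 10-minute run at a 1.0 s interval versus a 0.5 s interval.
default_samples = expected_samples(600, 1.0)
finer_samples = expected_samples(600, 0.5)
```

Halving the interval doubles the number of samples stored and the number of polling calls made.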
Accessing Raw Data
Get unprocessed metrics:
# Raw chunk data
for chunk in collector.chunk_metrics:
    print(chunk["duration"], chunk["num_events"])

# Raw worker tracking
if collector.tracking_data:
    print(collector.tracking_data["worker_counts"])  # {timestamp: count}

# Raw Dask Spans
if collector.span_metrics:
    print(collector.span_metrics)  # cumulative_worker_metrics dict
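The raw {timestamp: count} mapping can be reduced by hand, for example to a time-weighted average worker count. The sketch assumes timestamps are numeric seconds (if they are datetime objects, convert them with .timestamp() first); time_weighted_average is a hypothetical helper and the sample data is made up.

```python
def time_weighted_average(worker_counts: dict) -> float:
    """Average worker count, weighting each sample by how long it was in effect."""
    samples = sorted(worker_counts.items())
    if len(samples) < 2:
        # With zero or one sample there is no interval to weight by.
        return float(samples[0][1]) if samples else 0.0
    weighted_sum = 0.0
    # Each count holds from its own timestamp until the next sample.
    for (t0, count), (t1, _) in zip(samples, samples[1:]):
        weighted_sum += count * (t1 - t0)
    total_time = samples[-1][0] - samples[0][0]
    return weighted_sum / total_time


# Stand-in for collector.tracking_data["worker_counts"].
worker_counts = {0.0: 2, 1.0: 4, 3.0: 4}
avg_workers = time_weighted_average(worker_counts)
```

Here 2 workers are held for 1 second and 4 workers for 2 seconds, so the average is (2*1 + 4*2) / 3. Weighting by duration avoids bias when samples are unevenly spaced.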
Extending Aggregation
Add derived metrics in aggregation/efficiency.py:
def calculate_efficiency_metrics(workflow_metrics, worker_metrics):
    # ... existing metrics ...

    # Add custom metric
    metrics["custom_efficiency"] = (
        workflow_metrics["total_events"] / worker_metrics["avg_workers"]
    )
    return metrics
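A ratio like this fails with ZeroDivisionError if no workers were recorded. One way to guard against that is sketched below; events_per_worker is a hypothetical helper, and returning None for the empty case is an assumption about how a missing value should be reported.

```python
def events_per_worker(workflow_metrics: dict, worker_metrics: dict):
    """Return total_events / avg_workers, or None when no workers were seen."""
    avg_workers = worker_metrics.get("avg_workers") or 0
    if avg_workers <= 0:
        return None
    return workflow_metrics["total_events"] / avg_workers


ratio = events_per_worker({"total_events": 10_000}, {"avg_workers": 4.0})
empty = events_per_worker({"total_events": 10_000}, {"avg_workers": 0})
```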
Next steps
🏗️ System design
Read Architecture to understand the component structure.
📊 Metrics reference
See Performance Metrics Reference for the complete list of available metrics.