feat(datafusion): report IcebergTableScan metrics #2521
    baseline_metrics: BaselineMetrics,
) -> Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> {
    futures::stream::poll_fn(move |cx| {
        let baseline_metrics = baseline_metrics.clone();
I don't think baseline_metrics.clone() is needed. BaselineMetrics::elapsed_compute() and record_poll() both take &self, so the captured value can be used directly. The clone is cheap (Arc/Count copies) but it's dead weight on every poll and reads as if there's a borrow problem to work around. Suggest:
futures::stream::poll_fn(move |cx| {
    let _timer = baseline_metrics.elapsed_compute().timer();
    let poll = stream.as_mut().poll_next(cx);
    baseline_metrics.record_poll(poll)
})

let metrics = metrics.clone_inner();
assert_eq!(metrics.output_rows(), Some(3));
assert!(
    metrics.elapsed_compute().is_some_and(|elapsed| elapsed > 0),
stream_with_baseline_metrics_records_rows_and_compute asserts output_rows and elapsed_compute. The PR description also lists output_batches, output_bytes, and completion timestamps as exposed. Those are in fact recorded — BaselineMetrics::record_poll → batch.record_output(...) updates output_batches and output_bytes (see datafusion/physical-expr-common/src/metrics/baseline.rs:331) — so adding assert!(metrics.output_batches() == Some(1)) and assert!(metrics.output_bytes().is_some_and(|b| b > 0)) is a cheap regression guard that matches the documented contract.
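To illustrate why the extra assertions are cheap, here is a minimal std-only sketch in which a single recording call updates rows, batches, and bytes together. SketchCounters, record_output, and scan_one_batch are hypothetical stand-ins written for this example, not DataFusion types or the PR's test code, and the byte count here is simply the in-memory size of a slice.

```rust
/// Hypothetical stand-in for the three output counters discussed above.
/// This is NOT DataFusion's BaselineMetrics; it only mirrors the idea that
/// one recording call feeds all three counters.
#[derive(Default)]
struct SketchCounters {
    output_rows: usize,
    output_batches: usize,
    output_bytes: usize,
}

impl SketchCounters {
    /// Records one "batch" (a slice of i64 values in this sketch).
    fn record_output(&mut self, batch: &[i64]) {
        self.output_rows += batch.len();
        self.output_batches += 1;
        // Assumption: bytes are approximated by the slice's in-memory size.
        self.output_bytes += std::mem::size_of_val(batch);
    }
}

/// Simulates a scan that yields a single three-row batch.
fn scan_one_batch() -> SketchCounters {
    let mut counters = SketchCounters::default();
    counters.record_output(&[10, 20, 30]);
    counters
}
```

Because all three counters move in the same call, a test that already checks the row count can check the batch count and a non-zero byte count at almost no extra cost.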
mbutrovich left a comment
Minor suggestions, thanks for tackling this @geoffreyclaude!
Which issue does this PR close?
What changes are included in this PR?
IcebergTableScan now owns a DataFusion ExecutionPlanMetricsSet, returns it from ExecutionPlan::metrics(), and resets it from ExecutionPlan::reset_state().
The scan output stream is wrapped in a small poll_fn adapter that records BaselineMetrics while the Iceberg stream is polled. This exposes the standard DataFusion operator metrics such as elapsed_compute, output_rows, output_batches, output_bytes, and completion timestamps in EXPLAIN ANALYZE.
Focused tests cover the metrics wrapper directly and the catalog-backed provider execution path, including reset-state behavior.
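The adapter pattern described above can be sketched with the standard library alone. This is a simplified model under stated assumptions, not the PR's implementation: SketchMetrics, with_metrics, and run_sketch are hypothetical names, a plain closure returning std::task::Poll stands in for the Iceberg stream, and a Vec<i32> stands in for a RecordBatch. The sketch also shows that a move closure can call &self methods on the captured metrics handle directly, with no clone on each poll.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::time::Instant;

/// Hypothetical stand-in for a metrics handle: shared counters that are
/// updated through `&self`, so callers never need `&mut` access.
#[derive(Clone, Default)]
struct SketchMetrics {
    output_rows: Arc<AtomicUsize>,
    elapsed_nanos: Arc<AtomicUsize>,
}

impl SketchMetrics {
    /// Counts rows for a ready batch and hands the poll result back unchanged.
    fn record_poll(&self, poll: Poll<Option<Vec<i32>>>) -> Poll<Option<Vec<i32>>> {
        if let Poll::Ready(Some(batch)) = &poll {
            self.output_rows.fetch_add(batch.len(), Ordering::Relaxed);
        }
        poll
    }

    /// Adds the time spent since `start`, clamped to at least 1 ns.
    fn add_elapsed(&self, start: Instant) {
        let nanos = start.elapsed().as_nanos() as usize;
        self.elapsed_nanos.fetch_add(nanos.max(1), Ordering::Relaxed);
    }
}

/// Wraps a poll function so that every call is timed and counted.
/// `metrics` is moved into the closure once and then used by reference.
fn with_metrics(
    mut inner: impl FnMut() -> Poll<Option<Vec<i32>>>,
    metrics: SketchMetrics,
) -> impl FnMut() -> Poll<Option<Vec<i32>>> {
    move || {
        let start = Instant::now();
        let poll = inner();
        metrics.add_elapsed(start);
        metrics.record_poll(poll)
    }
}

/// Drains a one-batch source through the wrapper and reports
/// (rows recorded, whether any compute time was recorded).
fn run_sketch() -> (usize, bool) {
    let metrics = SketchMetrics::default();
    let mut batches = vec![vec![1, 2, 3]].into_iter();
    // One clone up front keeps a handle for reading the counters afterwards.
    let mut poll_next = with_metrics(move || Poll::Ready(batches.next()), metrics.clone());
    while let Poll::Ready(Some(_)) = poll_next() {}
    (
        metrics.output_rows.load(Ordering::Relaxed),
        metrics.elapsed_nanos.load(Ordering::Relaxed) > 0,
    )
}
```

The single clone happens outside the wrapper, where a second handle is actually needed to read the counters; inside the closure the captured value is borrowed on every call.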
Are these changes tested?
cargo test -p iceberg-datafusion stream_with_baseline_metrics_records_rows_and_compute --locked
cargo test -p iceberg-datafusion test_catalog_backed_provider_scan_reports_metrics --locked
cargo check -p iceberg-datafusion --locked
cargo test -p iceberg-datafusion --locked
cargo clippy -p iceberg-datafusion --all-targets --locked