feat(datafusion): report IcebergTableScan metrics #2521

Open
geoffreyclaude wants to merge 1 commit into apache:main from geoffreyclaude:fix/iceberg-scan-metrics

@geoffreyclaude geoffreyclaude commented May 28, 2026

Which issue does this PR close?

What changes are included in this PR?

IcebergTableScan now owns a DataFusion ExecutionPlanMetricsSet, returns it from ExecutionPlan::metrics(), and resets it from ExecutionPlan::reset_state().

The scan output stream is wrapped in a small poll_fn adapter that records BaselineMetrics while the Iceberg stream is polled. This exposes the standard DataFusion operator metrics such as elapsed_compute, output_rows, output_batches, output_bytes, and completion timestamps in EXPLAIN ANALYZE.
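As a rough, dependency-free illustration of that wrapper pattern (not the code in this PR), the sketch below times each poll and counts rows and batches as they pass through. `SketchMetrics`, `Batch`, `with_metrics`, and `demo` are hypothetical names standing in for DataFusion's `BaselineMetrics`, `RecordBatch`, and the real `poll_fn` adapter, and the source is polled synchronously without a task context to keep the example to the standard library.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::time::Instant;

/// Hypothetical stand-in for DataFusion's `BaselineMetrics`: shared counters
/// that can still be read after the wrapped source has been drained.
#[derive(Clone, Default)]
struct SketchMetrics {
    output_rows: Arc<AtomicUsize>,
    output_batches: Arc<AtomicUsize>,
    elapsed_nanos: Arc<AtomicU64>,
}

/// A "batch" here is just a Vec of rows; the real scan yields `RecordBatch`.
type Batch = Vec<i32>;

impl SketchMetrics {
    /// Counts rows and batches for ready items, then returns the poll unchanged.
    fn record_poll(&self, poll: Poll<Option<Batch>>) -> Poll<Option<Batch>> {
        if let Poll::Ready(Some(batch)) = &poll {
            self.output_rows.fetch_add(batch.len(), Ordering::Relaxed);
            self.output_batches.fetch_add(1, Ordering::Relaxed);
        }
        poll
    }
}

/// Wraps a poll function so that every poll is timed and its output counted.
/// The captured `metrics` value is used by reference on each call.
fn with_metrics<F>(mut inner: F, metrics: SketchMetrics) -> impl FnMut() -> Poll<Option<Batch>>
where
    F: FnMut() -> Poll<Option<Batch>>,
{
    move || {
        let start = Instant::now();
        let poll = inner();
        metrics
            .elapsed_nanos
            .fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
        metrics.record_poll(poll)
    }
}

/// Drains a two-batch source through the wrapper and returns (rows, batches).
fn demo() -> (usize, usize) {
    let metrics = SketchMetrics::default();
    let mut source = vec![vec![1, 2], vec![3]].into_iter();
    let mut poll = with_metrics(move || Poll::Ready(source.next()), metrics.clone());
    while let Poll::Ready(Some(_)) = poll() {}
    (
        metrics.output_rows.load(Ordering::Relaxed),
        metrics.output_batches.load(Ordering::Relaxed),
    )
}
```

In this sketch, `demo()` returns `(3, 2)`: three rows across two batches. The counters live behind `Arc`, so the clone kept by the caller observes what the wrapper recorded.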

Focused tests cover the metrics wrapper directly and the catalog-backed provider execution path, including reset-state behavior.

Are these changes tested?

  • cargo test -p iceberg-datafusion stream_with_baseline_metrics_records_rows_and_compute --locked
  • cargo test -p iceberg-datafusion test_catalog_backed_provider_scan_reports_metrics --locked
  • cargo check -p iceberg-datafusion --locked
  • cargo test -p iceberg-datafusion --locked
  • cargo clippy -p iceberg-datafusion --all-targets --locked

@geoffreyclaude geoffreyclaude force-pushed the fix/iceberg-scan-metrics branch from 9f036f5 to fe4a322 Compare May 28, 2026 08:25
@geoffreyclaude geoffreyclaude marked this pull request as ready for review May 28, 2026 08:33
    baseline_metrics: BaselineMetrics,
) -> Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> {
    futures::stream::poll_fn(move |cx| {
        let baseline_metrics = baseline_metrics.clone();
Collaborator

I don't think baseline_metrics.clone() is needed. BaselineMetrics::elapsed_compute() and record_poll() both take &self, so the captured value can be used directly. The clone is cheap (Arc/Count copies) but it's dead weight on every poll and reads as if there's a borrow problem to work around. Suggest:

futures::stream::poll_fn(move |cx| {
    let _timer = baseline_metrics.elapsed_compute().timer();
    let poll = stream.as_mut().poll_next(cx);
    baseline_metrics.record_poll(poll)
})

let metrics = metrics.clone_inner();
assert_eq!(metrics.output_rows(), Some(3));
assert!(
    metrics.elapsed_compute().is_some_and(|elapsed| elapsed > 0),
Collaborator

stream_with_baseline_metrics_records_rows_and_compute asserts output_rows and elapsed_compute. The PR description also lists output_batches, output_bytes, and completion timestamps as exposed. Those are in fact recorded — BaselineMetrics::record_poll → batch.record_output(...) updates output_batches and output_bytes (see datafusion/physical-expr-common/src/metrics/baseline.rs:331) — so adding assert!(metrics.output_batches() == Some(1)) and assert!(metrics.output_bytes().is_some_and(|b| b > 0)) is a cheap regression guard that matches the documented contract.

Collaborator

@mbutrovich mbutrovich left a comment

Minor suggestions, thanks for tackling this @geoffreyclaude!

Development

Successfully merging this pull request may close these issues.

Report DataFusion operator metrics in IcebergTableScan
