From fe4a3228672c10be1ca4db5399c88b6f446b29d6 Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Thu, 28 May 2026 10:02:40 +0200 Subject: [PATCH] feat(datafusion): report IcebergTableScan metrics --- .../datafusion/src/physical_plan/scan.rs | 97 ++++++++++++++++++- .../integrations/datafusion/src/table/mod.rs | 35 +++++++ 2 files changed, 129 insertions(+), 3 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 36539ae503..f5e309aa6e 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -26,10 +26,11 @@ use datafusion::error::Result as DFResult; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::EquivalenceProperties; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties}; use datafusion::prelude::Expr; -use futures::{Stream, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use iceberg::expr::Predicate; use iceberg::table::Table; @@ -53,6 +54,8 @@ pub struct IcebergTableScan { predicates: Option, /// Optional limit on the number of rows to return limit: Option, + /// Execution metrics for this scan. + metrics: ExecutionPlanMetricsSet, } impl IcebergTableScan { @@ -80,6 +83,7 @@ impl IcebergTableScan { projection, predicates, limit, + metrics: ExecutionPlanMetricsSet::new(), } } @@ -143,7 +147,7 @@ impl ExecutionPlan for IcebergTableScan { fn execute( &self, - _partition: usize, + partition: usize, _context: Arc, ) -> DFResult { let fut = get_batch_stream( @@ -174,11 +178,30 @@ impl ExecutionPlan for IcebergTableScan { Box::pin(stream) }; + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + let measured_stream = stream_with_baseline_metrics(limited_stream, baseline_metrics); + Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), - limited_stream, + measured_stream, ))) } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn reset_state(self: Arc) -> DFResult> { + Ok(Arc::new(Self { + table: self.table.clone(), + snapshot_id: self.snapshot_id, + plan_properties: Arc::clone(&self.plan_properties), + projection: self.projection.clone(), + predicates: self.predicates.clone(), + limit: self.limit, + metrics: ExecutionPlanMetricsSet::new(), + })) + } } impl DisplayAs for IcebergTableScan { @@ -237,6 +260,19 @@ async fn get_batch_stream( Ok(Box::pin(stream)) } +fn stream_with_baseline_metrics( + mut stream: Pin> + Send>>, + baseline_metrics: BaselineMetrics, +) -> Pin> + Send>> { + futures::stream::poll_fn(move |cx| { + let baseline_metrics = baseline_metrics.clone(); + let _timer = baseline_metrics.elapsed_compute().timer(); + let poll = stream.as_mut().poll_next(cx); + baseline_metrics.record_poll(poll) + }) + .boxed() +} + fn get_column_names( schema: ArrowSchemaRef, projection: Option<&Vec>, @@ -247,3 +283,58 @@ fn get_column_names( .collect::>() }) } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion::arrow::array::Int64Array; + use datafusion::arrow::datatypes::{ + DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, + }; + use datafusion::arrow::record_batch::RecordBatch; + use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; + use futures::StreamExt; + + use super::stream_with_baseline_metrics; + + #[test] + fn stream_with_baseline_metrics_records_rows_and_compute() { + let metrics = ExecutionPlanMetricsSet::new(); + let baseline_metrics = BaselineMetrics::new(&metrics, 0); + let batch = make_batch(); + let stream = Box::pin(futures::stream::iter([Ok(batch)])); + let mut stream = stream_with_baseline_metrics(stream, baseline_metrics); + + futures::executor::block_on(async { + let batch = stream + .next() + .await + .expect("stream should return one item") + .expect("stream item should be valid"); + assert_eq!(batch.num_rows(), 3); + assert!(stream.next().await.is_none()); + }); + + let metrics = metrics.clone_inner(); + assert_eq!(metrics.output_rows(), Some(3)); + assert!( + metrics.elapsed_compute().is_some_and(|elapsed| elapsed > 0), + "elapsed_compute should be recorded" + ); + } + + fn make_batch() -> RecordBatch { + let schema = make_arrow_schema(); + let values = Arc::new(Int64Array::from(vec![1, 2, 3])); + RecordBatch::try_new(schema, vec![values]).unwrap() + } + + fn make_arrow_schema() -> ArrowSchemaRef { + Arc::new(ArrowSchema::new(vec![Field::new( + "id", + DataType::Int64, + false, + )])) + } +} diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 75b7988d8d..f72556a06a 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -519,6 +519,41 @@ mod tests { assert!(physical_plan.is_ok()); } + #[tokio::test] + async fn test_catalog_backed_provider_scan_reports_metrics() { + use datafusion::datasource::TableProvider; + + let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; + let table_provider = + IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) + .await + .unwrap(); + + let ctx = SessionContext::new(); + let scan_plan = table_provider + .scan(&ctx.state(), None, &[], None) + .await + .unwrap(); + let batches = datafusion::physical_plan::collect(Arc::clone(&scan_plan), ctx.task_ctx()) + .await + .unwrap(); + let output_rows = batches.iter().map(|batch| batch.num_rows()).sum(); + + let metrics = scan_plan.metrics().expect("scan should expose metrics"); + assert_eq!(metrics.output_rows(), Some(output_rows)); + assert!( + metrics.elapsed_compute().is_some_and(|elapsed| elapsed > 0), + "elapsed_compute should be recorded" + ); + + let reset_plan = scan_plan.reset_state().unwrap(); + let metrics = reset_plan + .metrics() + .expect("reset scan should expose metrics"); + assert_eq!(metrics.output_rows(), None); + assert_eq!(metrics.elapsed_compute(), None); + } + // Tests for IcebergTableProvider #[tokio::test]