Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 94 additions & 3 deletions crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ use datafusion::error::Result as DFResult;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
use datafusion::prelude::Expr;
use futures::{Stream, TryStreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use iceberg::expr::Predicate;
use iceberg::table::Table;

Expand All @@ -53,6 +54,8 @@ pub struct IcebergTableScan {
predicates: Option<Predicate>,
/// Optional limit on the number of rows to return
limit: Option<usize>,
/// Execution metrics for this scan.
metrics: ExecutionPlanMetricsSet,
}

impl IcebergTableScan {
Expand Down Expand Up @@ -80,6 +83,7 @@ impl IcebergTableScan {
projection,
predicates,
limit,
metrics: ExecutionPlanMetricsSet::new(),
}
}

Expand Down Expand Up @@ -143,7 +147,7 @@ impl ExecutionPlan for IcebergTableScan {

fn execute(
&self,
_partition: usize,
partition: usize,
_context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
let fut = get_batch_stream(
Expand Down Expand Up @@ -174,11 +178,30 @@ impl ExecutionPlan for IcebergTableScan {
Box::pin(stream)
};

let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let measured_stream = stream_with_baseline_metrics(limited_stream, baseline_metrics);

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
limited_stream,
measured_stream,
)))
}

/// Returns a copy of the metrics set held by this scan.
///
/// Always yields `Some`; the contents are whatever has been recorded into
/// `self.metrics` at the time of the call.
fn metrics(&self) -> Option<MetricsSet> {
    let recorded = self.metrics.clone_inner();
    Some(recorded)
}

/// Builds a new scan plan configured like this one but with a newly created
/// metrics set.
///
/// Every field except `metrics` is copied or cloned from `self`; `metrics` is
/// replaced by `ExecutionPlanMetricsSet::new()`.
///
/// # Returns
/// The new plan wrapped in an `Arc<dyn ExecutionPlan>`. This implementation
/// never returns an error.
fn reset_state(self: Arc<Self>) -> DFResult<Arc<dyn ExecutionPlan>> {
    let fresh_scan = Self {
        table: self.table.clone(),
        snapshot_id: self.snapshot_id,
        plan_properties: Arc::clone(&self.plan_properties),
        projection: self.projection.clone(),
        predicates: self.predicates.clone(),
        limit: self.limit,
        // The new plan gets its own metrics set rather than sharing this one.
        metrics: ExecutionPlanMetricsSet::new(),
    };
    let plan: Arc<dyn ExecutionPlan> = Arc::new(fresh_scan);
    Ok(plan)
}
}

impl DisplayAs for IcebergTableScan {
Expand Down Expand Up @@ -237,6 +260,19 @@ async fn get_batch_stream(
Ok(Box::pin(stream))
}

/// Wraps `stream` so that every poll is accounted for in `baseline_metrics`.
///
/// Each call to `poll_next` on the returned stream:
/// 1. starts an `elapsed_compute` timer that is held until the poll returns, and
/// 2. polls the inner stream and passes the result through
///    `BaselineMetrics::record_poll`, returning whatever that call returns.
///
/// # Parameters
/// - `stream`: the record-batch stream to measure.
/// - `baseline_metrics`: the metrics handle updated on every poll. It is moved
///   into the returned stream and is dropped together with it.
///
/// # Returns
/// A boxed stream yielding the value `record_poll` returns for each inner poll.
fn stream_with_baseline_metrics(
    mut stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>,
    baseline_metrics: BaselineMetrics,
) -> Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> {
    futures::stream::poll_fn(move |cx| {
        // Use the captured handle directly instead of cloning it on each poll:
        // `elapsed_compute()` and `record_poll()` only need a shared reference.
        // NOTE(review): `BaselineMetrics` appears to record its end time when
        // dropped, so a per-poll clone would also mark the scan as finished as
        // soon as the first poll returned — confirm against the DataFusion docs.
        let _timer = baseline_metrics.elapsed_compute().timer();
        let poll = stream.as_mut().poll_next(cx);
        baseline_metrics.record_poll(poll)
    })
    .boxed()
}

fn get_column_names(
schema: ArrowSchemaRef,
projection: Option<&Vec<usize>>,
Expand All @@ -247,3 +283,58 @@ fn get_column_names(
.collect::<Vec<String>>()
})
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{
DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use futures::StreamExt;

use super::stream_with_baseline_metrics;

/// Drives a single three-row batch through `stream_with_baseline_metrics` and
/// checks that `output_rows` and a non-zero `elapsed_compute` are reported by
/// the metrics set afterwards.
#[test]
fn stream_with_baseline_metrics_records_rows_and_compute() {
// Register baseline metrics for partition 0 on a new metrics set.
let metrics = ExecutionPlanMetricsSet::new();
let baseline_metrics = BaselineMetrics::new(&metrics, 0);
// Wrap an in-memory stream that yields exactly one batch.
let batch = make_batch();
let stream = Box::pin(futures::stream::iter([Ok(batch)]));
let mut stream = stream_with_baseline_metrics(stream, baseline_metrics);

// Drain the wrapped stream: one batch of three rows, then end of stream.
futures::executor::block_on(async {
let batch = stream
.next()
.await
.expect("stream should return one item")
.expect("stream item should be valid");
assert_eq!(batch.num_rows(), 3);
assert!(stream.next().await.is_none());
});

// Read back what was recorded while the stream was being polled.
let metrics = metrics.clone_inner();
assert_eq!(metrics.output_rows(), Some(3));
assert!(
metrics.elapsed_compute().is_some_and(|elapsed| elapsed > 0),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream_with_baseline_metrics_records_rows_and_compute asserts output_rows and elapsed_compute. The PR description also lists output_batches, output_bytes, and completion timestamps as exposed. Those are in fact recorded — BaselineMetrics::record_poll → batch.record_output(...) updates output_batches and output_bytes (see datafusion/physical-expr-common/src/metrics/baseline.rs:331) — so adding assert!(metrics.output_batches() == Some(1)) and assert!(metrics.output_bytes().is_some_and(|b| b > 0)) is a cheap regression guard that matches the documented contract.

"elapsed_compute should be recorded"
);
}

/// Builds the test batch: one `Int64` column holding `1, 2, 3`, laid out
/// according to `make_arrow_schema()`.
///
/// Panics if the batch cannot be constructed from that schema and column.
fn make_batch() -> RecordBatch {
    let schema = make_arrow_schema();
    let id_column = Int64Array::from(vec![1, 2, 3]);
    RecordBatch::try_new(schema, vec![Arc::new(id_column)]).unwrap()
}

/// Builds the Arrow schema used by the test batch: a single non-nullable
/// `Int64` field named `id`.
fn make_arrow_schema() -> ArrowSchemaRef {
    let id_field = Field::new("id", DataType::Int64, false);
    Arc::new(ArrowSchema::new(vec![id_field]))
}
}
35 changes: 35 additions & 0 deletions crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,41 @@ mod tests {
assert!(physical_plan.is_ok());
}

/// Runs a scan through a catalog-backed provider and checks that the executed
/// plan reports `output_rows` and `elapsed_compute`, and that the plan returned
/// by `reset_state` reports neither.
#[tokio::test]
async fn test_catalog_backed_provider_scan_reports_metrics() {
    use datafusion::datasource::TableProvider;

    let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;
    let provider =
        IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
            .await
            .unwrap();

    let session = SessionContext::new();
    let plan = provider
        .scan(&session.state(), None, &[], None)
        .await
        .unwrap();

    // Execute the plan to completion before inspecting its metrics.
    let collected = datafusion::physical_plan::collect(Arc::clone(&plan), session.task_ctx())
        .await
        .unwrap();
    let expected_rows = collected.iter().map(|b| b.num_rows()).sum::<usize>();

    let executed = plan.metrics().expect("scan should expose metrics");
    assert_eq!(executed.output_rows(), Some(expected_rows));
    let compute_recorded = executed.elapsed_compute().is_some_and(|t| t > 0);
    assert!(compute_recorded, "elapsed_compute should be recorded");

    // A reset plan still exposes a metrics set, but nothing is recorded in it.
    let after_reset = plan
        .reset_state()
        .unwrap()
        .metrics()
        .expect("reset scan should expose metrics");
    assert_eq!(after_reset.output_rows(), None);
    assert_eq!(after_reset.elapsed_compute(), None);
}

// Tests for IcebergTableProvider

#[tokio::test]
Expand Down
Loading