-
Notifications
You must be signed in to change notification settings - Fork 475
feat(datafusion): report IcebergTableScan metrics #2521
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,10 +26,11 @@ use datafusion::error::Result as DFResult; | |
| use datafusion::execution::{SendableRecordBatchStream, TaskContext}; | ||
| use datafusion::physical_expr::EquivalenceProperties; | ||
| use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; | ||
| use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; | ||
| use datafusion::physical_plan::stream::RecordBatchStreamAdapter; | ||
| use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties}; | ||
| use datafusion::prelude::Expr; | ||
| use futures::{Stream, TryStreamExt}; | ||
| use futures::{Stream, StreamExt, TryStreamExt}; | ||
| use iceberg::expr::Predicate; | ||
| use iceberg::table::Table; | ||
|
|
||
|
|
@@ -53,6 +54,8 @@ pub struct IcebergTableScan { | |
| predicates: Option<Predicate>, | ||
| /// Optional limit on the number of rows to return | ||
| limit: Option<usize>, | ||
| /// Execution metrics for this scan. | ||
| metrics: ExecutionPlanMetricsSet, | ||
| } | ||
|
|
||
| impl IcebergTableScan { | ||
|
|
@@ -80,6 +83,7 @@ impl IcebergTableScan { | |
| projection, | ||
| predicates, | ||
| limit, | ||
| metrics: ExecutionPlanMetricsSet::new(), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -143,7 +147,7 @@ impl ExecutionPlan for IcebergTableScan { | |
|
|
||
| fn execute( | ||
| &self, | ||
| _partition: usize, | ||
| partition: usize, | ||
| _context: Arc<TaskContext>, | ||
| ) -> DFResult<SendableRecordBatchStream> { | ||
| let fut = get_batch_stream( | ||
|
|
@@ -174,11 +178,30 @@ impl ExecutionPlan for IcebergTableScan { | |
| Box::pin(stream) | ||
| }; | ||
|
|
||
| let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); | ||
| let measured_stream = stream_with_baseline_metrics(limited_stream, baseline_metrics); | ||
|
|
||
| Ok(Box::pin(RecordBatchStreamAdapter::new( | ||
| self.schema(), | ||
| limited_stream, | ||
| measured_stream, | ||
| ))) | ||
| } | ||
|
|
||
| /// Exposes the metrics gathered for this scan. | ||
| /// | ||
| /// Always returns `Some`, wrapping whatever `clone_inner()` yields for the | ||
| /// scan's `ExecutionPlanMetricsSet`. | ||
| fn metrics(&self) -> Option<MetricsSet> { | ||
| let snapshot = self.metrics.clone_inner(); | ||
| Some(snapshot) | ||
| } | ||
|
|
||
| /// Produces a copy of this scan whose metrics start from an empty set. | ||
| /// | ||
| /// Table, snapshot id, plan properties, projection, predicates and limit are | ||
| /// carried over from `self`; only `metrics` is replaced with a newly created | ||
| /// `ExecutionPlanMetricsSet`. | ||
| fn reset_state(self: Arc<Self>) -> DFResult<Arc<dyn ExecutionPlan>> { | ||
| let fresh_scan = Self { | ||
| table: self.table.clone(), | ||
| snapshot_id: self.snapshot_id, | ||
| plan_properties: Arc::clone(&self.plan_properties), | ||
| projection: self.projection.clone(), | ||
| predicates: self.predicates.clone(), | ||
| limit: self.limit, | ||
| // Not copied from `self`: the rebuilt node gets its own metric set. | ||
| metrics: ExecutionPlanMetricsSet::new(), | ||
| }; | ||
| Ok(Arc::new(fresh_scan)) | ||
| } | ||
| } | ||
|
|
||
| impl DisplayAs for IcebergTableScan { | ||
|
|
@@ -237,6 +260,19 @@ async fn get_batch_stream( | |
| Ok(Box::pin(stream)) | ||
| } | ||
|
|
||
| /// Wraps `stream` so that every poll is accounted for in `baseline_metrics`. | ||
| /// | ||
| /// Each poll of the returned stream is timed against `elapsed_compute`, and the | ||
| /// poll result is passed through `BaselineMetrics::record_poll` before being | ||
| /// handed back to the caller. | ||
| fn stream_with_baseline_metrics( | ||
| mut stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>, | ||
| baseline_metrics: BaselineMetrics, | ||
| ) -> Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> { | ||
| futures::stream::poll_fn(move |cx| { | ||
| // Use the captured `baseline_metrics` directly: `elapsed_compute()` and | ||
| // `record_poll()` only need `&self`, so no per-poll clone is required. | ||
| // NOTE(review): a per-poll clone would also be dropped at the end of every | ||
| // poll; `BaselineMetrics` appears to record its end time on drop, which would | ||
| // stamp it on the first poll rather than at stream completion -- confirm. | ||
| let _timer = baseline_metrics.elapsed_compute().timer(); | ||
| let poll = stream.as_mut().poll_next(cx); | ||
| baseline_metrics.record_poll(poll) | ||
| }) | ||
| .boxed() | ||
| } | ||
|
|
||
| fn get_column_names( | ||
| schema: ArrowSchemaRef, | ||
| projection: Option<&Vec<usize>>, | ||
|
|
@@ -247,3 +283,58 @@ fn get_column_names( | |
| .collect::<Vec<String>>() | ||
| }) | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use std::sync::Arc; | ||
|
|
||
| use datafusion::arrow::array::Int64Array; | ||
| use datafusion::arrow::datatypes::{ | ||
| DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, | ||
| }; | ||
| use datafusion::arrow::record_batch::RecordBatch; | ||
| use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; | ||
| use futures::StreamExt; | ||
|
|
||
| use super::stream_with_baseline_metrics; | ||
|
|
||
| #[test] | ||
| fn stream_with_baseline_metrics_records_rows_and_compute() { | ||
| let metrics = ExecutionPlanMetricsSet::new(); | ||
| let baseline_metrics = BaselineMetrics::new(&metrics, 0); | ||
| let batch = make_batch(); | ||
| let stream = Box::pin(futures::stream::iter([Ok(batch)])); | ||
| let mut stream = stream_with_baseline_metrics(stream, baseline_metrics); | ||
|
|
||
| futures::executor::block_on(async { | ||
| let batch = stream | ||
| .next() | ||
| .await | ||
| .expect("stream should return one item") | ||
| .expect("stream item should be valid"); | ||
| assert_eq!(batch.num_rows(), 3); | ||
| assert!(stream.next().await.is_none()); | ||
| }); | ||
|
|
||
| let metrics = metrics.clone_inner(); | ||
| assert_eq!(metrics.output_rows(), Some(3)); | ||
| assert!( | ||
| metrics.elapsed_compute().is_some_and(|elapsed| elapsed > 0), | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| "elapsed_compute should be recorded" | ||
| ); | ||
| } | ||
|
|
||
| /// Builds the three-row `Int64` record batch (values 1, 2, 3) used by the test. | ||
| fn make_batch() -> RecordBatch { | ||
| let batch_schema = make_arrow_schema(); | ||
| let id_column = Arc::new(Int64Array::from(vec![1, 2, 3])); | ||
| let columns = vec![id_column]; | ||
| RecordBatch::try_new(batch_schema, columns).unwrap() | ||
| } | ||
|
|
||
| /// Builds an Arrow schema with a single non-nullable `Int64` field named "id". | ||
| fn make_arrow_schema() -> ArrowSchemaRef { | ||
| let id_field = Field::new("id", DataType::Int64, false); | ||
| let schema = ArrowSchema::new(vec![id_field]); | ||
| Arc::new(schema) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think `baseline_metrics.clone()` is needed. `BaselineMetrics::elapsed_compute()` and `record_poll()` both take `&self`, so the captured value can be used directly. The clone is cheap (Arc/Count copies) but it's dead weight on every poll and reads as if there's a borrow problem to work around. Suggest: