diff --git a/pyiceberg/io/pyiceberg_core.py b/pyiceberg/io/pyiceberg_core.py index f49bfb48da..f060056196 100644 --- a/pyiceberg/io/pyiceberg_core.py +++ b/pyiceberg/io/pyiceberg_core.py @@ -146,6 +146,16 @@ def _read_field_ids( return ids +def can_read_projected_schema_with_pyiceberg_core( + schema: Schema, + projected_schema: Schema, + row_filter: BooleanExpression, + case_sensitive: bool, +) -> bool: + """Return whether pyiceberg-core can read exactly the requested projection for this filter.""" + return _expression_field_ids(row_filter, schema, case_sensitive).issubset(projected_schema.field_ids) + + _UNARY_METHODS: dict[type[BooleanExpression], str] = { IsNull: "is_null", NotNull: "is_not_null", @@ -291,3 +301,30 @@ def file_scan_task_to_pyiceberg_core( name_mapping=_model_json(name_mapping) if name_mapping is not None else None, case_sensitive=case_sensitive, ) + + +def arrow_batch_reader_from_pyiceberg_core( + file_io: FileIO, + tasks: Iterable[FileScanTask], + schema: Schema, + projected_schema: Schema, + partition_specs: dict[int, PartitionSpec], + name_mapping: NameMapping | None, + case_sensitive: bool = True, +) -> Any: + """Read PyIceberg scan tasks through pyiceberg-core's ArrowReader.""" + core_tasks = [ + file_scan_task_to_pyiceberg_core( + task, + schema, + projected_schema, + partition_spec=partition_specs.get(task.file.spec_id), + name_mapping=name_mapping, + case_sensitive=case_sensitive, + project_field_ids=list(projected_schema.field_ids), + ) + for task in tasks + ] + + reader = _core_module("scan").ArrowReader(file_io_to_pyiceberg_core(file_io)) + return reader.read(schema_to_pyiceberg_core(projected_schema), core_tasks) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 64ad10050d..ad45eac2c7 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -110,6 +110,7 @@ ALWAYS_TRUE = AlwaysTrue() DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" +PYICEBERG_RUST_ARROW_SCAN = "PYICEBERG_RUST_ARROW_SCAN" @dataclass() @@ -2242,9 +2243,36 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow - target_schema = schema_to_pyarrow(self.projection()) + projected_schema = self.projection() + if self.limit is None and os.environ.get(PYICEBERG_RUST_ARROW_SCAN, "").lower() in {"1", "true", "yes"}: + from pyiceberg.io.pyiceberg_core import ( + arrow_batch_reader_from_pyiceberg_core, + can_read_projected_schema_with_pyiceberg_core, + ) + + if can_read_projected_schema_with_pyiceberg_core( + self.table_metadata.schema(), projected_schema, self.row_filter, self.case_sensitive + ): + try: + return arrow_batch_reader_from_pyiceberg_core( + self.io, + self.plan_files(), + self.table_metadata.schema(), + projected_schema, + self.table_metadata.specs(), + self.table_metadata.name_mapping(), + self.case_sensitive, + ) + except (ModuleNotFoundError, NotImplementedError, ValueError) as exc: + warnings.warn( + f"Falling back to PyArrow scan because pyiceberg-core cannot handle this scan: {exc}", + RuntimeWarning, + stacklevel=2, + ) + + target_schema = schema_to_pyarrow(projected_schema) batches = ArrowScan( - self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit + self.table_metadata, self.io, projected_schema, self.row_filter, self.case_sensitive, self.limit ).to_record_batches(self.plan_files()) return pa.RecordBatchReader.from_batches( diff --git a/tests/io/test_pyiceberg_core.py b/tests/io/test_pyiceberg_core.py index 0e1fd0f95b..8d516d96cf 100644 --- a/tests/io/test_pyiceberg_core.py +++ b/tests/io/test_pyiceberg_core.py @@ -26,6 +26,7 @@ from pyiceberg.expressions import And, EqualTo, IsNull, StartsWith from pyiceberg.io import FileIO, InputFile, OutputFile from pyiceberg.io.pyiceberg_core import ( + can_read_projected_schema_with_pyiceberg_core, delete_file_to_pyiceberg_core, expression_to_pyiceberg_core, file_io_to_pyiceberg_core, @@ -295,6 +296,13 @@ def test_file_scan_task_to_pyiceberg_core_adds_filter_only_field_to_read_project assert converted.kwargs["project_field_ids"] == [1, 2] +def test_can_read_projected_schema_with_pyiceberg_core_requires_filter_fields(simple_schema: Schema) -> None: + projected_schema = Schema(NestedField(1, "id", IntegerType(), required=True), schema_id=3) + + assert not can_read_projected_schema_with_pyiceberg_core(simple_schema, projected_schema, EqualTo("data", "abc"), True) + assert can_read_projected_schema_with_pyiceberg_core(simple_schema, projected_schema, EqualTo("id", 1), True) + + def test_file_scan_task_to_pyiceberg_core_requires_partition_spec_for_partitioned_task(simple_schema: Schema) -> None: data_file = DataFile.from_args( content=DataFileContent.DATA,