Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions pyiceberg/io/pyiceberg_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,16 @@ def _read_field_ids(
return ids


def can_read_projected_schema_with_pyiceberg_core(
schema: Schema,
projected_schema: Schema,
row_filter: BooleanExpression,
case_sensitive: bool,
) -> bool:
"""Return whether pyiceberg-core can read exactly the requested projection for this filter."""
return _expression_field_ids(row_filter, schema, case_sensitive).issubset(projected_schema.field_ids)


_UNARY_METHODS: dict[type[BooleanExpression], str] = {
IsNull: "is_null",
NotNull: "is_not_null",
Expand Down Expand Up @@ -291,3 +301,30 @@ def file_scan_task_to_pyiceberg_core(
name_mapping=_model_json(name_mapping) if name_mapping is not None else None,
case_sensitive=case_sensitive,
)


def arrow_batch_reader_from_pyiceberg_core(
file_io: FileIO,
tasks: Iterable[FileScanTask],
schema: Schema,
projected_schema: Schema,
partition_specs: dict[int, PartitionSpec],
name_mapping: NameMapping | None,
case_sensitive: bool = True,
) -> Any:
"""Read PyIceberg scan tasks through pyiceberg-core's ArrowReader."""
core_tasks = [
file_scan_task_to_pyiceberg_core(
task,
schema,
projected_schema,
partition_spec=partition_specs.get(task.file.spec_id),
name_mapping=name_mapping,
case_sensitive=case_sensitive,
project_field_ids=list(projected_schema.field_ids),
)
for task in tasks
]

reader = _core_module("scan").ArrowReader(file_io_to_pyiceberg_core(file_io))
return reader.read(schema_to_pyiceberg_core(projected_schema), core_tasks)
32 changes: 30 additions & 2 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@

ALWAYS_TRUE = AlwaysTrue()
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"
PYICEBERG_RUST_ARROW_SCAN = "PYICEBERG_RUST_ARROW_SCAN"


@dataclass()
Expand Down Expand Up @@ -2242,9 +2243,36 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:

from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow

target_schema = schema_to_pyarrow(self.projection())
projected_schema = self.projection()
if self.limit is None and os.environ.get(PYICEBERG_RUST_ARROW_SCAN, "").lower() in {"1", "true", "yes"}:
from pyiceberg.io.pyiceberg_core import (
arrow_batch_reader_from_pyiceberg_core,
can_read_projected_schema_with_pyiceberg_core,
)

if can_read_projected_schema_with_pyiceberg_core(
self.table_metadata.schema(), projected_schema, self.row_filter, self.case_sensitive
):
try:
return arrow_batch_reader_from_pyiceberg_core(
self.io,
self.plan_files(),
self.table_metadata.schema(),
projected_schema,
self.table_metadata.specs(),
self.table_metadata.name_mapping(),
self.case_sensitive,
)
except (ModuleNotFoundError, NotImplementedError, ValueError) as exc:
warnings.warn(
f"Falling back to PyArrow scan because pyiceberg-core cannot handle this scan: {exc}",
RuntimeWarning,
stacklevel=2,
)

target_schema = schema_to_pyarrow(projected_schema)
batches = ArrowScan(
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
self.table_metadata, self.io, projected_schema, self.row_filter, self.case_sensitive, self.limit
).to_record_batches(self.plan_files())

return pa.RecordBatchReader.from_batches(
Expand Down
8 changes: 8 additions & 0 deletions tests/io/test_pyiceberg_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from pyiceberg.expressions import And, EqualTo, IsNull, StartsWith
from pyiceberg.io import FileIO, InputFile, OutputFile
from pyiceberg.io.pyiceberg_core import (
can_read_projected_schema_with_pyiceberg_core,
delete_file_to_pyiceberg_core,
expression_to_pyiceberg_core,
file_io_to_pyiceberg_core,
Expand Down Expand Up @@ -295,6 +296,13 @@ def test_file_scan_task_to_pyiceberg_core_adds_filter_only_field_to_read_project
assert converted.kwargs["project_field_ids"] == [1, 2]


def test_can_read_projected_schema_with_pyiceberg_core_requires_filter_fields(simple_schema: Schema) -> None:
projected_schema = Schema(NestedField(1, "id", IntegerType(), required=True), schema_id=3)

assert not can_read_projected_schema_with_pyiceberg_core(simple_schema, projected_schema, EqualTo("data", "abc"), True)
assert can_read_projected_schema_with_pyiceberg_core(simple_schema, projected_schema, EqualTo("id", 1), True)


def test_file_scan_task_to_pyiceberg_core_requires_partition_spec_for_partitioned_task(simple_schema: Schema) -> None:
data_file = DataFile.from_args(
content=DataFileContent.DATA,
Expand Down