Skip to content

Commit 168964e

Browse files
author
Babar
committed
eqd-1 final
1 parent 61c7f3e commit 168964e

2 files changed

Lines changed: 8 additions & 11 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2757,23 +2757,20 @@ def _apply_equality_deletes(data_table: pa.Table, delete_table: pa.Table, equali
27572757
for col in equality_columns:
27582758
data_type = data_table.schema.field(col).type
27592759
if delete_table[col].type != data_type:
2760-
delete_type_matched = delete_type_matched.set_column( delete_type_matched.schema.get_field_index(col), col, delete_table[col].cast(data_type) )
2760+
delete_type_matched = delete_type_matched.set_column(
2761+
delete_type_matched.schema.get_field_index(col),
2762+
col,
2763+
delete_table[col].cast(data_type)
2764+
)
27612765

27622766
delete_keys = set()
27632767
for row in delete_type_matched.to_pylist():
27642768
key = tuple(row[col] for col in equality_columns)
27652769
delete_keys.add(key)
27662770

2771+
column_arrays = [data_table[col] for col in equality_columns]
27672772

2768-
mask = []
2769-
for i in range(len(data_table)):
2770-
row_values = []
2771-
for col in equality_columns:
2772-
value = data_table[col][i].as_py()
2773-
row_values.append(value)
2774-
2775-
row_tuple = tuple(row_values)
2776-
mask.append(row_tuple not in delete_keys)
2773+
mask = [(tuple(arr[i].as_py() for arr in column_arrays) not in delete_keys) for i in range(len(data_table))]
27772774

27782775
result = data_table.filter(pa.array(mask))
27792776
return result

pyiceberg/table/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1691,7 +1691,7 @@ def _match_equality_deletes_to_data_file(data_entry: ManifestEntry, equality_del
16911691
relevant_entries = []
16921692
for entry in equality_delete_entries:
16931693
if entry.data_file.file_path == data_entry.data_file.file_path:
1694-
if (entry.sequence_number or INITIAL_SEQUENCE_NUMBER) < (data_entry.sequence_number or INITIAL_SEQUENCE_NUMBER):
1694+
if (entry.sequence_number - 1 or INITIAL_SEQUENCE_NUMBER) >= (data_entry.sequence_number or INITIAL_SEQUENCE_NUMBER):
16951695
if (entry.data_file.partition == data_entry.data_file.partition and entry.data_file.spec_id == data_entry.data_file.spec_id) or (entry.data_file.spec_id == UNPARTITIONED_PARTITION_SPEC.spec_id):
16961696
if entry.data_file.equality_ids is not None and data_entry.data_file.equality_ids is not None and set(entry.data_file.equality_ids) == set(data_entry.data_file.equality_ids):
16971697
relevant_entries.append(entry)

0 commit comments

Comments
 (0)