Skip to content

Commit 01fef8e

Browse files
committed
HIVE-25948: Iceberg: Enable cost-based selection between FanoutWriter and ClusteredWriter based on column stats NDV
1 parent 931d4bb commit 01fef8e

3 files changed

Lines changed: 540 additions & 35 deletions

File tree

iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ insert into tbl_src values (1, 'EUR', 10), (2, 'EUR', 10), (3, 'USD', 11), (4, '
2525
insert into tbl_src values (10, 'EUR', 12), (20, 'EUR', 11), (30, 'USD', 100), (40, 'EUR', 10), (50, 'HUF', 30), (60, 'USD', 12), (70, 'USD', 20), (80, 'PLN', 100), (90, 'PLN', 18), (100, 'CZK', 12), (110, NULL, NULL);
2626

2727
create external table tbl_target_identity (a int) partitioned by (ccy string) stored by iceberg stored as orc;
28+
-- threshold = 0 (default, cost-based): NDV of b (~5) < MAX_WRITERS -> no sort (FanoutWriter)
2829
explain insert overwrite table tbl_target_identity select a, b from tbl_src;
2930
insert overwrite table tbl_target_identity select a, b from tbl_src;
3031
select * from tbl_target_identity order by a, ccy;
3132

3233
-- bucketed case - when the plan sorts, it should invoke GenericUDFIcebergBucket to calculate buckets before sorting
3334
create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc;
35+
-- threshold = 0 (default, cost-based): NDV of b (~5) < MAX_WRITERS -> no sort (FanoutWriter)
3436
explain insert into table tbl_target_bucket select a, b from tbl_src;
3537
insert into table tbl_target_bucket select a, b from tbl_src;
3638
select * from tbl_target_bucket order by a, ccy;
@@ -151,4 +153,31 @@ tblproperties ('parquet.compression'='snappy','format-version'='2');
151153

152154
explain insert into tbl_hour_timestamp values (88669, '2018-05-27 11:12:00', 2018), (40568, '2018-02-12 12:45:56', 2018), (40568, '2018-07-03 06:07:56', 2018);
153155
insert into tbl_hour_timestamp values (88669, '2018-05-27 11:12:00', 2018), (40568, '2018-02-12 12:45:56', 2018), (40568, '2018-07-03 06:07:56', 2018);
154-
select * from tbl_hour_timestamp order by id, date_time_timestamp;
156+
select * from tbl_hour_timestamp order by id, date_time_timestamp;
157+
158+
-- threshold = -1: never sort -> FanoutWriter
159+
set hive.optimize.sort.dynamic.partition.threshold=-1;
160+
explain insert into tbl_target_identity select a, b from tbl_src;
161+
explain insert into tbl_target_bucket select a, b from tbl_src;
162+
163+
-- threshold = 1: always sort -> ClusteredWriter
164+
set hive.optimize.sort.dynamic.partition.threshold=1;
165+
explain insert into tbl_target_identity select a, b from tbl_src;
166+
explain insert into tbl_target_bucket select a, b from tbl_src;
167+
168+
-- threshold = 2: NDV of b (~5) > 2 -> sort (ClusteredWriter)
169+
set hive.optimize.sort.dynamic.partition.threshold=2;
170+
explain insert into tbl_target_identity select a, b from tbl_src;
171+
explain insert into tbl_target_bucket select a, b from tbl_src;
172+
173+
-- threshold = 100: NDV of b (~5) <= 100 -> no sort (FanoutWriter)
174+
set hive.optimize.sort.dynamic.partition.threshold=100;
175+
explain insert into tbl_target_identity select a, b from tbl_src;
176+
explain insert into tbl_target_bucket select a, b from tbl_src;
177+
178+
-- write.fanout.enabled=false: session threshold is reset to 0 (default, cost-based) below, but the SerDe forces threshold=1 for this table -> always ClusteredWriter
179+
set hive.optimize.sort.dynamic.partition.threshold=0;
180+
drop table if exists tbl_target_nofanout;
181+
create external table tbl_target_nofanout (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc
182+
tblproperties ('write.fanout.enabled'='false');
183+
explain insert into tbl_target_nofanout select a, b from tbl_src;

0 commit comments

Comments
 (0)