feat: Add sort-based shuffle repartitioning mode [do not merge - for discussion only]#3940
Closed
andygrove wants to merge 7 commits intoapache:mainfrom
Closed
feat: Add sort-based shuffle repartitioning mode [do not merge - for discussion only]#3940andygrove wants to merge 7 commits intoapache:mainfrom
andygrove wants to merge 7 commits intoapache:mainfrom
Conversation
Add a new shuffle partitioner that processes each batch immediately by sorting rows by partition ID using counting sort, slicing the sorted batch at partition boundaries, and writing each slice to per-partition spill files. This avoids per-partition builder memory overhead compared to the existing MultiPartitionShuffleRepartitioner. Also add spill_batch method to PartitionWriter for writing individual batches to spill files.
Add sort_based parameter to ShuffleWriterExec and external_shuffle to enable sort-based partitioning as an alternative to the default multi-partition hash repartitioner. When sort_based is true and more than one partition is requested, the SortBasedPartitioner is used. Add test cases for sort-based shuffle covering basic operation, larger and smaller batch sizes, and large numbers of partitions.
Add COMET_SHUFFLE_SORT_BASED config option and pass it through protobuf to the native ShuffleWriterExec, replacing the hardcoded `false` value.
…oner Replace per-call BufBatchWriter creation with persistent per-partition writers that keep BatchCoalescer state across calls. This allows small partition slices to be coalesced to batch_size before encoding, dramatically reducing per-block IPC schema overhead.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Related to #887 (memory over-reservation when running native shuffle write)
Rationale for this change
The current
MultiPartitionShuffleRepartitioner(buffered mode) buffers all input batches in memory before writing partitioned output. While memory-efficient per-partition (no per-partition builders), it can consume significant memory when the total input data is large.The
ImmediateModePartitionerfrom #3845 takes a different approach with per-partition Arrow builders, but as discussed in that PR, this causes memory to scale linearly with the number of output partitions — problematic for the common case of 1000+ partitions.This PR introduces a third approach: sort-based repartitioning. For each input batch, it:
taketo reorder the batch by partitionBufBatchWriters withBatchCoalescerThis avoids both buffering all input (like buffered mode) and per-partition Arrow builders (like immediate mode).
Benchmark Results (macOS M3 Ultra)
200 partitions, 4 GB memory limit, 100M rows, lz4:
800 partitions, 4 GB memory limit, 100M rows, lz4:
At moderate partition counts (200), sort-based is 36% faster with 41% less memory. At 800 partitions, the per-partition spill files and write buffers cause higher memory usage. This trade-off should be documented in the tuning guide.
What changes are included in this PR?
SortBasedPartitionerimplementingShufflePartitionertraitPartitionSpillWriterwith persistentBufBatchWriterper partition for batch coalescingsort_based: boolparameter onShuffleWriterExecspark.comet.exec.shuffle.sort_based(default: false)ShuffleWritermessage--mode sortinshuffle_benchspill_batchmethod onPartitionWriter(for potential future use)How are these changes tested?
test_sort_based_basic,test_sort_based_insert_larger_batch,test_sort_based_insert_smaller_batch,test_sort_based_large_number_of_partitions) covering Hash, Range, and RoundRobin partitioning