feat: Add direct native shuffle execution optimization [experimental] #3230
andygrove wants to merge 16 commits into apache:main
This PR introduces an experimental optimization that allows the native shuffle writer to directly execute the child native plan instead of reading intermediate batches via JNI. This avoids the JNI round-trip for single-source native plans.

Current flow: Native Plan → ColumnarBatch → JNI → ScanExec → ShuffleWriterExec
Optimized flow: Native Plan → (directly in native) → ShuffleWriterExec

The optimization is:
- Disabled by default (spark.comet.exec.shuffle.directNative.enabled=false)
- Only applies to CometNativeShuffle (not columnar JVM shuffle)
- Only applies to single-source native scans (CometNativeScanExec)
- Does not apply to RangePartitioning (requires sampling)

Changes:
- CometShuffleDependency: Added childNativePlan field to pass native plan
- CometShuffleExchangeExec: Added detection logic for single-source native plans
- CometShuffleManager: Pass native plan to shuffle writer
- CometNativeShuffleWriter: Use child native plan directly when available
- CometConf: Added COMET_SHUFFLE_DIRECT_NATIVE_ENABLED config option
- CometDirectNativeShuffleSuite: Comprehensive test suite with 15 tests

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
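The eligibility conditions listed in this commit message can be sketched roughly as follows. This is only an illustration under stated assumptions: the `PlanNode`, `NativeScan`, `NativeOp`, `JvmSource`, and `Partitioning` types and the `canUseDirectNative` helper are hypothetical stand-ins, not Comet's or Spark's real classes, and the actual detection logic in `CometShuffleExchangeExec` may differ.

```scala
// Hypothetical, simplified model of a physical plan; not Comet's real classes.
sealed trait Partitioning
case object HashPartitioning extends Partitioning
case object RangePartitioning extends Partitioning
case object SinglePartition extends Partitioning

sealed trait PlanNode { def children: Seq[PlanNode] }
// Stand-in for a native scan leaf such as CometNativeScanExec.
case class NativeScan(table: String) extends PlanNode { val children: Seq[PlanNode] = Nil }
// Stand-in for any native operator (filter, project, join, ...).
case class NativeOp(name: String, children: Seq[PlanNode]) extends PlanNode
// Stand-in for an input that is fed from the JVM side.
case class JvmSource(name: String) extends PlanNode { val children: Seq[PlanNode] = Nil }

object DirectNativeEligibility {
  // Collect the leaf nodes (the data sources) of a plan tree.
  def leaves(plan: PlanNode): Seq[PlanNode] =
    if (plan.children.isEmpty) Seq(plan) else plan.children.flatMap(leaves)

  // All four conditions from the description must hold.
  def canUseDirectNative(
      enabled: Boolean,
      nativeShuffle: Boolean,
      partitioning: Partitioning,
      child: PlanNode): Boolean = {
    // Exactly one source, and that source is a native scan.
    val singleNativeScan = leaves(child) match {
      case Seq(_: NativeScan) => true
      case _ => false
    }
    enabled && nativeShuffle && partitioning != RangePartitioning && singleNativeScan
  }
}
```

In this model a plan with two scan leaves (for example a join) or a JVM-fed leaf is rejected, so it would keep using the existing JNI path.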
Codecov Report
@@ Coverage Diff @@
## main #3230 +/- ##
============================================
+ Coverage 56.12% 59.96% +3.83%
- Complexity 976 1433 +457
============================================
Files 119 170 +51
Lines 11743 15819 +4076
Branches 2251 2616 +365
============================================
+ Hits 6591 9486 +2895
- Misses 4012 5008 +996
- Partials 1140 1325 +185
Subqueries (e.g., bloom filters with might_contain) are registered with the parent execution context ID. Direct native shuffle creates a new execution context with a different ID, causing subquery lookup to fail with "Subquery X not found for plan Y" errors. This change detects ScalarSubquery expressions in the child plan and falls back to the standard execution path when present. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
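The fallback described in this commit amounts to a recursive search of the child plan's expressions. A minimal sketch, assuming a hypothetical expression model (`Expr`, `Column`, `Call`, and this `ScalarSubquery` case class are illustrative stand-ins, not Spark's Catalyst classes):

```scala
// Hypothetical expression tree; not Spark's Catalyst expression classes.
sealed trait Expr { def children: Seq[Expr] }
case class Column(name: String) extends Expr { val children: Seq[Expr] = Nil }
case class ScalarSubquery(id: Long) extends Expr { val children: Seq[Expr] = Nil }
case class Call(fn: String, children: Seq[Expr]) extends Expr

object SubqueryFallback {
  // True if the expression or any of its descendants is a scalar subquery.
  def containsScalarSubquery(e: Expr): Boolean = e match {
    case _: ScalarSubquery => true
    case other => other.children.exists(containsScalarSubquery)
  }

  // Direct native execution is only chosen when no expression in the
  // child plan references a scalar subquery; otherwise the standard
  // path is used, where the subquery is registered under the parent
  // execution context ID.
  def useDirectNative(planExprs: Seq[Expr]): Boolean =
    !planExprs.exists(containsScalarSubquery)
}
```

With this model, an expression such as `Call("might_contain", Seq(ScalarSubquery(1L), Column("k")))` forces the fallback, while a plain `Call("upper", Seq(Column("name")))` does not.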
@viirya I'm curious what you think about this idea. Does it make sense?
Looks like [...]. So this idea is to avoid the JVM/JNI overhead on the input. It sounds good.
Resolve merge conflict in CometNativeShuffleWriter.scala by keeping the refactored inputOperator pattern and adding RoundRobinPartitioning support from main. Enable direct native shuffle by default. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Thanks for taking a look! I will keep experimenting with this.
Resolve merge conflicts in CometConf.scala and CometNativeShuffleWriter.scala. Keep both COMET_SHUFFLE_DIRECT_NATIVE_ENABLED and COMET_SHUFFLE_DIRECT_READ_ENABLED configs. Preserve refactored inputOperator pattern in shuffle writer and apply upstream bug fix (.max -> .min for write buffer size clamping).
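To illustrate why a `.max` to `.min` change matters when clamping a size to an upper bound, here is a small sketch. The `MaxBufferBytes` limit and both helper names are assumptions chosen for illustration; the actual bound and call site in Comet are not shown in this thread.

```scala
object BufferClamp {
  // Hypothetical upper bound; the real limit used by Comet is not stated here.
  val MaxBufferBytes: Long = Int.MaxValue.toLong

  // Buggy variant: .max returns the larger value, so every request is
  // raised to at least the limit instead of being capped by it.
  def clampBuggy(requested: Long): Long = requested.max(MaxBufferBytes)

  // Fixed variant: .min returns the smaller value, so small requests pass
  // through unchanged and oversized requests are capped at the limit.
  def clampFixed(requested: Long): Long = requested.min(MaxBufferBytes)
}
```

With the buggy variant, a 1 KiB request would come back as the full limit, which is the opposite of what an upper-bound clamp should do.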
…oaden to support any native scan inputs
Fall back to normal execution when numPartitions is 0 (empty table with no data files) to avoid IllegalArgumentException from sparkContext.parallelize requiring positive partition count.
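The guard described here can be pictured as one extra condition in the path selection. This is a sketch only: `ExecutionPath`, `choosePath`, and the `otherwiseEligible` flag are hypothetical names, not identifiers from the PR.

```scala
object PartitionGuard {
  sealed trait ExecutionPath
  case object DirectNative extends ExecutionPath
  case object Standard extends ExecutionPath

  // An empty table with no data files yields zero partitions. Creating an
  // RDD for the direct path would then need a non-positive partition count,
  // so the standard path is chosen instead.
  def choosePath(numPartitions: Int, otherwiseEligible: Boolean): ExecutionPath =
    if (otherwiseEligible && numPartitions > 0) DirectNative else Standard
}
```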
This reverts commit 9040f8d.
Summary
This PR introduces an experimental optimization that allows the native shuffle writer to directly execute the child native plan instead of reading intermediate batches via JNI. This avoids the JNI round-trip for single-source native plans.
It also avoids double allocating the memory pool for the task, as described in #3921.
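Current flow: Native Plan → ColumnarBatch → JNI → ScanExec → ShuffleWriterExec
Optimized flow: Native Plan → (directly in native) → ShuffleWriterExec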
Configuration
The optimization is controlled by a new config option:
- spark.comet.exec.shuffle.directNative.enabled (default: false)

Scope
The optimization currently applies when:
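- The shuffle is CometNativeShuffle (spark.comet.shuffle.mode=native)
- The child is a single-source native scan (CometNativeScanExec)
- The partitioning is not RangePartitioning (which requires sampling)

Changes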
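- CometShuffleDependency.scala: Added childNativePlan field to pass native plan to writer
- CometShuffleExchangeExec.scala: Added detection logic for single-source native plans
- CometShuffleManager.scala: Pass native plan to shuffle writer
- CometNativeShuffleWriter.scala: Use child native plan directly when available
- CometConf.scala: Added COMET_SHUFFLE_DIRECT_NATIVE_ENABLED config option
- CometDirectNativeShuffleSuite.scala: Comprehensive test suite with 15 tests

Test plan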
- CometDirectNativeShuffleSuite with 15 tests
- CometNativeShuffleSuite tests still pass (16/16)

🤖 Generated with Claude Code