Implement streaming group by leaf stage operator for multi-stage engine#18035
yashmayya merged 3 commits into apache:master
Conversation
Codecov Report ❌

```
@@             Coverage Diff              @@
##             master   #18035      +/-   ##
============================================
- Coverage     63.78%   63.74%    -0.05%
  Complexity     1538     1538
============================================
  Files          3160     3161       +1
  Lines        190752   190845      +93
  Branches      29281    29301      +20
============================================
- Hits         121677   121648      -29
- Misses        59534    59639     +105
- Partials      9541      9558      +17
```
Pull request overview
Adds an opt-in “streaming” group-by combine operator for multi-stage engine leaf stages that periodically flushes partial aggregates once the number of accumulated groups reaches a configurable threshold, bounding combine-stage memory usage.
Changes:
- Introduces the `streamingGroupByFlushThreshold` query option and wires it into `QueryContext`.
- Selects `StreamingGroupByCombineOperator` in `CombinePlanNode` when streaming is enabled and the flush threshold is set.
- Adds unit and integration tests to validate correctness and multi-flush behavior.
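The flush-on-threshold behavior described above can be sketched in plain Java. This is a minimal illustration, not Pinot's implementation: the `HashMap` stands in for the real `IndexedTable`, the `combine` method and row layout are hypothetical, and a toy SUM aggregation replaces the real aggregation functions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamingFlushSketch {
  // Merges (groupKey, value) rows into a table and flushes partial SUM
  // aggregates whenever the table reaches flushThreshold groups, so memory
  // is bounded instead of holding all groups until the end.
  static List<Map<String, Long>> combine(String[][] rows, int flushThreshold) {
    List<Map<String, Long>> flushed = new ArrayList<>();
    Map<String, Long> table = new HashMap<>();
    for (String[] row : rows) {
      table.merge(row[0], Long.parseLong(row[1]), Long::sum);
      if (table.size() >= flushThreshold) {
        flushed.add(table);       // emit partial aggregates downstream
        table = new HashMap<>();  // fresh table bounds memory between flushes
      }
    }
    if (!table.isEmpty()) {
      flushed.add(table);         // final flush at end of input
    }
    return flushed;
  }

  public static void main(String[] args) {
    String[][] rows = {{"a", "1"}, {"b", "2"}, {"c", "3"}, {"a", "4"}, {"d", "5"}};
    // With threshold 3, the first three groups are flushed as one partial
    // block; the remaining rows are flushed at the end as a second block.
    System.out.println(combine(rows, 3));
  }
}
```

Note that the same group key (here `"a"`) can appear in more than one flushed block; correctness relies on the downstream stage re-merging the partial aggregates.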
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| `pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java` | Adds the new query option key constant for streaming group-by flushing. |
| `pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java` | Adds parsing helper for `streamingGroupByFlushThreshold`. |
| `pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java` | Stores the parsed flush threshold in query context. |
| `pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java` | Populates `QueryContext` from query options. |
| `pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java` | Switches to streaming group-by combine operator when applicable. |
| `pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java` | Implements the flushing/streaming combine operator for group-by. |
| `pinot-core/src/test/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperatorTest.java` | Unit tests for multi-block flushing, correctness, and COUNT(*). |
| `pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java` | Integration test comparing streaming vs baseline group-by results. |
```java
        _numOperatorsFinished++;
        continue;
      }
      mergeBlock((GroupByResultsBlock) resultsBlock);
      if (_indexedTable != null && _indexedTable.size() >= _flushThreshold) {
        return flushTable();
      }
```
The flush threshold is only checked after an entire per-segment GroupByResultsBlock has been fully merged (i.e., after mergeBlock(...) returns). If a single segment block contains far more distinct groups than _flushThreshold, _indexedTable can still grow well beyond the intended bound before the flush happens, defeating the memory-bounding goal for high-cardinality queries. Consider flushing incrementally during merge (e.g., when the table reaches the threshold while iterating keys) and buffering any additional flushed blocks to be returned on subsequent nextBlock() calls.
The number of groups a single segment block can contribute is already bounded by numGroupsLimit (default 100K). That limit is enforced in the segment-level GroupKeyGenerator before results ever reach the combine operator. So the worst-case overshoot after merging one block is numGroupsLimit, which is an existing, accepted per-segment memory bound. I don't think adding mid merge flush here is worth the additional complexity.
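The bound argued above can be made concrete with a toy calculation (illustrative names and values, not Pinot code): because the threshold is only checked between block merges, the table can already hold up to `flushThreshold - 1` groups and then absorb one full segment block of up to `numGroupsLimit` entirely-new groups before the next check fires.

```java
public class OvershootBound {
  // Upper bound on groups held in the combine table before a flush, given
  // that the flush threshold is only checked after a whole segment block
  // has been merged.
  static int worstCase(int flushThreshold, int numGroupsLimit) {
    return (flushThreshold - 1) + numGroupsLimit;
  }

  public static void main(String[] args) {
    // e.g. threshold 1000 with the default 100K per-segment group limit
    System.out.println(worstCase(1000, 100_000)); // prints 100999
  }
}
```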
Things to discuss:
gortiz left a comment:
I've added a couple of comments, but I think the current state is good enough to be merged. We can discuss, and later work on, the two details I mentioned.
Good point, there is indeed nothing enforcing that the streaming threshold is smaller (which is what I think you meant instead of larger) than the num groups limit, so we should wire that metadata in. In fact, we should be doing that regardless of the streaming threshold, because the num groups limit can be breached even within a single segment, so we should be carrying forward that metadata anyway. And while we do explicitly disable group trimming in the streaming combine operator (because it doesn't make sense to have both a streaming flush threshold and trimming IMO), we still need to forward that metadata from the individual segment level operator blocks. I've made these changes.
Yeah, you're right - this is optimized for the expectation that the user sets the right thresholds for a particular query. Maybe we can revisit this in a future patch to find a more optimal initial allocation in case the streaming flush threshold is set to a super large value?
Force-pushed from d6d3543 to 4945229.
Documentation PR opened: pinot-contrib/pinot-docs#608. This documents the new streaming group by behavior.
…ng group by (apache/pinot#18035) (#608) Co-authored-by: Pinot Docs Bot <docs-bot@pinot-contrib.org>
Adds `StreamingGroupByCombineOperator`, a new combine operator that flushes partial group-by results when accumulated groups reach a configurable threshold, instead of buffering all groups in memory before returning. This bounds server memory usage for high-cardinality group-by queries on MSE leaf stages while still performing partial aggregation to reduce data volume.

- Offers a middle ground between skipping leaf stage group by (`is_skip_leaf_stage_group_by`) and full leaf stage group by.
- Users set the `streamingGroupByFlushThreshold` query option (e.g., `SET streamingGroupByFlushThreshold = 5000`) to control the flush threshold. When unset or 0, the existing `GroupByCombineOperator` is used.
- Group trimming is disabled in the streaming operator: allowing the `IndexedTable` to independently trim or cap records would silently drop groups before flushing, producing incorrect partial aggregates that the downstream `FINAL` stage cannot recover from.
- Flushed blocks are merged at the `FINAL` worker, and `AggregationFunction.merge()` is associative, so partial results from multiple flushes merge correctly.
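The associativity claim can be illustrated with a toy SUM aggregation (hypothetical names; this is a sketch of the property, not Pinot's `AggregationFunction` API): aggregating all rows in one pass gives the same result as aggregating two "flushes" separately and merging them afterwards.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AssociativeMergeSketch {
  // One-pass SUM aggregation of (groupKey, value) rows.
  static Map<String, Long> aggregate(List<String[]> rows) {
    Map<String, Long> out = new HashMap<>();
    for (String[] r : rows) {
      out.merge(r[0], Long.parseLong(r[1]), Long::sum);
    }
    return out;
  }

  // Merges two partial aggregate tables, as a FINAL stage would.
  static Map<String, Long> merge(Map<String, Long> a, Map<String, Long> b) {
    Map<String, Long> out = new HashMap<>(a);
    b.forEach((k, v) -> out.merge(k, v, Long::sum));
    return out;
  }

  public static void main(String[] args) {
    List<String[]> rows = Arrays.asList(
        new String[]{"x", "1"}, new String[]{"y", "2"},
        new String[]{"x", "3"}, new String[]{"z", "4"});
    // One-shot aggregation vs. two partial "flushes" merged downstream.
    Map<String, Long> oneShot = aggregate(rows);
    Map<String, Long> merged = merge(aggregate(rows.subList(0, 2)),
                                     aggregate(rows.subList(2, 4)));
    System.out.println(oneShot.equals(merged)); // prints true
  }
}
```

This is exactly why a group key appearing in several flushed blocks is harmless: the merge function recombines the partial values into the same final aggregate.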