Skip to content

Implement streaming group by leaf stage operator for multi-stage engine#18035

Merged
yashmayya merged 3 commits intoapache:masterfrom
yashmayya:streaming-group-by
Apr 1, 2026
Merged

Implement streaming group by leaf stage operator for multi-stage engine#18035
yashmayya merged 3 commits intoapache:masterfrom
yashmayya:streaming-group-by

Conversation

@yashmayya
Copy link
Copy Markdown
Contributor

  • Adds StreamingGroupByCombineOperator, a new combine operator that flushes partial group-by results when accumulated groups reach a configurable threshold, instead of buffering all groups in memory before returning. This bounds server memory usage for high-cardinality group-by queries on MSE leaf stages while still performing partial aggregation to reduce data volume.
  • This offers a middle ground between skipping leaf stage group by (is_skip_leaf_stage_group_by) and full leaf stage group by.
  • Introduces the streamingGroupByFlushThreshold query option (e.g., SET streamingGroupByFlushThreshold = 5000) to control the flush threshold. When unset or 0, the existing GroupByCombineOperator is used.
  • Note that we disable trimming and result size bounding on the indexed table because the streaming operator is managing memory bounding externally via the flush threshold. When the table reaches the flush threshold, the operator drains and replaces it with a fresh table. Allowing the IndexedTable to independently trim or cap records would silently drop groups before flushing, producing incorrect partial aggregates that the downstream FINAL stage cannot recover from.
  • Correctness is maintained because hash exchange routes the same group key to the same FINAL worker and AggregationFunction.merge() is associative, so partial results from multiple flushes merge correctly.

@yashmayya yashmayya added enhancement Improvement to existing functionality multi-stage Related to the multi-stage query engine labels Mar 30, 2026
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Mar 30, 2026

Codecov Report

❌ Patch coverage is 69.29825% with 35 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.74%. Comparing base (6e13cc9) to head (4945229).

Files with missing lines Patch % Lines
...tor/streaming/StreamingGroupByCombineOperator.java 68.75% 13 Missing and 12 partials ⚠️
...va/org/apache/pinot/core/plan/CombinePlanNode.java 76.00% 2 Missing and 4 partials ⚠️
...pinot/core/plan/maker/InstancePlanMakerImplV2.java 50.00% 1 Missing and 1 partial ⚠️
...pinot/core/query/request/context/QueryContext.java 33.33% 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18035      +/-   ##
============================================
- Coverage     63.78%   63.74%   -0.05%     
  Complexity     1538     1538              
============================================
  Files          3160     3161       +1     
  Lines        190752   190845      +93     
  Branches      29281    29301      +20     
============================================
- Hits         121677   121648      -29     
- Misses        59534    59639     +105     
- Partials       9541     9558      +17     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.71% <69.29%> (+0.01%) ⬆️
java-21 63.70% <69.29%> (-0.06%) ⬇️
temurin 63.74% <69.29%> (-0.05%) ⬇️
unittests 63.73% <69.29%> (-0.05%) ⬇️
unittests1 56.24% <69.29%> (-0.03%) ⬇️
unittests2 34.13% <0.00%> (-0.05%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@yashmayya yashmayya marked this pull request as ready for review March 31, 2026 00:28
@yashmayya yashmayya requested a review from Copilot March 31, 2026 00:29
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an opt-in “streaming” group-by combine operator for multi-stage engine leaf stages that periodically flushes partial aggregates once the number of accumulated groups reaches a configurable threshold, bounding combine-stage memory usage.

Changes:

  • Introduces streamingGroupByFlushThreshold query option and wires it into QueryContext.
  • Selects StreamingGroupByCombineOperator in CombinePlanNode when streaming is enabled and the flush threshold is set.
  • Adds unit and integration tests to validate correctness and multi-flush behavior.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java Adds the new query option key constant for streaming group-by flushing.
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java Adds parsing helper for streamingGroupByFlushThreshold.
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java Stores the parsed flush threshold in query context.
pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java Populates QueryContext from query options.
pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java Switches to streaming group-by combine operator when applicable.
pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java Implements the flushing/streaming combine operator for group-by.
pinot-core/src/test/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperatorTest.java Unit tests for multi-block flushing, correctness, and COUNT(*).
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java Integration test comparing streaming vs baseline group-by results.

Comment on lines +124 to +130
_numOperatorsFinished++;
continue;
}
mergeBlock((GroupByResultsBlock) resultsBlock);
if (_indexedTable != null && _indexedTable.size() >= _flushThreshold) {
return flushTable();
}
Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flush threshold is only checked after an entire per-segment GroupByResultsBlock has been fully merged (i.e., after mergeBlock(...) returns). If a single segment block contains far more distinct groups than _flushThreshold, _indexedTable can still grow well beyond the intended bound before the flush happens, defeating the memory-bounding goal for high-cardinality queries. Consider flushing incrementally during merge (e.g., when the table reaches the threshold while iterating keys) and buffering any additional flushed blocks to be returned on subsequent nextBlock() calls.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number of groups a single segment block can contribute is already bounded by numGroupsLimit (default 100K). That limit is enforced in the segment-level GroupKeyGenerator before results ever reach the combine operator. So the worst-case overshoot after merging one block is numGroupsLimit, which is an existing, accepted per-segment memory bound. I don't think adding mid merge flush here is worth the additional complexity.

@gortiz
Copy link
Copy Markdown
Contributor

gortiz commented Apr 1, 2026

Things to discuss:

  • StreamingGroupByCombineOperator does not carry groupsTrimmed / numGroupsLimitReached / numGroupsWarningLimitReached into emitted blocks. I guess the streaming threashold is expected to be larger than numGroupsLimit, but AFAIS there is nothing enforncing that.

  • We now use the threshold to pre-allocate stuff. I think that is fine, but the side effect is that a large threshold means we are going to use more memory than without streaming, right?

Copy link
Copy Markdown
Contributor

@gortiz gortiz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a couple of comments, but I think the current state is good enough to be merged. We can discuss and later work on the two details I mentioned

@yashmayya
Copy link
Copy Markdown
Contributor Author

StreamingGroupByCombineOperator does not carry groupsTrimmed / numGroupsLimitReached / numGroupsWarningLimitReached into emitted blocks. I guess the streaming threashold is expected to be larger than numGroupsLimit, but AFAIS there is nothing enforncing that.

Good point, there is indeed nothing enforcing that the streaming threshold is smaller (which is what I think you meant instead of larger) than the num groups limit, so we should wire that metadata in. In fact, we should be doing that regardless of the streaming threshold because the num groups limit can be breached even within a single segment so we should be carrying forward that metadata. And while we do explicitly disable group trimming in the streaming combine operator (because it doesn't make sense to have both a streaming flush threshold and trimming IMO), we still need to forward that metadata from the individual segment level operator blocks. I've made these changes.

We now use the threshold to pre-allocate stuff. I think that is fine, but the side effect is that a large threshold means we are going to use more memory than without streaming, right?

Yeah you're right - this is optimized for the expectation that the user sets the right thresholds for a particular query. Maybe we can revisit this in a future patch to find a more optimal initial allocation in case the streaming flush threshold is set to a super large value?

@yashmayya yashmayya force-pushed the streaming-group-by branch from d6d3543 to 4945229 Compare April 1, 2026 21:17
@yashmayya yashmayya merged commit 1c9d455 into apache:master Apr 1, 2026
16 checks passed
xiangfu0 pushed a commit to pinot-contrib/pinot-docs that referenced this pull request Apr 1, 2026
@xiangfu0
Copy link
Copy Markdown
Contributor

xiangfu0 commented Apr 1, 2026

Documentation PR opened: pinot-contrib/pinot-docs#608

This documents the new streamingGroupByFlushThreshold query option introduced in this PR.

xiangfu0 added a commit to pinot-contrib/pinot-docs that referenced this pull request Apr 1, 2026
…ng group by (apache/pinot#18035) (#608)

Co-authored-by: Pinot Docs Bot <docs-bot@pinot-contrib.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement Improvement to existing functionality multi-stage Related to the multi-stage query engine

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants