
refactor: implement generic shuffle interface for flight shuffle #6456

Merged
ohbh merged 7 commits into main from oh/generic_shuffle_interface on Mar 27, 2026

Conversation


@ohbh ohbh commented Mar 23, 2026

Summary

This refactor starts turning shuffle into a generic interface instead of a Flight-specific special case.

The main motivation is to make shuffle and intermediate storage backends pluggable at the plan layer and task-output layer, so Flight is no longer wired through bespoke plan nodes and task input plumbing.

What Changed

At a high level, this PR:

  • Replaces FlightShuffleNode with generic ShuffleNode
  • Replaces FlightShuffleWrite / FlightShuffleRead with generic ShuffleWrite / ShuffleRead local plan nodes parameterized by backend enums
  • Introduces a TaskOutput enum so tasks can return either:
    • ordinary materialized partitions
    • shuffle-write metadata
  • Adapts Flight shuffle to this model end-to-end:
    • shuffle writes return FlightShufflePartitionRef metadata
    • the scheduler / pipeline layer consumes those refs to build downstream shuffle reads
    • the old Input::FlightShuffle path is removed
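The TaskOutput split above can be sketched roughly as follows. This is an illustrative stand-in, not the actual Daft definitions: the variant and field names follow the PR description, and MaterializedOutput is stubbed as a `Vec<String>`.

```rust
/// Hypothetical backend-specific handle for one written shuffle partition.
#[derive(Debug, Clone)]
pub struct FlightShufflePartitionRef {
    pub server_address: String,
    pub shuffle_id: u64,
    pub partition_idx: usize,
}

#[derive(Debug, Clone)]
pub enum ShufflePartitionRef {
    Flight(FlightShufflePartitionRef),
}

/// Metadata returned by a shuffle-write task instead of materialized data.
#[derive(Debug, Clone)]
pub struct ShuffleWriteOutput {
    pub partitions: Vec<ShufflePartitionRef>,
}

/// What a task hands back to the scheduler: either ordinary materialized
/// partitions or shuffle-write metadata.
pub enum TaskOutput {
    Materialized(Vec<String>), // stand-in for MaterializedOutput
    ShuffleWrite(ShuffleWriteOutput),
}

impl TaskOutput {
    /// Succeeds only for ordinary outputs; errors on shuffle-write metadata,
    /// mirroring the `into_materialized` helper mentioned in the review.
    pub fn into_materialized(self) -> Result<Vec<String>, String> {
        match self {
            TaskOutput::Materialized(parts) => Ok(parts),
            TaskOutput::ShuffleWrite(_) => {
                Err("expected materialized output, got shuffle-write metadata".to_string())
            }
        }
    }
}
```

Callers that only understand materialized partitions can keep calling `into_materialized` and treat a shuffle-write output as an error.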

Why This Matters

The net effect is that Flight shuffle now fits the same conceptual model we want for future backends:

  • the plan describes a generic shuffle read/write operator
  • workers return backend-specific partition refs
  • downstream stages reconstruct reads from that metadata
  • shuffle no longer relies on a Flight-only input side channel

This lays the groundwork for supporting additional shuffle backends without repeating the same bespoke integration pattern.
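A minimal sketch of what "parameterized by backend enums" means for the local plan nodes. The enum and node names follow the PR description (ShuffleWriteBackend/ShuffleReadBackend), but the fields and the `describe` helper are hypothetical simplifications.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum ShuffleWriteBackend {
    Flight { server_port: u16 },
    // Future backends slot in here without adding new plan nodes.
}

#[derive(Debug, Clone, PartialEq)]
pub enum ShuffleReadBackend {
    Flight { shuffle_id: u64 },
}

/// Generic local plan nodes: the plan describes a shuffle read/write,
/// and the backend enum carries the backend-specific parameters.
#[derive(Debug, Clone)]
pub enum LocalPlan {
    ShuffleWrite { num_partitions: usize, backend: ShuffleWriteBackend },
    ShuffleRead { partition_idx: usize, backend: ShuffleReadBackend },
}

/// Plan translation dispatches on the backend enum, the way
/// `physical_plan_to_pipeline` destructures `ShuffleWriteBackend::Flight`.
pub fn describe(plan: &LocalPlan) -> String {
    match plan {
        LocalPlan::ShuffleWrite {
            num_partitions,
            backend: ShuffleWriteBackend::Flight { server_port },
        } => format!("flight shuffle write: {num_partitions} partitions via port {server_port}"),
        LocalPlan::ShuffleRead {
            partition_idx,
            backend: ShuffleReadBackend::Flight { shuffle_id },
        } => format!("flight shuffle read: partition {partition_idx} of shuffle {shuffle_id}"),
    }
}
```

Adding another backend then means adding an enum variant and a match arm, not a new pair of plan nodes.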

@ohbh ohbh requested a review from srilman March 23, 2026 21:51
@ohbh ohbh requested a review from a team as a code owner March 23, 2026 21:51

greptile-apps Bot commented Mar 23, 2026

Greptile Summary

This PR refactors the Flight shuffle integration to fit a generic shuffle model: FlightShuffleWrite/FlightShuffleRead plan nodes are replaced by backend-parameterised ShuffleWrite/ShuffleRead, the Input::FlightShuffle side-channel is removed, and a new TaskOutput enum allows tasks to return either ordinary MaterializedOutput or ShuffleWriteOutput containing per-partition FlightShufflePartitionRef metadata. The scheduler pipeline then reconstructs ShuffleRead tasks from that metadata instead of relying on bespoke plumbing.

  • Breaking-change title: the PR removes several public APIs (FlightShuffleReadInput, Input::FlightShuffle, FlightShuffleWrite, FlightShuffleRead). Per project convention, the title should be refactor!: rather than refactor:.
  • Redundant Ray remote calls: in flotilla.py's _get_result, await self.actor_handle.get_address.remote() is called once per partition inside the list comprehension (line 226) instead of once before it; all partitions share the same actor address.
  • Inline path reference: crate::channel::create_channel is used via its full path inline inside FlightShuffleReadSource::get_data instead of being imported at the top of the file, inconsistent with the project's import style.
  • Unused fields in ShuffleWriteOutput: worker_id and task_id are stored but have no accessor methods and are not read anywhere; either a comment or removal would clarify intent.

Confidence Score: 4/5

  • PR is safe to merge with minor cleanup remaining — no blocking logic errors found.
  • The core refactoring is architecturally sound: the TaskOutput enum is introduced cleanly, the into_materialized helper propagates errors correctly, and the server_cache_mapping is built from actual FlightShufflePartitionRef metadata rather than side-channel task IDs. The only actionable items are a redundant per-partition Ray remote call in flotilla.py, a missed top-level import in flight_shuffle_read.rs, and two unused fields in ShuffleWriteOutput — all P2 style issues with no correctness impact.
  • daft/runners/flotilla.py (redundant get_address calls) and src/daft-local-execution/src/sources/flight_shuffle_read.rs (inline path reference)

Important Files Changed

  • daft/runners/flotilla.py: Adds shuffle_write_info and cache_id fields to RaySwordfishTaskHandle to detect shuffle-write tasks and return RayTaskResult.shuffle_success with FlightShufflePartitionRef metadata. Minor perf issue: get_address is awaited once per partition inside the list comprehension instead of once before it.
  • src/daft-distributed/src/pipeline_node/mod.rs: Introduces FlightShufflePartitionRef, ShufflePartitionRef, ShuffleWriteOutput, and the TaskOutput enum alongside a new task_outputs builder stream; worker_id and task_id in ShuffleWriteOutput are stored but never read.
  • src/daft-distributed/src/pipeline_node/shuffles/flight_shuffle.rs: Core refactor: replaces bespoke FlightShuffleWrite/FlightShuffleRead plan nodes with generic ShuffleWrite/ShuffleRead; collects ShufflePartitionRef metadata from TaskOutput::ShuffleWrite to build the server_cache_mapping and emits one ShuffleRead task per partition. Logic is clean.
  • src/daft-local-plan/src/plan.rs: Replaces FlightShuffleWrite/FlightShuffleRead plan nodes and their flat fields with ShuffleWrite/ShuffleRead parameterized by ShuffleWriteBackend/ShuffleReadBackend enums; removes FlightShuffleReadInput; clean and consistent throughout all match arms.
  • src/daft-local-execution/src/sources/flight_shuffle_read.rs: Simplifies FlightShuffleReadSource by embedding shuffle params directly instead of receiving them via a channel; removes the complex receiver/task-set loop since each source now handles a single partition. Minor: crate::channel::create_channel is used inline rather than imported at the top of the file.
  • src/daft-local-execution/src/pipeline.rs: Adapts physical_plan_to_pipeline to destructure the ShuffleWriteBackend::Flight and ShuffleReadBackend::Flight enums; removes the old channel/InputSender::FlightShuffle plumbing and passes params directly to FlightShuffleReadSource.
  • src/daft-distributed/src/python/ray/task.rs: Adds a FlightShufflePartitionRef pyclass and a ShuffleSuccess variant to RayTaskResult; the From impl correctly maps the Python ref into the internal ShufflePartitionRef::Flight variant.
  • src/daft-distributed/src/pipeline_node/materialize.rs: Generalises the pipeline output stream to TaskOutput; the original materialize_all_pipeline_outputs is preserved as a thin wrapper that calls into_materialized on each item, keeping callers that need MaterializedOutput unchanged.
  • src/daft-local-plan/src/lib.rs: Cleans up public re-exports: removes FlightShuffleRead, FlightShuffleReadInput, FlightShuffleWrite; adds ShuffleRead, ShuffleReadBackend, ShuffleWrite, ShuffleWriteBackend, and PlaceholderScan; removes Input::FlightShuffle.
  • src/daft-local-execution/src/input_sender.rs: Removes the InputSender::FlightShuffle variant and its send arm; a clean deletion since the side-channel input path is no longer needed.

Sequence Diagram

```mermaid
sequenceDiagram
    participant Scheduler
    participant Worker
    participant FlightServer
    participant FlightShuffleNode
    participant ShuffleReadSource

    Scheduler->>Worker: Submit SwordfishTask (ShuffleWrite plan)
    Worker->>FlightServer: Write partitions (FlightShuffleWriteSink)
    Worker-->>Scheduler: TaskOutput::ShuffleWrite(ShuffleWriteOutput{partitions: [FlightShufflePartitionRef]})

    note over FlightShuffleNode: Collect all ShuffleWriteOutput<br/>Build server_cache_mapping

    FlightShuffleNode->>Scheduler: Submit SwordfishTask (ShuffleRead plan × N partitions)
    Scheduler->>Worker: Run ShuffleRead plan
    Worker->>ShuffleReadSource: Instantiate with shuffle_id, partition_idx, server_cache_mapping
    ShuffleReadSource->>FlightServer: fetch_partition(shuffle_id, partition_idx, server_cache_mapping)
    FlightServer-->>ShuffleReadSource: MicroPartition stream
    ShuffleReadSource-->>Worker: Materialized partition
    Worker-->>Scheduler: TaskOutput::Materialized(MaterializedOutput)
```
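The "Build server_cache_mapping" step in the diagram above can be sketched as follows. The ref fields and the mapping shape are assumptions for illustration, not the actual Daft types.

```rust
use std::collections::HashMap;

// Hypothetical shuffle-write metadata returned by workers.
#[derive(Debug, Clone)]
struct FlightShufflePartitionRef {
    server_address: String,
    cache_id: String,
    partition_idx: usize,
}

/// Fold the refs from all shuffle-write tasks into a map from Flight server
/// address to the cache id it holds, so every downstream ShuffleRead task
/// knows where to fetch each partition from.
fn build_server_cache_mapping(refs: &[FlightShufflePartitionRef]) -> HashMap<String, String> {
    let mut mapping = HashMap::new();
    for r in refs {
        mapping.insert(r.server_address.clone(), r.cache_id.clone());
    }
    mapping
}
```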

Reviews (1): Last reviewed commit: "cleanup"

Comment thread daft/runners/flotilla.py
Comment thread src/daft-local-execution/src/sources/flight_shuffle_read.rs Outdated
Comment thread src/daft-distributed/src/pipeline_node/mod.rs

@srilman srilman left a comment


I saw 2 main things, will do another pass for smaller stuff later. Also, can we run the integration tests, at least the TPCH ones, to confirm that we didn't accidentally break shuffles?

Comment thread src/daft-distributed/src/scheduling/task.rs
```rust
};
for partition in output.partitions {
    match partition {
        ShufflePartitionRef::Flight(partition) => {
```


With this interface, each backend implementation of shuffle would need its own operator, right, to understand how to pass data between the write and read, like in this case with Flight? Is there some way to genericize it, so that instead of a FlightShuffle operator we have a generic Repartition operator like the one we already have (that assumed Ray behind the scenes)?

Contributor Author


Good point, I reorganized the operator a little to make it easier to add another enum variant. Also changed the name from FlightShuffleNode -> ShuffleNode
e5ca334


codecov Bot commented Mar 23, 2026

Codecov Report

❌ Patch coverage is 23.67758% with 303 lines in your changes missing coverage. Please review.
✅ Project coverage is 75.07%. Comparing base (774179e) to head (1fa1ca7).
⚠️ Report is 5 commits behind head on main.

Files with missing lines Patch % Lines
...-distributed/src/pipeline_node/shuffles/shuffle.rs 24.33% 143 Missing ⚠️
src/daft-distributed/src/python/ray/task.rs 0.00% 48 Missing ⚠️
src/daft-local-plan/src/plan.rs 0.00% 22 Missing ⚠️
src/daft-distributed/src/pipeline_node/mod.rs 16.00% 21 Missing ⚠️
...local-execution/src/sources/flight_shuffle_read.rs 0.00% 17 Missing ⚠️
src/daft-local-execution/src/pipeline.rs 0.00% 15 Missing ⚠️
src/daft-local-plan/src/python.rs 0.00% 10 Missing ⚠️
daft/runners/flotilla.py 20.00% 8 Missing ⚠️
...tributed/src/pipeline_node/join/sort_merge_join.rs 0.00% 6 Missing ⚠️
src/daft-distributed/src/pipeline_node/limit.rs 0.00% 4 Missing ⚠️
... and 4 more
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #6456      +/-   ##
==========================================
- Coverage   75.14%   75.07%   -0.08%     
==========================================
  Files        1035     1035              
  Lines      139453   139622     +169     
==========================================
+ Hits       104798   104819      +21     
- Misses      34655    34803     +148     
Files with missing lines Coverage Δ
.../daft-distributed/src/pipeline_node/materialize.rs 97.71% <100.00%> (+0.08%) ⬆️
...ed/src/pipeline_node/shuffles/translate_shuffle.rs 95.78% <100.00%> (ø)
src/daft-distributed/src/python/mod.rs 40.93% <100.00%> (+0.39%) ⬆️
src/daft-distributed/src/python/ray/mod.rs 0.00% <ø> (ø)
...c/daft-distributed/src/scheduling/scheduler/mod.rs 85.32% <100.00%> (ø)
src/daft-distributed/src/scheduling/task.rs 69.89% <100.00%> (+0.12%) ⬆️
src/daft-local-plan/src/lib.rs 100.00% <ø> (ø)
src/daft-distributed/src/scheduling/dispatcher.rs 96.15% <85.71%> (-0.23%) ⬇️
...ibuted/src/scheduling/scheduler/scheduler_actor.rs 90.04% <90.90%> (-0.13%) ⬇️
...t-distributed/src/pipeline_node/into_partitions.rs 16.34% <0.00%> (-0.16%) ⬇️
... and 11 more

... and 12 files with indirect coverage changes


@ohbh ohbh force-pushed the oh/generic_shuffle_interface branch 2 times, most recently from 38ab5e6 to e5ca334 Compare March 24, 2026 17:32
@ohbh ohbh requested a review from srilman March 24, 2026 18:57

ohbh commented Mar 24, 2026

> I saw 2 main things, will do another pass for smaller stuff later. Also, can we run the integration tests, at least the TPCH ones, to confirm that we didn't accidentally break shuffles?

Thanks for the feedback! Addressed both comments. The TPC-H integration tests are passing: https://github.com/Eventual-Inc/Daft/actions/runs/23503474727


@srilman srilman left a comment


I have one small nit before merging, but overall LGTM. Thanks @ohbh!

Comment thread src/daft-local-execution/src/sources/flight_shuffle_read.rs Outdated
Comment thread src/daft-local-execution/src/sources/flight_shuffle_read.rs
@ohbh ohbh force-pushed the oh/generic_shuffle_interface branch from 5a17e5a to 1fa1ca7 Compare March 27, 2026 00:21
@ohbh ohbh requested a review from srilman March 27, 2026 00:22
@ohbh ohbh merged commit 216e03e into main Mar 27, 2026
83 of 88 checks passed
@ohbh ohbh deleted the oh/generic_shuffle_interface branch March 27, 2026 17:51
ohbh added a commit that referenced this pull request Apr 20, 2026 (#6627)

## Summary

This PR moves Flight shuffle to a ref-first model. Instead of relying on
shuffle_cache and well-known file locations, the Flight server now owns
persisted shuffle data directly and the scheduler exchanges
FlightShufflePartitionRef values as first-class retrieval handles.

That makes Flight shuffle consistent with the broader direction of
passing explicit partition references through the distributed scheduler.

## Background

The Flight shuffle cache existed because we previously had no way to pass metadata from non-Ray operations back to the scheduler. It relied on being able to look up all partition data from just a cache id and a partition index. That model is challenging for other operations, such as into_batches or gather, that don't really follow those partition semantics.

#6456 introduced a way to pass metadata as a first-class object to the scheduler. With this, we can pass MicroPartition refs directly back to the scheduler so it can make intelligent scheduling decisions for distributed operators.

#6499 added some types to the scheduler but ended up being the wrong abstraction. This PR reverts part of those changes, specifically the scheduler passing around TaskOutput types instead of MaterializedResult types; that behavior is now back to how it was before the change.

## Changes Made

- Replaced the cache-based Flight shuffle storage model with a server-owned partition store and opaque partition refs.
- Updated the distributed shuffle planning path so Flight uses ref lists and positional transpose, matching the Ray shuffle contract.
- Refactored the Flight read/write path to produce, transport, and consume first-class refs end to end.
- Updated the Ray/flotilla bridge and related type surfaces to use partition_ref_id-based Flight refs instead of cache-oriented metadata.
- Moved FlightPartitionRef to a new crate so it can be used by both local and distributed execution (a single type passed all the way through).
- Updated the execution engine result, blocking sink, and pipeline message to stream FlightPartitionRefs.
- Simplified several dual code paths around running a shuffle plan vs. a regular plan.
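The server-owned store plus opaque refs described above might look roughly like this; every name here (FlightPartitionRef, PartitionStore, partition_ref_id) is an illustrative stand-in, not the actual implementation.

```rust
use std::collections::HashMap;

/// Opaque, first-class handle to a partition the Flight server has persisted.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct FlightPartitionRef {
    partition_ref_id: u64,
}

/// The server owns the persisted shuffle data; the scheduler only ever
/// holds and passes around refs.
#[derive(Default)]
struct PartitionStore {
    data: HashMap<u64, Vec<u8>>, // persisted partition bytes
    next_id: u64,
}

impl PartitionStore {
    /// Persist a partition and hand back an opaque ref for it.
    fn put(&mut self, bytes: Vec<u8>) -> FlightPartitionRef {
        let id = self.next_id;
        self.next_id += 1;
        self.data.insert(id, bytes);
        FlightPartitionRef { partition_ref_id: id }
    }

    /// Retrieval goes through the ref alone, with no cache-id + partition-index
    /// lookup convention to maintain.
    fn get(&self, r: &FlightPartitionRef) -> Option<&Vec<u8>> {
        self.data.get(&r.partition_ref_id)
    }
}
```

The contrast with the old model is that nothing outside the server needs to know how partitions are laid out; the ref is the entire contract.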
