refactor: implement generic shuffle interface for flight shuffle #6456
Conversation
Greptile Summary

This PR refactors the Flight shuffle integration to fit a generic shuffle model.
Confidence Score: 4/5
Sequence Diagram

```mermaid
sequenceDiagram
    participant Scheduler
    participant Worker
    participant FlightServer
    participant FlightShuffleNode
    participant ShuffleReadSource
    Scheduler->>Worker: Submit SwordfishTask (ShuffleWrite plan)
    Worker->>FlightServer: Write partitions (FlightShuffleWriteSink)
    Worker-->>Scheduler: TaskOutput::ShuffleWrite(ShuffleWriteOutput{partitions: [FlightShufflePartitionRef]})
    note over FlightShuffleNode: Collect all ShuffleWriteOutput<br/>Build server_cache_mapping
    FlightShuffleNode->>Scheduler: Submit SwordfishTask (ShuffleRead plan × N partitions)
    Scheduler->>Worker: Run ShuffleRead plan
    Worker->>ShuffleReadSource: Instantiate with shuffle_id, partition_idx, server_cache_mapping
    ShuffleReadSource->>FlightServer: fetch_partition(shuffle_id, partition_idx, server_cache_mapping)
    FlightServer-->>ShuffleReadSource: MicroPartition stream
    ShuffleReadSource-->>Worker: Materialized partition
    Worker-->>Scheduler: TaskOutput::Materialized(MaterializedOutput)
```
Reviews (1): Last reviewed commit: "cleanup"
srilman left a comment
I saw 2 main things and will do another pass for smaller stuff later. Also, can we run the integration tests, at least the TPCH ones, to confirm that we didn't accidentally break shuffles?
```rust
};
for partition in output.partitions {
    match partition {
        ShufflePartitionRef::Flight(partition) => {
```
With this interface, each backend implementation of shuffle would need its own operator, right, to understand how to pass data between the write and the read, like in this case with Flight? Is there some way to genericize it, so that instead of a FlightShuffle operator we have a generic Repartition operator like the one we already have (which assumed Ray behind the scenes)?
Good point, I organized the operator a little better to make it easier to add another enum variant. Also changed the name from FlightShuffleNode -> ShuffleNode.
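The enum-based genericization mentioned in this reply could look roughly like the Rust sketch below. All names here are illustrative stand-ins, not Daft's actual types: a single `ShuffleNode` dispatches on a backend enum, so adding a backend means adding a variant and match arms rather than a whole new operator.

```rust
// Sketch: one generic shuffle operator parameterized by a backend enum.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ShuffleBackend {
    Flight,
    // Future backends slot in here as new variants.
}

struct ShuffleNode {
    backend: ShuffleBackend,
    num_partitions: usize,
}

impl ShuffleNode {
    /// Describe the read side for one partition; only the match arm
    /// differs per backend, not the operator itself.
    fn describe_read(&self, partition_idx: usize) -> String {
        match self.backend {
            ShuffleBackend::Flight => {
                format!("flight-read partition {partition_idx}")
            }
        }
    }
}

fn main() {
    let node = ShuffleNode { backend: ShuffleBackend::Flight, num_partitions: 4 };
    assert_eq!(node.num_partitions, 4);
    assert_eq!(node.describe_read(2), "flight-read partition 2");
    println!("{}", node.describe_read(2));
}
```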
Thanks for the feedback! Addressed both comments. The integration TPCH tests are passing: https://github.com/Eventual-Inc/Daft/actions/runs/23503474727
#6627

## Summary

This PR moves Flight shuffle to a ref-first model. Instead of relying on shuffle_cache and well-known file locations, the Flight server now owns persisted shuffle data directly, and the scheduler exchanges FlightShufflePartitionRef values as first-class retrieval handles. That makes Flight shuffle consistent with the broader direction of passing explicit partition references through the distributed scheduler.

## Background

The Flight shuffle cache existed previously because we had no way to pass metadata of non-Ray operations back to the scheduler. It operated on being able to look up all partition data based only on a cache id and a partition index. This model is challenging for other operations, such as into_batches or gather, that don't really rely on the partition semantics. #6456 introduced a way to pass metadata as a first-class object to the scheduler. With this, we can pass MicroPartition refs directly back to the scheduler so it can make intelligent scheduling decisions for distributed operators. #6499 added some types to the scheduler, but ended up being the wrong abstraction. This PR reverts part of those changes, specifically around the scheduler passing around TaskOutput types instead of MaterializedResult types; now it's back to how it was before that change.

## Changes Made

- Replaced the cache-based Flight shuffle storage model with a server-owned partition store and opaque partition refs.
- Updated the distributed shuffle planning path so Flight uses ref lists and positional transpose, matching the Ray shuffle contract.
- Refactored the Flight read/write path to produce, transport, and consume first-class refs end to end.
- Updated the Ray/flotilla bridge and related type surfaces to use partition_ref_id-based Flight refs instead of cache-oriented metadata.
- Moved FlightPartitionRef to a new crate so it can be used by both local and distributed execution (a single type passed all the way through).
- Updated the execution engine result, blocking sink, and pipeline message to stream FlightPartitionRefs.
- Simplified several dual codepaths around running a shuffle plan vs. a regular plan.
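The ref-first model described above can be illustrated with a minimal Rust sketch. The partition store and ref type here are simplified stand-ins assumed for the example, not Daft's actual API: the point is that the scheduler only holds an opaque handle, while the server owns the data.

```rust
// Sketch: server-owned partition store plus an opaque partition ref
// that the scheduler passes around as a first-class retrieval handle.
use std::collections::HashMap;

/// Opaque handle: enough to locate a partition, no cache semantics.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct PartitionRef {
    partition_ref_id: u64,
    server_addr: String,
}

/// Stand-in for the server-owned partition store.
struct PartitionStore {
    data: HashMap<u64, Vec<i64>>, // ref id -> rows (placeholder payload)
}

impl PartitionStore {
    fn fetch(&self, r: &PartitionRef) -> Option<&Vec<i64>> {
        self.data.get(&r.partition_ref_id)
    }
}

fn main() {
    let mut data = HashMap::new();
    data.insert(7, vec![1, 2, 3]);
    let store = PartitionStore { data };

    // The scheduler hands this ref to whichever worker runs the read.
    let r = PartitionRef { partition_ref_id: 7, server_addr: "10.0.0.1:50051".into() };
    assert_eq!(store.fetch(&r).unwrap().len(), 3);
    println!("fetched {} rows from {}", store.fetch(&r).unwrap().len(), r.server_addr);
}
```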
Summary
This refactor starts turning shuffle into a generic interface instead of a Flight-specific special case.
The main motivation is to make shuffle and intermediate storage backends pluggable at the plan layer and task-output layer, so Flight is no longer wired through bespoke plan nodes and task input plumbing.
What Changed
At a high level, this PR:
- Replaces FlightShuffleNode with a generic ShuffleNode
- Replaces FlightShuffleWrite/FlightShuffleRead with generic ShuffleWrite/ShuffleRead local plan nodes parameterized by backend enums
- Extends the TaskOutput enum so tasks can return either materialized partitions or FlightShufflePartitionRef metadata
- The Input::FlightShuffle path is removed

Why This Matters
The net effect is that Flight shuffle now fits the same conceptual model we want for future backends.
This lays the groundwork for supporting additional shuffle backends without repeating the same bespoke integration pattern.
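The two-way TaskOutput split described under "What Changed" can be sketched as follows. The variant payloads here are simplified placeholders assumed for illustration, not Daft's real types.

```rust
// Sketch: tasks report either a materialized partition or shuffle-write
// metadata that the shuffle node later turns into per-partition reads.
#[derive(Debug)]
enum TaskOutput {
    /// A fully materialized partition, ready for downstream operators.
    Materialized(Vec<i64>),
    /// Shuffle-write metadata: refs the shuffle node transposes into
    /// per-partition read tasks.
    ShuffleWrite { partition_refs: Vec<u64> },
}

fn summarize(out: &TaskOutput) -> String {
    match out {
        TaskOutput::Materialized(rows) => format!("materialized {} rows", rows.len()),
        TaskOutput::ShuffleWrite { partition_refs } => {
            format!("wrote {} partitions", partition_refs.len())
        }
    }
}

fn main() {
    assert_eq!(summarize(&TaskOutput::Materialized(vec![1, 2])), "materialized 2 rows");
    assert_eq!(
        summarize(&TaskOutput::ShuffleWrite { partition_refs: vec![10, 11, 12] }),
        "wrote 3 partitions"
    );
    println!("ok");
}
```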