From 5fc079fdbadbb254c6983a53d5cdd9e37855a553 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 28 May 2026 16:10:22 -0400 Subject: [PATCH] Separate chunker from batcher The chunker was part of the batcher and responsible for transforming input data into the batcher's chunk format. Hence, the batcher needed to be aware of its input types, although it would not otherwise use this information. Drop the `Input` associated type and `push_container` method from the `Batcher` trait; batchers now accept already-chunked input via `PushInto`. The vec `MergeBatcher` loses its `Input` and `C` (chunker) type parameters, and the columnar `MergeBatcher` loses its internal `TrieChunker`. Both now expose `PushInto` that inserts a chunk directly as a chain. Chunking moves into `arrange_core`, which gains a `Chu: ContainerBuilder` type parameter so callers supply a chunker that maps the stream's input container into the batcher's output container. The operator drives the chunker (push, extract, and a `finish` drain before sealing) where the batcher previously did. The `Arrange` trait constrains `Ba::Output = C` and hardcodes `ContainerChunker` internally, so `.arrange::()` callsites for `Vec`-based collections are unchanged. Callers needing a cross-container chunker (columnar layouts, interactive, spill) drop to `arrange_core` directly and pass an explicit `ValChunker`. Signed-off-by: Moritz Hoffmann Co-Authored-By: Claude Opus 4.7 --- .../examples/columnar/main.rs | 9 ++- .../examples/columnar_spill.rs | 19 ++++-- differential-dataflow/examples/spines.rs | 6 +- differential-dataflow/src/collection.rs | 17 +++--- .../src/columnar/arrangement/mod.rs | 2 + differential-dataflow/src/columnar/batcher.rs | 41 +++++-------- differential-dataflow/src/columnar/mod.rs | 2 +- .../src/operators/arrange/arrangement.rs | 33 ++++++++-- .../trace/implementations/merge_batcher.rs | 61 ++++++------------- .../src/trace/implementations/mod.rs | 1 + .../src/trace/implementations/ord_neu.rs | 5 +- differential-dataflow/src/trace/mod.rs | 15 ++--- differential-dataflow/tests/trace.rs | 3 +- interactive/examples/ddir_col.rs | 5 +- 14 files changed, 110 insertions(+), 109 deletions(-) diff --git a/differential-dataflow/examples/columnar/main.rs b/differential-dataflow/examples/columnar/main.rs index 582e5ce98..f26d435b2 100644 --- a/differential-dataflow/examples/columnar/main.rs +++ b/differential-dataflow/examples/columnar/main.rs @@ -125,13 +125,15 @@ mod reachability { let edges_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 }; let reach_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 }; - let edges_arr = arrange_core::<_, + let edges_arr = arrange_core::<_, _, + ValChunker<(Node, Node, IterTime, Diff)>, ValBatcher, ValBuilder, ValSpine, >(edges_inner.inner, edges_pact, "Edges"); - let reach_arr = arrange_core::<_, + let reach_arr = arrange_core::<_, _, + ValChunker<(Node, (), IterTime, Diff)>, ValBatcher, ValBuilder, ValSpine, @@ -155,7 +157,8 @@ mod reachability { // Arrange for reduce. let combined_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 }; - let combined_arr = arrange_core::<_, + let combined_arr = arrange_core::<_, _, + ValChunker<(Node, (), IterTime, Diff)>, ValBatcher, ValBuilder, ValSpine, diff --git a/differential-dataflow/examples/columnar_spill.rs b/differential-dataflow/examples/columnar_spill.rs index 96776121f..5203ed97b 100644 --- a/differential-dataflow/examples/columnar_spill.rs +++ b/differential-dataflow/examples/columnar_spill.rs @@ -63,7 +63,7 @@ fn reset_stats() { use columnar::Push; use columnar::bytes::stash::Stash; -use differential_dataflow::columnar::{RecordedUpdates, ValBuilder, ValColBuilder, ValSpine}; +use differential_dataflow::columnar::{ValBuilder, ValChunker, ValColBuilder, ValSpine}; use differential_dataflow::columnar::batcher::MergeBatcher; use differential_dataflow::columnar::layout::ColumnarUpdate as Update; use differential_dataflow::columnar::spill::{Entry, Fetch, Spill, SpillPolicy}; @@ -75,6 +75,7 @@ use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::probe::{Handle as ProbeHandle, Probe}; use timely::dataflow::operators::Input; use timely::dataflow::InputHandle; +use timely::container::PushInto; use timely::progress::frontier::AntichainRef; use timely::progress::{frontier::Antichain, Timestamp}; @@ -323,7 +324,6 @@ where R: columnar::Columnar + 'static, (K, V, T, R): Update