From 542af207cf918f503ac00b5fc5aec704676307af Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 28 May 2026 20:51:40 -0400 Subject: [PATCH] Remove B: Builder from Batcher::seal --- differential-dataflow/examples/columnar_spill.rs | 9 +++------ differential-dataflow/src/columnar/batcher.rs | 7 +++---- .../src/operators/arrange/arrangement.rs | 9 ++++++--- .../src/trace/implementations/merge_batcher.rs | 7 +++---- differential-dataflow/src/trace/mod.rs | 7 +++++-- differential-dataflow/tests/trace.rs | 8 ++++---- 6 files changed, 24 insertions(+), 23 deletions(-) diff --git a/differential-dataflow/examples/columnar_spill.rs b/differential-dataflow/examples/columnar_spill.rs index 5203ed97b..3c87b1d4a 100644 --- a/differential-dataflow/examples/columnar_spill.rs +++ b/differential-dataflow/examples/columnar_spill.rs @@ -70,7 +70,7 @@ use differential_dataflow::columnar::spill::{Entry, Fetch, Spill, SpillPolicy}; use differential_dataflow::columnar::updates::{Updates, UpdatesTyped}; use differential_dataflow::logging::Logger; use differential_dataflow::operators::arrange::arrangement::arrange_core; -use differential_dataflow::trace::{Batcher, Builder}; +use differential_dataflow::trace::{Batcher, Description}; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::probe::{Handle as ProbeHandle, Probe}; use timely::dataflow::operators::Input; @@ -343,11 +343,8 @@ where Self(inner) } - fn seal>( - &mut self, - upper: Antichain, - ) -> B::Output { - self.0.seal::(upper) + fn seal(&mut self, upper: Antichain) -> (Vec, Description) { + self.0.seal(upper) } fn frontier(&mut self) -> AntichainRef<'_, T> { diff --git a/differential-dataflow/src/columnar/batcher.rs b/differential-dataflow/src/columnar/batcher.rs index fff0cfe8e..3a3b63c05 100644 --- a/differential-dataflow/src/columnar/batcher.rs +++ b/differential-dataflow/src/columnar/batcher.rs @@ -13,7 +13,7 @@ use timely::progress::{frontier::Antichain, Timestamp}; use timely::container::PushInto; use crate::logging::Logger; -use crate::trace::{Batcher, Builder, Description}; +use crate::trace::{Batcher, Description}; use super::layout::ColumnarUpdate as Update; use super::updates::UpdatesTyped; @@ -51,7 +51,7 @@ impl> Batcher for MergeBatcher { // in `upper`. All updates must have time greater or equal to the previously used `upper`, // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. - fn seal>(&mut self, upper: Antichain) -> B::Output { + fn seal(&mut self, upper: Antichain) -> (Vec, Description) { // Merge all remaining chains into a single chain. while self.chains.len() > 1 { let list1 = self.chains.pop().unwrap(); @@ -93,9 +93,8 @@ impl> Batcher for MergeBatcher { } let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(U::Time::minimum())); - let seal = B::seal(&mut readied, description); self.lower = upper; - seal + (readied, description) } /// The frontier of elements remaining after the most recent call to `self.seal`. diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 857444eee..06ba1f005 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -457,7 +457,8 @@ where } // Extract updates not in advance of `upper`. - let batch = batcher.seal::(upper.clone()); + let (mut chain, description) = batcher.seal(upper.clone()); + let batch = Bu::seal(&mut chain, description); writer.insert(batch.clone(), Some(capability.time().clone())); @@ -484,8 +485,10 @@ where capabilities = new_capabilities; } else { - // Announce progress updates, even without data. - let _batch = batcher.seal::(frontier.frontier().to_owned()); + // Announce progress updates, even without data. We seal the batcher to + // advance its lower bound and frontier, but discard the readied updates + // rather than building a batch we would immediately drop. + let _ = batcher.seal(frontier.frontier().to_owned()); writer.seal(frontier.frontier().to_owned()); } diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index 6f86f503a..1c7b1ca07 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -13,7 +13,7 @@ use timely::progress::{frontier::Antichain, Timestamp}; use timely::container::PushInto; use crate::logging::{BatcherEvent, Logger}; -use crate::trace::{Batcher, Builder, Description}; +use crate::trace::{Batcher, Description}; /// Creates batches from chunks of sorted, consolidated tuples. pub struct MergeBatcher { @@ -58,7 +58,7 @@ where // in `upper`. All updates must have time greater or equal to the previously used `upper`, // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. - fn seal>(&mut self, upper: Antichain) -> B::Output { + fn seal(&mut self, upper: Antichain) -> (Vec, Description) { // Merge all remaining chains into a single chain. while self.chains.len() > 1 { let list1 = self.chain_pop().unwrap(); @@ -82,9 +82,8 @@ where self.stash.clear(); let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum())); - let seal = B::seal(&mut readied, description); self.lower = upper; - seal + (readied, description) } /// The frontier of elements remaining after the most recent call to `self.seal`. diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index 7ab47c98d..6fd1da106 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -309,8 +309,11 @@ pub trait Batcher: PushInto { type Time: Timestamp; /// Allocates a new empty batcher. fn new(logger: Option, operator_id: usize) -> Self; - /// Returns all updates not greater or equal to an element of `upper`. - fn seal>(&mut self, upper: Antichain) -> B::Output; + /// Returns all updates not greater or equal to an element of `upper`, as a sorted and + /// consolidated chain together with the description that bounds them. + /// + /// The returned chain is suitable to hand directly to [`Builder::seal`]. + fn seal(&mut self, upper: Antichain) -> (Vec, Description); /// Returns the lower envelope of contained update times. fn frontier(&mut self) -> AntichainRef<'_, Self::Time>; } diff --git a/differential-dataflow/tests/trace.rs b/differential-dataflow/tests/trace.rs index 75911e5bc..d30e8ef2d 100644 --- a/differential-dataflow/tests/trace.rs +++ b/differential-dataflow/tests/trace.rs @@ -3,7 +3,7 @@ use timely::dataflow::operators::generic::OperatorInfo; use timely::progress::{Antichain, frontier::AntichainRef}; use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder, ValSpine}; -use differential_dataflow::trace::{Trace, TraceReader, Batcher}; +use differential_dataflow::trace::{Trace, TraceReader, Batcher, Builder}; use differential_dataflow::trace::cursor::Cursor; type IntegerTrace = ValSpine; @@ -22,9 +22,9 @@ fn get_trace() -> ValSpine { ]); let batch_ts = &[1, 2, 3]; - let batches = batch_ts.iter().map(move |i| batcher.seal::(Antichain::from_elem(*i))); - for b in batches { - trace.insert(b); + for i in batch_ts { + let (mut chain, description) = batcher.seal(Antichain::from_elem(*i)); + trace.insert(IntegerBuilder::seal(&mut chain, description)); } } trace