From 134800ab37603c70184da9d312da98b61404740e Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Sun, 24 May 2026 21:56:55 -0400 Subject: [PATCH] progress: split PortConnectivity into bitset + sparse map Most operator edges carry the identity (default) summary. Represent those implicitly via a per-input `SmallVec<[u64; 2]>` bitset and reserve the `BTreeMap` for non-default antichains. Inline storage of the bitset covers up to 128 output ports without heap allocation. For the `event_driven` example with 100 dataflows of 1000 maps each, max RSS drops from 528 MB to 434 MB (-17.9%), and dataflow construction time drops ~15%. Profile shows zero hot-path overhead: PortConnectivity methods total 0.07% of samples; runtime loops iterate the unchanged columnar storage. Co-Authored-By: Claude Opus 4.7 (1M context) --- timely/src/dataflow/operators/capability.rs | 21 +- timely/src/progress/operate.rs | 297 ++++++++++++++++++-- timely/src/progress/reachability.rs | 74 ++--- timely/src/progress/subgraph.rs | 7 +- 4 files changed, 324 insertions(+), 75 deletions(-) diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index f7a2b4b82..3b12b3fee 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -250,8 +250,7 @@ impl CapabilityTrait for InputCapability { let summaries_borrow = self.summaries.borrow(); let internal_borrow = self.internal.borrow(); // To be valid, the output buffer must match and the timestamp summary needs to be the default. - Rc::ptr_eq(&internal_borrow[port], query_buffer) && - summaries_borrow.get(port).map_or(false, |path| path.elements() == [Default::default()]) + Rc::ptr_eq(&internal_borrow[port], query_buffer) && summaries_borrow.is_default(port) } } @@ -280,16 +279,18 @@ impl InputCapability { /// This method panics if `self.time` is not less or equal to `new_time`. pub fn delayed(&self, new_time: &T, output_port: usize) -> Capability { use crate::progress::timestamp::PathSummary; - if let Some(path) = self.summaries.borrow().get(output_port) { - if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) { - Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port])) - } else { - panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, path, self.time()); - } - } - else { + let summaries = self.summaries.borrow(); + if !summaries.contains(output_port) { panic!("Attempted to delay a capability for a disconnected output"); } + let valid = summaries.any_summary(output_port, |summary| { + summary.results_in(self.time()).is_some_and(|time| time.less_equal(new_time)) + }); + if valid { + Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port])) + } else { + panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, summaries, self.time()); + } } /// Transforms to an owned capability for a specific output port. diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 6c3f12955..6e0249266 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -76,50 +76,299 @@ pub enum FrontierInterest { /// Operator internal connectivity, from inputs to outputs. pub type Connectivity = Vec>; + +/// Tagged view of the summary attached to a single output port. +/// +/// `Default` indicates the identity summary (`TS::default()`) is implied for that +/// port, with no element stored. `Specific` borrows the materialized antichain for +/// summaries that contain any non-default element. +#[derive(Debug)] +pub enum PortEntry<'a, TS> { + /// Identity (default) summary, stored implicitly. + Default, + /// A non-default antichain of summaries. + Specific(&'a Antichain), +} + /// Internal connectivity from one port to any number of opposing ports. +/// +/// Default summaries are represented implicitly via a bitset, avoiding per-edge +/// heap allocation in the dominant case. Non-default summaries live in a sparse +/// map. Invariant: a port appears in `defaults` xor `specifics`, never both, and +/// `specifics` never holds an antichain that is exactly a single default element +/// (such antichains are demoted into `defaults`). #[derive(serde::Serialize, serde::Deserialize, columnar::Columnar, Debug, Clone, Eq, PartialEq)] pub struct PortConnectivity { - tree: std::collections::BTreeMap>, + /// Bitset of output ports with default (identity) summary. Bit `p` of + /// `defaults[p / 64]` set iff port `p` has the identity summary. + defaults: smallvec::SmallVec<[u64; 2]>, + /// Non-default antichains, keyed by output port. Disjoint from `defaults`. + specifics: std::collections::BTreeMap>, } impl Default for PortConnectivity { fn default() -> Self { - Self { tree: std::collections::BTreeMap::new() } + Self { + defaults: smallvec::SmallVec::new(), + specifics: std::collections::BTreeMap::new(), + } } } impl PortConnectivity { - /// Inserts an element by reference, ensuring that the index exists. - pub fn insert(&mut self, index: usize, element: TS) -> bool where TS : crate::PartialOrder { - self.tree.entry(index).or_default().insert(element) + fn default_bit(&self, port: usize) -> bool { + let (word, bit) = (port / 64, port % 64); + self.defaults.get(word).is_some_and(|w| (w >> bit) & 1 == 1) } - /// Inserts an element by reference, ensuring that the index exists. - pub fn insert_ref(&mut self, index: usize, element: &TS) -> bool where TS : crate::PartialOrder + Clone { - self.tree.entry(index).or_default().insert_ref(element) + + fn set_default_bit(&mut self, port: usize) { + let (word, bit) = (port / 64, port % 64); + if self.defaults.len() <= word { + self.defaults.resize(word + 1, 0); + } + self.defaults[word] |= 1u64 << bit; } - /// Introduces a summary for `port`. Panics if a summary already exists. - pub fn add_port(&mut self, port: usize, summary: Antichain) { - if !summary.is_empty() { - let prior = self.tree.insert(port, summary); - assert!(prior.is_none()); + + fn clear_default_bit(&mut self, port: usize) { + let (word, bit) = (port / 64, port % 64); + if let Some(w) = self.defaults.get_mut(word) { + *w &= !(1u64 << bit); + } + } + + /// True if `port` has any connection (default or specific summary). + pub fn contains(&self, port: usize) -> bool { + self.default_bit(port) || self.specifics.contains_key(&port) + } + + /// True if `port` has the identity (default) summary, and only the default. + pub fn is_default(&self, port: usize) -> bool { + self.default_bit(port) + } + + /// Returns the non-default antichain at `port`, if any. + pub fn specific(&self, port: usize) -> Option<&Antichain> { + self.specifics.get(&port) + } + + /// Returns the entry for `port`, if any. + pub fn get(&self, port: usize) -> Option> { + if self.default_bit(port) { + Some(PortEntry::Default) + } else { + self.specifics.get(&port).map(PortEntry::Specific) + } + } + + /// Iterates port indices that hold the identity summary, in ascending order. + pub fn iter_defaults(&self) -> impl Iterator + '_ { + self.defaults + .iter() + .enumerate() + .flat_map(|(word_idx, &word)| BitIter { word, base: word_idx * 64 }) + } + + /// Iterates port indices with non-default summaries and their antichains. + pub fn iter_specifics(&self) -> impl Iterator)> { + self.specifics.iter().map(|(p, ac)| (*p, ac)) + } + + /// Iterates `(port, entry)` for all connected ports. Defaults precede specifics. + pub fn iter_ports(&self) -> impl Iterator)> { + self.iter_defaults() + .map(|p| (p, PortEntry::Default)) + .chain(self.iter_specifics().map(|(p, ac)| (p, PortEntry::Specific(ac)))) + } + + /// Iterates `(port, summary)` flattening default-port entries to `TS::default()` + /// and each element of every specific antichain. No sorting guarantee on `port`. + pub fn iter_summaries_owned(&self) -> impl Iterator + '_ + where + TS: Default + Clone, + { + self.iter_defaults() + .map(|p| (p, TS::default())) + .chain(self.iter_specifics().flat_map(|(p, ac)| { + ac.elements().iter().map(move |s| (p, s.clone())) + })) + } + + /// Invokes `pred` for each summary attached to `port`, returning `true` on the + /// first hit. Materializes a `TS::default()` for the default-bit case. + pub fn any_summary(&self, port: usize, mut pred: F) -> bool + where + TS: Default, + F: FnMut(&TS) -> bool, + { + if self.default_bit(port) { + return pred(&TS::default()); + } + if let Some(ac) = self.specifics.get(&port) { + return ac.elements().iter().any(pred); + } + false + } + + /// Restore canonical form at `port`: if `specifics[port]` holds exactly the + /// default summary as a single element, demote it into the `defaults` bitset. + fn canonicalize(&mut self, port: usize) + where + TS: Default + Eq, + { + let demote = self + .specifics + .get(&port) + .is_some_and(|ac| ac.elements().len() == 1 && ac.elements()[0] == TS::default()); + if demote { + self.specifics.remove(&port); + self.set_default_bit(port); + } + } + + /// Inserts `element` into the antichain at `port`, returning `true` if the + /// antichain changed. + pub fn insert(&mut self, port: usize, element: TS) -> bool + where + TS: crate::PartialOrder + Default + Eq + Clone, + { + use std::collections::btree_map::Entry; + + if self.default_bit(port) { + if element == TS::default() { + return false; + } + // Promote the existing default into `specifics` so the mixed + // antichain has both the default and the new element. + self.clear_default_bit(port); + let mut ac = Antichain::new(); + ac.insert(TS::default()); + let prior = self.specifics.insert(port, ac); + debug_assert!(prior.is_none()); } - else { - assert!(self.tree.remove(&port).is_none()); + let changed = match self.specifics.entry(port) { + Entry::Vacant(e) => { + if element == TS::default() { + self.set_default_bit(port); + } else { + let mut ac = Antichain::new(); + ac.insert(element); + e.insert(ac); + } + return true; + } + Entry::Occupied(mut e) => e.get_mut().insert(element), + }; + self.canonicalize(port); + changed + } + + /// As [`insert`], but takes the element by reference. + pub fn insert_ref(&mut self, port: usize, element: &TS) -> bool + where + TS: crate::PartialOrder + Default + Eq + Clone, + { + use std::collections::btree_map::Entry; + + if self.default_bit(port) { + if element == &TS::default() { + return false; + } + self.clear_default_bit(port); + let mut ac = Antichain::new(); + ac.insert(TS::default()); + let prior = self.specifics.insert(port, ac); + debug_assert!(prior.is_none()); } + let changed = match self.specifics.entry(port) { + Entry::Vacant(e) => { + if element == &TS::default() { + self.set_default_bit(port); + } else { + let mut ac = Antichain::new(); + ac.insert_ref(element); + e.insert(ac); + } + return true; + } + Entry::Occupied(mut e) => e.get_mut().insert_ref(element), + }; + self.canonicalize(port); + changed } - /// Borrowing iterator of port identifiers and antichains. - pub fn iter_ports(&self) -> impl Iterator)> { - self.tree.iter().map(|(o,p)| (*o, p)) + + /// Introduces a summary for `port`. Panics if a summary already exists. + pub fn add_port(&mut self, port: usize, summary: Antichain) + where + TS: Default + Eq, + { + if summary.is_empty() { + assert!( + !self.default_bit(port) && !self.specifics.contains_key(&port), + "add_port with empty summary on already-connected port" + ); + return; + } + let is_default_only = + summary.elements().len() == 1 && summary.elements()[0] == TS::default(); + if is_default_only { + assert!( + !self.default_bit(port) && !self.specifics.contains_key(&port), + "add_port called on port that already has a summary" + ); + self.set_default_bit(port); + } else { + assert!( + !self.default_bit(port), + "add_port called on port that already has a default summary" + ); + let prior = self.specifics.insert(port, summary); + assert!(prior.is_none(), "add_port called on port that already has a summary"); + } } - /// Returns the associated path summary, if it exists. - pub fn get(&self, index: usize) -> Option<&Antichain> { - self.tree.get(&index) +} + +impl FromIterator<(usize, Antichain)> for PortConnectivity +where + TS: Default + Eq, +{ + fn from_iter(iter: T) -> Self + where + T: IntoIterator)>, + { + let mut out = Self::default(); + for (port, ac) in iter { + if ac.is_empty() { + continue; + } + let is_default_only = + ac.elements().len() == 1 && ac.elements()[0] == TS::default(); + if is_default_only { + out.set_default_bit(port); + } else { + out.specifics.insert(port, ac); + } + } + out } } -impl FromIterator<(usize, Antichain)> for PortConnectivity { - fn from_iter(iter: T) -> Self where T: IntoIterator)> { - Self { tree: iter.into_iter().filter(|(_,p)| !p.is_empty()).collect() } +/// Iterates over the set bits of a single 64-bit word, yielding bit indices +/// (offset by `base`) in ascending order. +struct BitIter { + word: u64, + base: usize, +} + +impl Iterator for BitIter { + type Item = usize; + fn next(&mut self) -> Option { + if self.word == 0 { + None + } else { + let bit = self.word.trailing_zeros() as usize; + self.word &= self.word - 1; + Some(self.base + bit) + } } } diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 93e37668b..5db8bf8c2 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -83,7 +83,7 @@ use crate::progress::Timestamp; use crate::progress::{Source, Target}; use crate::progress::ChangeBatch; use crate::progress::{Location, Port}; -use crate::progress::operate::{Connectivity, PortConnectivity}; +use crate::progress::operate::{Connectivity, PortConnectivity, PortEntry}; use crate::progress::frontier::MutableAntichain; use crate::progress::timestamp::PathSummary; @@ -302,12 +302,18 @@ impl Builder { for (input, outputs) in summary.iter().enumerate() { let target = Location::new_target(index, input); in_degree.entry(target).or_insert(0); - for (output, summaries) in outputs.iter_ports() { + for (output, entry) in outputs.iter_ports() { let source = Location::new_source(index, output); - for summary in summaries.elements().iter() { - if summary == &Default::default() { - *in_degree.entry(source).or_insert(0) += 1; - } + let default_count = match entry { + PortEntry::Default => 1, + PortEntry::Specific(ac) => ac + .elements() + .iter() + .filter(|s| *s == &Default::default()) + .count(), + }; + if default_count > 0 { + *in_degree.entry(source).or_insert(0) += default_count; } } } @@ -339,15 +345,21 @@ impl Builder { } }, Port::Target(port) => { - for (output, summaries) in self.nodes[node][port].iter_ports() { + for (output, entry) in self.nodes[node][port].iter_ports() { let source = Location::new_source(node, output); - for summary in summaries.elements().iter() { - if summary == &Default::default() { - *in_degree.get_mut(&source).unwrap() -= 1; - if in_degree[&source] == 0 { - in_degree.remove(&source); - worklist.push(source); - } + let default_count = match entry { + PortEntry::Default => 1, + PortEntry::Specific(ac) => ac + .elements() + .iter() + .filter(|s| *s == &Default::default()) + .count(), + }; + for _ in 0..default_count { + *in_degree.get_mut(&source).unwrap() -= 1; + if in_degree[&source] == 0 { + in_degree.remove(&source); + worklist.push(source); } } } @@ -564,11 +576,7 @@ impl Tracker { // Build columnar nodes: Vecs>>. let nodes = build_nested_vecs(builder.nodes.iter().map(|connectivity| { - connectivity.iter().map(|port_conn| { - port_conn.iter_ports().flat_map(|(port, antichain)| { - antichain.elements().iter().map(move |s| (port, s.clone())) - }) - }) + connectivity.iter().map(|port_conn| port_conn.iter_summaries_owned()) })); // Build columnar edges: Vecs>>. @@ -578,18 +586,10 @@ impl Tracker { // Build columnar target and source summaries. let target_summaries = build_nested_vecs(target_sum.iter().map(|ports| { - ports.iter().map(|port_conn| { - port_conn.iter_ports().flat_map(|(port, antichain)| { - antichain.elements().iter().map(move |s| (port, s.clone())) - }) - }) + ports.iter().map(|port_conn| port_conn.iter_summaries_owned()) })); let source_summaries = build_nested_vecs(source_sum.iter().map(|ports| { - ports.iter().map(|port_conn| { - port_conn.iter_ports().flat_map(|(port, antichain)| { - antichain.elements().iter().map(move |s| (port, s.clone())) - }) - }) + ports.iter().map(|port_conn| port_conn.iter_summaries_owned()) })); let scope_outputs = builder.shape[0].0; @@ -803,10 +803,12 @@ fn summarize_outputs( } // A reverse map from operator outputs to inputs, along their internal summaries. - let mut reverse_internal: HashMap<_, Vec<_>> = HashMap::new(); + // Each entry pairs the input port with one summary element drawn from the + // antichain; default-port entries are materialized as `T::Summary::default()`. + let mut reverse_internal: HashMap<_, Vec<(usize, T::Summary)>> = HashMap::new(); for (node, connectivity) in nodes.iter().enumerate() { for (input, outputs) in connectivity.iter().enumerate() { - for (output, summary) in outputs.iter_ports() { + for (output, summary) in outputs.iter_summaries_owned() { reverse_internal .entry(Location::new_source(node, output)) .or_default() @@ -838,13 +840,11 @@ fn summarize_outputs( // We want to crawl up the operator, to its inputs. Port::Source(_output_port) => { if let Some(inputs) = reverse_internal.get(&location) { - for (input_port, operator_summary) in inputs.iter() { + for (input_port, op_summary) in inputs.iter() { let new_location = Location::new_target(location.node, *input_port); - for op_summary in operator_summary.elements().iter() { - if let Some(combined) = op_summary.followed_by(&summary) { - if results.entry(new_location).or_default().insert_ref(output, &combined) { - worklist.push_back((new_location, output, combined)); - } + if let Some(combined) = op_summary.followed_by(&summary) { + if results.entry(new_location).or_default().insert_ref(output, &combined) { + worklist.push_back((new_location, output, combined)); } } } diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 393223f53..19fac1f4e 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -567,10 +567,9 @@ where // of how long `self.scope_summary` is let mut internal_summary = vec![PortConnectivity::default(); self.inputs()]; for (input_idx, input) in self.scope_summary.iter().enumerate() { - for (output_idx, output) in input.iter_ports() { - for outer in output.elements().iter().cloned().map(TInner::summarize) { - internal_summary[input_idx].insert(output_idx, outer); - } + for (output_idx, inner_summary) in input.iter_summaries_owned() { + let outer = TInner::summarize(inner_summary); + internal_summary[input_idx].insert(output_idx, outer); } }