Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions timely/src/dataflow/operators/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,7 @@ impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
let summaries_borrow = self.summaries.borrow();
let internal_borrow = self.internal.borrow();
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
Rc::ptr_eq(&internal_borrow[port], query_buffer) &&
summaries_borrow.get(port).map_or(false, |path| path.elements() == [Default::default()])
Rc::ptr_eq(&internal_borrow[port], query_buffer) && summaries_borrow.is_default(port)
}
}

Expand Down Expand Up @@ -280,16 +279,18 @@ impl<T: Timestamp> InputCapability<T> {
/// This method panics if `self.time` is not less or equal to `new_time`.
pub fn delayed(&self, new_time: &T, output_port: usize) -> Capability<T> {
use crate::progress::timestamp::PathSummary;
if let Some(path) = self.summaries.borrow().get(output_port) {
if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
} else {
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, path, self.time());
}
}
else {
let summaries = self.summaries.borrow();
if !summaries.contains(output_port) {
panic!("Attempted to delay a capability for a disconnected output");
}
let valid = summaries.any_summary(output_port, |summary| {
summary.results_in(self.time()).is_some_and(|time| time.less_equal(new_time))
});
if valid {
Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
} else {
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, summaries, self.time());
}
}

/// Transforms to an owned capability for a specific output port.
Expand Down
297 changes: 273 additions & 24 deletions timely/src/progress/operate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,50 +76,299 @@ pub enum FrontierInterest {

/// Operator internal connectivity, from inputs to outputs.
pub type Connectivity<TS> = Vec<PortConnectivity<TS>>;

/// Tagged view of the summary attached to a single output port.
///
/// This is the item type handed out by `PortConnectivity::get` and
/// `PortConnectivity::iter_ports`. It lets callers distinguish the two storage
/// forms without materializing anything:
///
/// * `Default` indicates the identity summary (`TS::default()`) is implied for
///   that port, with no element stored.
/// * `Specific` borrows the materialized antichain for summaries that contain
///   any non-default element.
///
/// The lifetime `'a` is the borrow of the antichain carried by `Specific`.
#[derive(Debug)]
pub enum PortEntry<'a, TS> {
/// Identity (default) summary, stored implicitly; carries no data.
Default,
/// A non-default antichain of summaries, borrowed rather than copied.
Specific(&'a Antichain<TS>),
}

/// Internal connectivity from one port to any number of opposing ports.
///
/// Default summaries are represented implicitly via a bitset, avoiding per-edge
/// heap allocation in the dominant case. Non-default summaries live in a sparse
/// map. Invariant: a port appears in `defaults` xor `specifics`, never both, and
/// `specifics` never holds an antichain that is exactly a single default element
/// (such antichains are demoted into `defaults`).
#[derive(serde::Serialize, serde::Deserialize, columnar::Columnar, Debug, Clone, Eq, PartialEq)]
pub struct PortConnectivity<TS> {
tree: std::collections::BTreeMap<usize, Antichain<TS>>,
/// Bitset of output ports with default (identity) summary. Bit `p` of
/// `defaults[p / 64]` set iff port `p` has the identity summary.
defaults: smallvec::SmallVec<[u64; 2]>,
/// Non-default antichains, keyed by output port. Disjoint from `defaults`.
specifics: std::collections::BTreeMap<usize, Antichain<TS>>,
}

impl<TS> Default for PortConnectivity<TS> {
fn default() -> Self {
Self { tree: std::collections::BTreeMap::new() }
Self {
defaults: smallvec::SmallVec::new(),
specifics: std::collections::BTreeMap::new(),
}
}
}

impl<TS> PortConnectivity<TS> {
/// Inserts an element by reference, ensuring that the index exists.
pub fn insert(&mut self, index: usize, element: TS) -> bool where TS : crate::PartialOrder {
self.tree.entry(index).or_default().insert(element)
/// Reports whether the identity-summary bit for `port` is set.
///
/// The bitset packs 64 ports per word. A port whose word lies past the end of
/// `defaults` is reported as unset.
fn default_bit(&self, port: usize) -> bool {
    match self.defaults.get(port / 64) {
        Some(word) => *word & (1u64 << (port % 64)) != 0,
        None => false,
    }
}
/// Inserts an element by reference, ensuring that the index exists.
pub fn insert_ref(&mut self, index: usize, element: &TS) -> bool where TS : crate::PartialOrder + Clone {
self.tree.entry(index).or_default().insert_ref(element)

/// Sets the identity-summary bit for `port`.
///
/// Grows `defaults` with zeroed words when `port` falls beyond its current
/// length.
fn set_default_bit(&mut self, port: usize) {
    let slot = port / 64;
    let mask = 1u64 << (port % 64);
    if slot >= self.defaults.len() {
        self.defaults.resize(slot + 1, 0);
    }
    self.defaults[slot] |= mask;
}
/// Introduces a summary for `port`. Panics if a summary already exists.
pub fn add_port(&mut self, port: usize, summary: Antichain<TS>) {
if !summary.is_empty() {
let prior = self.tree.insert(port, summary);
assert!(prior.is_none());

/// Clears the identity-summary bit for `port`.
///
/// Does nothing when the bitset has no word covering `port`; the bitset is
/// never grown or shrunk here.
fn clear_default_bit(&mut self, port: usize) {
    let keep = !(1u64 << (port % 64));
    if let Some(slot) = self.defaults.get_mut(port / 64) {
        *slot &= keep;
    }
}

/// Returns `true` when `port` is connected at all, whether through the
/// implicit identity summary or through a stored antichain.
pub fn contains(&self, port: usize) -> bool {
    self.get(port).is_some()
}

/// True if `port` has the identity (default) summary, and only the default.
pub fn is_default(&self, port: usize) -> bool {
self.default_bit(port)
}

/// Returns the non-default antichain at `port`, if any.
pub fn specific(&self, port: usize) -> Option<&Antichain<TS>> {
self.specifics.get(&port)
}

/// Returns the tagged entry for `port`, or `None` if the port is disconnected.
///
/// The default bit is consulted first, so a port flagged as default yields
/// `PortEntry::Default` without touching `specifics`.
pub fn get(&self, port: usize) -> Option<PortEntry<'_, TS>> {
    if self.default_bit(port) {
        return Some(PortEntry::Default);
    }
    let antichain = self.specifics.get(&port)?;
    Some(PortEntry::Specific(antichain))
}

/// Yields, in ascending order, every port whose summary is the implicit
/// identity.
///
/// Each 64-bit word of the bitset is expanded into its set bit positions,
/// offset by the word's base port index.
pub fn iter_defaults(&self) -> impl Iterator<Item = usize> + '_ {
    let words = self.defaults.iter().copied().enumerate();
    words.flat_map(|(index, word)| BitIter { word, base: index * 64 })
}

/// Yields `(port, antichain)` for every port with an explicitly stored
/// antichain, in the map's key order.
pub fn iter_specifics(&self) -> impl Iterator<Item = (usize, &Antichain<TS>)> {
    self.specifics
        .iter()
        .map(|(&port, antichain)| (port, antichain))
}

/// Yields `(port, entry)` for every connected port.
///
/// All identity-summary ports come first, followed by all ports with a stored
/// antichain; the combined sequence is therefore not globally sorted by port.
pub fn iter_ports(&self) -> impl Iterator<Item = (usize, PortEntry<'_, TS>)> {
    let identity = self.iter_defaults().map(|port| (port, PortEntry::Default));
    let explicit = self
        .iter_specifics()
        .map(|(port, antichain)| (port, PortEntry::Specific(antichain)));
    identity.chain(explicit)
}

/// Yields owned `(port, summary)` pairs for every summary on every port.
///
/// Identity-summary ports contribute one freshly built `TS::default()` each;
/// ports with a stored antichain contribute a clone of each of its elements.
/// Identity ports are emitted first, so the output is not sorted by `port`.
pub fn iter_summaries_owned(&self) -> impl Iterator<Item = (usize, TS)> + '_
where
    TS: Default + Clone,
{
    let identity = self.iter_defaults().map(|port| (port, TS::default()));
    let explicit = self.iter_specifics().flat_map(|(port, antichain)| {
        antichain
            .elements()
            .iter()
            .cloned()
            .map(move |summary| (port, summary))
    });
    identity.chain(explicit)
}

/// Tests whether any summary attached to `port` satisfies `pred`.
///
/// For an identity-summary port a temporary `TS::default()` is built and
/// `pred` is called on it once. For a port with a stored antichain, `pred` is
/// applied to its elements until one matches. Disconnected ports yield
/// `false` without calling `pred`.
pub fn any_summary<F>(&self, port: usize, mut pred: F) -> bool
where
    TS: Default,
    F: FnMut(&TS) -> bool,
{
    if self.default_bit(port) {
        let identity = TS::default();
        pred(&identity)
    } else {
        self.specifics
            .get(&port)
            .is_some_and(|antichain| antichain.elements().iter().any(pred))
    }
}

/// Re-establishes the canonical representation for `port`.
///
/// When the stored antichain for `port` consists of exactly one element equal
/// to `TS::default()`, the entry is removed from `specifics` and the port is
/// flagged in the `defaults` bitset instead. Otherwise nothing changes.
fn canonicalize(&mut self, port: usize)
where
    TS: Default + Eq,
{
    let Some(antichain) = self.specifics.get(&port) else {
        return;
    };
    let elems = antichain.elements();
    if elems.len() == 1 && elems[0] == TS::default() {
        self.specifics.remove(&port);
        self.set_default_bit(port);
    }
}

/// Inserts `element` into the antichain at `port`, returning `true` if the
/// antichain changed.
pub fn insert(&mut self, port: usize, element: TS) -> bool
where
TS: crate::PartialOrder + Default + Eq + Clone,
{
use std::collections::btree_map::Entry;

if self.default_bit(port) {
if element == TS::default() {
return false;
}
// Promote the existing default into `specifics` so the mixed
// antichain has both the default and the new element.
self.clear_default_bit(port);
let mut ac = Antichain::new();
ac.insert(TS::default());
let prior = self.specifics.insert(port, ac);
debug_assert!(prior.is_none());
}
else {
assert!(self.tree.remove(&port).is_none());
let changed = match self.specifics.entry(port) {
Entry::Vacant(e) => {
if element == TS::default() {
self.set_default_bit(port);
} else {
let mut ac = Antichain::new();
ac.insert(element);
e.insert(ac);
}
return true;
}
Entry::Occupied(mut e) => e.get_mut().insert(element),
};
self.canonicalize(port);
changed
}

/// As [`insert`], but takes the element by reference.
///
/// Adds `element` to the summaries of `port`, keeping the two-part
/// representation (default bitset vs. explicit antichains) canonical.
///
/// Returns `false` when `port` already holds the identity summary and
/// `element` is the identity too. Returns `true` when `port` had no explicit
/// antichain and no default bit. Otherwise returns whatever
/// `Antichain::insert_ref` reports for the existing antichain.
pub fn insert_ref(&mut self, port: usize, element: &TS) -> bool
where
TS: crate::PartialOrder + Default + Eq + Clone,
{
use std::collections::btree_map::Entry;

if self.default_bit(port) {
// Identity already recorded for this port: inserting it again is a no-op.
if element == &TS::default() {
return false;
}
// Promote the implicit identity into an explicit antichain so the new
// element is inserted alongside it by the antichain's own logic.
self.clear_default_bit(port);
let mut ac = Antichain::new();
ac.insert(TS::default());
let prior = self.specifics.insert(port, ac);
// A port flagged as default must not also have an explicit entry.
debug_assert!(prior.is_none());
}
let changed = match self.specifics.entry(port) {
Entry::Vacant(e) => {
// First summary for this port: the identity goes to the bitset,
// anything else starts a fresh single-element antichain.
if element == &TS::default() {
self.set_default_bit(port);
} else {
let mut ac = Antichain::new();
ac.insert_ref(element);
e.insert(ac);
}
return true;
}
Entry::Occupied(mut e) => e.get_mut().insert_ref(element),
};
// The antichain may have collapsed to exactly the identity element;
// if so, move the port back into the bitset.
self.canonicalize(port);
changed
}
/// Borrowing iterator of port identifiers and antichains.
pub fn iter_ports(&self) -> impl Iterator<Item = (usize, &Antichain<TS>)> {
self.tree.iter().map(|(o,p)| (*o, p))

/// Registers `summary` as the connectivity for a previously unconnected `port`.
///
/// * An empty `summary` records nothing; it only checks that `port` is not
///   already connected.
/// * A `summary` that is exactly one `TS::default()` element is stored in the
///   `defaults` bitset.
/// * Any other `summary` is stored in `specifics`.
///
/// # Panics
///
/// Panics if `port` already has a summary of either kind.
pub fn add_port(&mut self, port: usize, summary: Antichain<TS>)
where
    TS: Default + Eq,
{
    let has_default = self.default_bit(port);

    if summary.is_empty() {
        let connected = has_default || self.specifics.contains_key(&port);
        assert!(
            !connected,
            "add_port with empty summary on already-connected port"
        );
        return;
    }

    let identity_only = {
        let elems = summary.elements();
        elems.len() == 1 && elems[0] == TS::default()
    };

    if identity_only {
        let connected = has_default || self.specifics.contains_key(&port);
        assert!(
            !connected,
            "add_port called on port that already has a summary"
        );
        self.set_default_bit(port);
        return;
    }

    assert!(
        !has_default,
        "add_port called on port that already has a default summary"
    );
    // Insert first, then check: a displaced entry means the port was taken.
    let displaced = self.specifics.insert(port, summary);
    assert!(
        displaced.is_none(),
        "add_port called on port that already has a summary"
    );
}
/// Returns the associated path summary, if it exists.
pub fn get(&self, index: usize) -> Option<&Antichain<TS>> {
self.tree.get(&index)
}

/// Builds a `PortConnectivity` from `(port, antichain)` pairs.
///
/// Empty antichains are skipped. An antichain consisting solely of
/// `TS::default()` is recorded in the `defaults` bitset; anything else is
/// stored in `specifics`.
///
/// If the same port occurs more than once, the last non-empty entry wins (as
/// it would when collecting pairs into a map). Whichever representation an
/// earlier entry used is cleared, so a port never ends up in both `defaults`
/// and `specifics`.
impl<TS> FromIterator<(usize, Antichain<TS>)> for PortConnectivity<TS>
where
    TS: Default + Eq,
{
    fn from_iter<T>(iter: T) -> Self
    where
        T: IntoIterator<Item = (usize, Antichain<TS>)>,
    {
        let mut out = Self::default();
        for (port, ac) in iter {
            if ac.is_empty() {
                continue;
            }
            let is_default_only =
                ac.elements().len() == 1 && ac.elements()[0] == TS::default();
            if is_default_only {
                // Drop any earlier explicit antichain for this port so the
                // two stores stay disjoint.
                out.specifics.remove(&port);
                out.set_default_bit(port);
            } else {
                // Likewise, forget an earlier identity flag before storing
                // the explicit antichain.
                out.clear_default_bit(port);
                out.specifics.insert(port, ac);
            }
        }
        out
    }
}

impl<TS> FromIterator<(usize, Antichain<TS>)> for PortConnectivity<TS> {
fn from_iter<T>(iter: T) -> Self where T: IntoIterator<Item = (usize, Antichain<TS>)> {
Self { tree: iter.into_iter().filter(|(_,p)| !p.is_empty()).collect() }
/// Iterator over the positions of the set bits in one 64-bit word.
///
/// Positions are produced from least to most significant bit, each shifted by
/// `base` so that a word taken from the middle of a bitset reports absolute
/// indices.
struct BitIter {
    /// Bits not yet reported; a bit is cleared once its position is yielded.
    word: u64,
    /// Offset added to every yielded bit position.
    base: usize,
}

impl Iterator for BitIter {
    type Item = usize;

    fn next(&mut self) -> Option<usize> {
        if self.word == 0 {
            return None;
        }
        // `word` is non-zero, so `lowest` is at most 63 and the shift is in range.
        let lowest = self.word.trailing_zeros() as usize;
        // Toggle off the bit just found; it is known to be set.
        self.word ^= 1u64 << lowest;
        Some(self.base + lowest)
    }
}

Expand Down
Loading
Loading