Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions differential-dataflow/examples/columnar/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,15 @@ mod reachability {
let edges_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 };
let reach_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 };

let edges_arr = arrange_core::<_,
let edges_arr = arrange_core::<_, _,
ValChunker<(Node, Node, IterTime, Diff)>,
ValBatcher<Node, Node, IterTime, Diff>,
ValBuilder<Node, Node, IterTime, Diff>,
ValSpine<Node, Node, IterTime, Diff>,
>(edges_inner.inner, edges_pact, "Edges");

let reach_arr = arrange_core::<_,
let reach_arr = arrange_core::<_, _,
ValChunker<(Node, (), IterTime, Diff)>,
ValBatcher<Node, (), IterTime, Diff>,
ValBuilder<Node, (), IterTime, Diff>,
ValSpine<Node, (), IterTime, Diff>,
Expand All @@ -155,7 +157,8 @@ mod reachability {

// Arrange for reduce.
let combined_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 };
let combined_arr = arrange_core::<_,
let combined_arr = arrange_core::<_, _,
ValChunker<(Node, (), IterTime, Diff)>,
ValBatcher<Node, (), IterTime, Diff>,
ValBuilder<Node, (), IterTime, Diff>,
ValSpine<Node, (), IterTime, Diff>,
Expand Down
19 changes: 13 additions & 6 deletions differential-dataflow/examples/columnar_spill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ fn reset_stats() {
use columnar::Push;
use columnar::bytes::stash::Stash;

use differential_dataflow::columnar::{RecordedUpdates, ValBuilder, ValColBuilder, ValSpine};
use differential_dataflow::columnar::{ValBuilder, ValChunker, ValColBuilder, ValSpine};
use differential_dataflow::columnar::batcher::MergeBatcher;
use differential_dataflow::columnar::layout::ColumnarUpdate as Update;
use differential_dataflow::columnar::spill::{Entry, Fetch, Spill, SpillPolicy};
Expand All @@ -75,6 +75,7 @@ use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::probe::{Handle as ProbeHandle, Probe};
use timely::dataflow::operators::Input;
use timely::dataflow::InputHandle;
use timely::container::PushInto;
use timely::progress::frontier::AntichainRef;
use timely::progress::{frontier::Antichain, Timestamp};

Expand Down Expand Up @@ -323,7 +324,6 @@ where
R: columnar::Columnar + 'static,
(K, V, T, R): Update<Time = T> + 'static,
{
type Input = RecordedUpdates<(K, V, T, R)>;
type Time = T;
type Output = UpdatesTyped<(K, V, T, R)>;

Expand All @@ -343,10 +343,6 @@ where
Self(inner)
}

fn push_container(&mut self, container: &mut Self::Input) {
self.0.push_container(container);
}

fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
&mut self,
upper: Antichain<T>,
Expand All @@ -359,6 +355,15 @@ where
}
}

impl<K, V, T, R> PushInto<UpdatesTyped<(K, V, T, R)>> for SpillBatcher<K, V, T, R>
where
(K, V, T, R): Update,
{
fn push_into(&mut self, chunk: UpdatesTyped<(K, V, T, R)>) {
self.0.push_into(chunk);
}
}

type TestUpdate = (u64, u64, u64, i64);

fn make_chunk(updates: &[(u64, u64, u64, i64)]) -> UpdatesTyped<TestUpdate> {
Expand Down Expand Up @@ -537,6 +542,8 @@ fn run_timely_dataflow(
let stream = scope.input_from(&mut input);
let arranged = arrange_core::<
_,
_,
ValChunker<(u64, u64, u64, i64)>,
SpillBatcher<u64, u64, u64, i64>,
ValBuilder<u64, u64, u64, i64>,
ValSpine<u64, u64, u64, i64>,
Expand Down
6 changes: 3 additions & 3 deletions differential-dataflow/examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ fn main() {
},
"col" => {
use timely::dataflow::operators::Input as _;
use differential_dataflow::columnar::{ValBatcher, ValBuilder, ValPact, ValSpine};
use differential_dataflow::columnar::{ValBatcher, ValBuilder, ValChunker, ValPact, ValSpine};
use differential_dataflow::operators::arrange::arrangement::arrange_core;

fn string_hash(s: columnar::Ref<'_, String>) -> u64 {
Expand All @@ -124,10 +124,10 @@ fn main() {
let data_stream = scope.input_from(&mut data_input);
let keys_stream = scope.input_from(&mut keys_input);

let data = arrange_core::<_, ValBatcher<String,(),u64,i64>, ValBuilder<String,(),u64,i64>, ValSpine<String,(),u64,i64>>(
let data = arrange_core::<_, _, ValChunker<(String,(),u64,i64)>, ValBatcher<String,(),u64,i64>, ValBuilder<String,(),u64,i64>, ValSpine<String,(),u64,i64>>(
data_stream, ValPact { hashfunc: |k: columnar::Ref<'_, String>| string_hash(k) }, "DataArrange",
);
let keys = arrange_core::<_, ValBatcher<String,(),u64,i64>, ValBuilder<String,(),u64,i64>, ValSpine<String,(),u64,i64>>(
let keys = arrange_core::<_, _, ValChunker<(String,(),u64,i64)>, ValBatcher<String,(),u64,i64>, ValBuilder<String,(),u64,i64>, ValSpine<String,(),u64,i64>>(
keys_stream, ValPact { hashfunc: |k: columnar::Ref<'_, String>| string_hash(k) }, "KeysArrange",
);
keys.join_core(data, |_k, (), ()| Option::<()>::None)
Expand Down
17 changes: 9 additions & 8 deletions differential-dataflow/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -961,9 +961,9 @@ pub mod vec {
/// and provide the function `reify` to produce owned keys and values..
pub fn consolidate_named<Ba, Bu, Tr, F>(self, name: &str, reify: F) -> Self
where
Ba: crate::trace::Batcher<Input=Vec<((D,()),T,R)>, Time=T> + 'static,
Ba: crate::trace::Batcher<Output=Vec<((D, ()), T, R)>, Time=T> + 'static,
Tr: for<'a> crate::trace::Trace<Time=T,Diff=R>+'static,
Bu: crate::trace::Builder<Time=Tr::Time, Input=Ba::Output, Output=Tr::Batch>,
Bu: crate::trace::Builder<Time=Tr::Time, Input=Vec<((D, ()), T, R)>, Output=Tr::Batch>,
F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
{
use crate::operators::arrange::arrangement::Arrange;
Expand Down Expand Up @@ -1018,6 +1018,7 @@ pub mod vec {

use crate::trace::implementations::{ValSpine, ValBatcher, ValBuilder};
use crate::trace::implementations::{KeySpine, KeyBatcher, KeyBuilder};
use crate::trace::implementations::ContainerChunker;
use crate::operators::arrange::Arrange;

impl<'scope, T, K, V, R> Arrange<'scope, T, Vec<((K, V), T, R)>> for Collection<'scope, T, (K, V), R>
Expand All @@ -1029,12 +1030,12 @@ pub mod vec {
{
fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
where
Ba: crate::trace::Batcher<Input=Vec<((K, V), T, R)>, Time=T> + 'static,
Bu: crate::trace::Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
Ba: crate::trace::Batcher<Output=Vec<((K, V), T, R)>, Time=T> + 'static,
Bu: crate::trace::Builder<Time=T, Input=Vec<((K, V), T, R)>, Output = Tr::Batch>,
Tr: crate::trace::Trace<Time=T> + 'static,
{
let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),T,R)| (update.0).0.hashed().into());
crate::operators::arrange::arrangement::arrange_core::<_, Ba, Bu, _>(self.inner, exchange, name)
crate::operators::arrange::arrangement::arrange_core::<_, _, ContainerChunker<Vec<((K, V), T, R)>>, Ba, Bu, _>(self.inner, exchange, name)
}
}

Expand All @@ -1044,12 +1045,12 @@ pub mod vec {
{
fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
where
Ba: crate::trace::Batcher<Input=Vec<((K,()),T,R)>, Time=T> + 'static,
Bu: crate::trace::Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
Ba: crate::trace::Batcher<Output=Vec<((K, ()), T, R)>, Time=T> + 'static,
Bu: crate::trace::Builder<Time=T, Input=Vec<((K, ()), T, R)>, Output = Tr::Batch>,
Tr: crate::trace::Trace<Time=T> + 'static,
{
let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),T,R)| (update.0).0.hashed().into());
crate::operators::arrange::arrangement::arrange_core::<_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name)
crate::operators::arrange::arrangement::arrange_core::<_, _, ContainerChunker<Vec<((K, ()), T, R)>>, Ba, Bu, _>(self.map(|k| (k, ())).inner, exchange, name)
}
}

Expand Down
2 changes: 2 additions & 0 deletions differential-dataflow/src/columnar/arrangement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub mod trie_merger;
pub type ValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<ColumnarLayout<(K,V,T,R)>>>>;
/// A batcher for columnar storage.
pub type ValBatcher<K, V, T, R> = super::batcher::MergeBatcher<(K,V,T,R)>;
/// A chunker that maps `RecordedUpdates<U>` streams into the batcher's `UpdatesTyped<U>` chunks.
pub type ValChunker<U> = TrieChunker<U>;
/// A builder for columnar storage.
pub type ValBuilder<K, V, T, R> = RcBuilder<builder::ValMirror<(K,V,T,R)>>;

Expand Down
41 changes: 15 additions & 26 deletions differential-dataflow/src/columnar/batcher.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
//! A `Batcher` for `RecordedUpdates<U>` streams that consolidates input via
//! `TrieChunker` and merges sorted chains via the free functions in `trie_merger`.
//! A `Batcher` for columnar streams that merges sorted chains via the free
//! functions in `trie_merger`.
//!
//! Callers feed already-chunked, sorted-and-consolidated `UpdatesTyped<U>` into
//! the batcher via [`PushInto`]; forming such chunks from `RecordedUpdates<U>`
//! is the responsibility of the surrounding dataflow operator's chunker
//! (`TrieChunker`).

use std::collections::VecDeque;

use timely::progress::frontier::AntichainRef;
use timely::progress::{frontier::Antichain, Timestamp};
use timely::container::{ContainerBuilder, PushInto};
use timely::container::PushInto;

use crate::logging::Logger;
use crate::trace::{Batcher, Builder, Description};

use super::layout::ColumnarUpdate as Update;
use super::updates::UpdatesTyped;
use super::RecordedUpdates;
use super::arrangement::TrieChunker;
use super::arrangement::trie_merger;
use super::spill::{Entry, SpillPolicy};

/// Creates batches from `RecordedUpdates<U>` streams.
/// Creates batches from chunks of sorted, consolidated columnar updates.
pub struct MergeBatcher<U: Update> {
/// Transforms input streams to chunks of sorted, consolidated data.
chunker: TrieChunker<U>,
/// A sequence of power-of-two length chains of sorted, consolidated entries.
/// Each entry is either an in-memory chunk or a handle to a paged-out chunk.
chains: Vec<VecDeque<Entry<UpdatesTyped<U>>>>,
Expand All @@ -34,41 +35,23 @@ pub struct MergeBatcher<U: Update> {
}

impl<U: Update<Time: Timestamp>> Batcher for MergeBatcher<U> {
type Input = RecordedUpdates<U>;
type Time = U::Time;
type Output = UpdatesTyped<U>;

fn new(_logger: Option<Logger>, _operator_id: usize) -> Self {
Self {
chunker: TrieChunker::default(),
chains: Vec::new(),
frontier: Antichain::new(),
lower: Antichain::from_elem(U::Time::minimum()),
policy: None,
}
}

/// Push a container of data into this merge batcher. Updates the internal chain structure if
/// needed.
fn push_container(&mut self, container: &mut RecordedUpdates<U>) {
self.chunker.push_into(container);
while let Some(chunk) = self.chunker.extract() {
let chunk = std::mem::take(chunk);
self.insert_chain(VecDeque::from([Entry::Typed(chunk)]));
}
}

// Sealing a batch means finding those updates with times not greater or equal to any time
// in `upper`. All updates must have time greater or equal to the previously used `upper`,
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<U::Time>) -> B::Output {
// Finish
while let Some(chunk) = self.chunker.finish() {
let chunk = std::mem::take(chunk);
self.insert_chain(VecDeque::from([Entry::Typed(chunk)]));
}

// Merge all remaining chains into a single chain.
while self.chains.len() > 1 {
let list1 = self.chains.pop().unwrap();
Expand Down Expand Up @@ -122,6 +105,12 @@ impl<U: Update<Time: Timestamp>> Batcher for MergeBatcher<U> {
}
}

impl<U: Update> PushInto<UpdatesTyped<U>> for MergeBatcher<U> {
fn push_into(&mut self, chunk: UpdatesTyped<U>) {
self.insert_chain(VecDeque::from([Entry::Typed(chunk)]));
}
}

impl<U: Update> MergeBatcher<U> {
/// Install a spill policy. Consulted after each chain insert.
pub fn set_spill_policy(&mut self, policy: Box<dyn SpillPolicy<UpdatesTyped<U>>>) {
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/src/columnar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub mod spill;
pub use updates::UpdatesTyped;
pub use builder::ValBuilder as ValColBuilder;
pub use exchange::ValPact;
pub use arrangement::{ValBatcher, ValBuilder, ValSpine};
pub use arrangement::{ValBatcher, ValBuilder, ValChunker, ValSpine};

/// Target size for update batches, in number of updates.
pub const LINK_TARGET: usize = 64 * 1024;
Expand Down
33 changes: 27 additions & 6 deletions differential-dataflow/src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline};
use timely::progress::Timestamp;
use timely::progress::Antichain;
use timely::container::{ContainerBuilder, PushInto};
use timely::dataflow::operators::Capability;

use crate::{Data, VecCollection, AsCollection};
Expand Down Expand Up @@ -195,7 +196,7 @@
while let Some(key) = cursor.get_key(batch) {
while let Some(val) = cursor.get_val(batch) {
for datum in logic(key, val) {
cursor.map_times(batch, |time, diff| {

Check warning on line 199 in differential-dataflow/src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`time` shadows a previous, unrelated binding
session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
});
}
Expand Down Expand Up @@ -303,19 +304,25 @@
/// A type that can be arranged as if a collection of updates.
pub trait Arrange<'scope, T: Timestamp+Lattice, C> : Sized {
/// Arranges updates into a shared trace.
///
/// The batcher's output container must equal the stream container `C`; the default
/// chunker only consolidates same-type containers. For chunker setups that convert
/// between container types (e.g. columnar layouts), call [`arrange_core`] directly.
fn arrange<Ba, Bu, Tr>(self) -> Arranged<'scope, TraceAgent<Tr>>
where
Ba: Batcher<Input=C, Time=T> + 'static,
Ba: Batcher<Output=C, Time=T> + 'static,
Bu: Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
Tr: Trace<Time=T> + 'static,
{
self.arrange_named::<Ba, Bu, Tr>("Arrange")
}

/// Arranges updates into a shared trace, with a supplied name.
///
/// See [`Arrange::arrange`] for constraints on the batcher's output container.
fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
where
Ba: Batcher<Input=C, Time=T> + 'static,
Ba: Batcher<Output=C, Time=T> + 'static,
Bu: Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
Tr: Trace<Time=T> + 'static,
;
Expand All @@ -326,10 +333,12 @@
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
/// be consistently by key (though this is the most common).
pub fn arrange_core<'scope, P, Ba, Bu, Tr>(stream: Stream<'scope, Tr::Time, Ba::Input>, pact: P, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
pub fn arrange_core<'scope, P, C, Chu, Ba, Bu, Tr>(stream: Stream<'scope, Tr::Time, C>, pact: P, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
where
P: ParallelizationContract<Tr::Time, Ba::Input>,
Ba: Batcher<Time=Tr::Time,Input: Container> + 'static,
C: Container + Clone + 'static,
P: ParallelizationContract<Tr::Time, C>,
Chu: ContainerBuilder<Container=Ba::Output> + for<'a> PushInto<&'a mut C> + 'static,
Ba: Batcher<Time=Tr::Time> + 'static,
Bu: Builder<Time=Tr::Time, Input=Ba::Output, Output = Tr::Batch>,
Tr: Trace+'static,
{
Expand Down Expand Up @@ -379,6 +388,8 @@
// Initialize to the minimal input frontier.
let mut prev_frontier = Antichain::from_elem(Tr::Time::minimum());

let mut chunker = Chu::default();

move |(input, frontier), output| {

// As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
Expand All @@ -387,7 +398,10 @@

input.for_each(|cap, data| {
capabilities.insert(cap.retain(0));
batcher.push_container(data);
chunker.push_into(data);
while let Some(chunk) = chunker.extract() {
batcher.push_into(std::mem::take(chunk));
}
});

// The frontier may have advanced by multiple elements, which is an issue because
Expand All @@ -402,6 +416,13 @@
// frontier isn't equal to the previous. It is only in this case that we have any
// data processing to do.
if prev_frontier.borrow() != frontier.frontier() {
// Flush any data the chunker is still accumulating into the batcher before we
// seal. The batcher only sees chunks the chunker has emitted; without this drain
// a partial final chunk would never reach the batcher.
while let Some(chunk) = chunker.finish() {
batcher.push_into(std::mem::take(chunk));
}

// There are two cases to handle with some care:
//
// 1. If any held capabilities are not in advance of the new input frontier,
Expand Down
Loading
Loading