diff --git a/differential-dataflow/examples/columnar/main.rs b/differential-dataflow/examples/columnar/main.rs index 582e5ce98..f26d435b2 100644 --- a/differential-dataflow/examples/columnar/main.rs +++ b/differential-dataflow/examples/columnar/main.rs @@ -125,13 +125,15 @@ mod reachability { let edges_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 }; let reach_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 }; - let edges_arr = arrange_core::<_, + let edges_arr = arrange_core::<_, _, + ValChunker<(Node, Node, IterTime, Diff)>, ValBatcher, ValBuilder, ValSpine, >(edges_inner.inner, edges_pact, "Edges"); - let reach_arr = arrange_core::<_, + let reach_arr = arrange_core::<_, _, + ValChunker<(Node, (), IterTime, Diff)>, ValBatcher, ValBuilder, ValSpine, @@ -155,7 +157,8 @@ mod reachability { // Arrange for reduce. let combined_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 }; - let combined_arr = arrange_core::<_, + let combined_arr = arrange_core::<_, _, + ValChunker<(Node, (), IterTime, Diff)>, ValBatcher, ValBuilder, ValSpine, diff --git a/differential-dataflow/examples/columnar_spill.rs b/differential-dataflow/examples/columnar_spill.rs index 96776121f..5203ed97b 100644 --- a/differential-dataflow/examples/columnar_spill.rs +++ b/differential-dataflow/examples/columnar_spill.rs @@ -63,7 +63,7 @@ fn reset_stats() { use columnar::Push; use columnar::bytes::stash::Stash; -use differential_dataflow::columnar::{RecordedUpdates, ValBuilder, ValColBuilder, ValSpine}; +use differential_dataflow::columnar::{ValBuilder, ValChunker, ValColBuilder, ValSpine}; use differential_dataflow::columnar::batcher::MergeBatcher; use differential_dataflow::columnar::layout::ColumnarUpdate as Update; use differential_dataflow::columnar::spill::{Entry, Fetch, Spill, SpillPolicy}; @@ -75,6 +75,7 @@ use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::probe::{Handle as ProbeHandle, Probe}; use timely::dataflow::operators::Input; use timely::dataflow::InputHandle; +use timely::container::PushInto; use timely::progress::frontier::AntichainRef; use timely::progress::{frontier::Antichain, Timestamp}; @@ -323,7 +324,6 @@ where R: columnar::Columnar + 'static, (K, V, T, R): Update