Skip to content

Commit

Permalink
wip implementation of cross join via broadcasting batches
Browse files Browse the repository at this point in the history
Signed-off-by: Petros Angelatos <[email protected]>
  • Loading branch information
petrosagg committed Mar 11, 2022
1 parent f862b65 commit 9ba2363
Show file tree
Hide file tree
Showing 3 changed files with 282 additions and 1 deletion.
116 changes: 116 additions & 0 deletions examples/broadcast_cross_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
extern crate timely;
extern crate differential_dataflow;

use std::ops::Mul;

use timely::Data;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::broadcast::Broadcast;
use timely::dataflow::operators::Operator;
use timely::dataflow::{Scope, Stream};
use timely::order::TotalOrder;
use differential_dataflow::{Collection, AsCollection};
use differential_dataflow::difference::Semigroup;
use differential_dataflow::input::Input;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arrange, Arranged};
use differential_dataflow::trace::{Cursor, BatchReader, TraceReader};
use differential_dataflow::trace::implementations::ord::OrdKeySpineAbomArc;

// This function is supposed to do one half of a cross join but its implementation is currently
// incorrect
// TODO: actually implement a half cross join
//
// Intended contract: every update (key2, time2, diff2) carried by a batch arriving on
// `right` is paired with every update (key1, time1, diff1) held in the `left`
// arrangement, emitting ((left_key, right_key), max(time1, time2), diff_right * diff_left).
fn half_cross_join<G, Tr1, Key2, R2, Batch2>(
    // The arrangement holding this worker's copy of one input.
    left: Arranged<G, Tr1>,
    // Pre-formed batches of the other input (broadcast to every worker by the caller).
    right: &Stream<G, Batch2>,
) -> Collection<G, (Tr1::Key, Key2), <R2 as Mul<Tr1::R>>::Output>
where
    G: Scope,
    // TotalOrder lets `std::cmp::max` below stand in for the lattice join of two times.
    G::Timestamp: Lattice + TotalOrder + Ord,
    Tr1: TraceReader<Time = G::Timestamp> + Clone + 'static,
    Tr1::Key: Clone,
    Tr1::R: Clone,
    // The stream carries whole batches keyed by `Key2` with unit values.
    Batch2: BatchReader<Key2, (), G::Timestamp, R2> + Data,
    Key2: Clone + 'static,
    R2: Semigroup + Clone + Mul<Tr1::R>,
    <R2 as Mul<Tr1::R>>::Output: Semigroup,
{
    // NOTE(review): the trace handle is never compaction-advanced (no
    // set_logical_compaction / set_physical_compaction calls are visible), so holding
    // it here blocks the arrangement from ever compacting — TODO confirm intended.
    let mut trace = left.trace;
    right.unary(Pipeline, "CrossJoin", move |_cap, _info| {
        // Reusable buffer for the batches delivered with each input message.
        let mut vector = Vec::new();
        move |input, output| {
            while let Some((time, data)) = input.next() {
                data.swap(&mut vector);
                for batch in vector.drain(..) {
                    // Outer walk: each key of the incoming batch.
                    let mut cursor = batch.cursor();
                    while let Some(key1) = cursor.get_key(&batch) {
                        // A fresh cursor over the arranged side for every batch key.
                        let (mut trace_cursor, trace_storage) = trace.cursor();
                        cursor.map_times(&batch, |time1, diff1| {
                            // NOTE(review): `trace_cursor` is stepped to exhaustion during the
                            // FIRST invocation of this closure and is never rewound, so any
                            // subsequent (time1, diff1) for the same key sees an empty cursor —
                            // likely part of why the function is marked incorrect.
                            while let Some(key2) = trace_cursor.get_key(&trace_storage) {
                                trace_cursor.map_times(&trace_storage, |time2, diff2| {
                                    // The product takes effect at the later of the two update times.
                                    let effect_time = std::cmp::max(time1.clone(), time2.clone());
                                    // Downgrade the input capability to the per-tuple output time.
                                    let cap_time = time.delayed(&effect_time);
                                    let diff = diff1.clone().mul(diff2.clone());
                                    // Opens a session per tuple; correctness is unaffected but this
                                    // is noted as a potential hot-spot.
                                    let mut session = output.session(&cap_time);
                                    // Output pairs are (trace key, batch key).
                                    session.give((((key2.clone(), key1.clone())), effect_time, diff));
                                });
                                trace_cursor.step_key(&trace_storage);
                            }
                        });
                        cursor.step_key(&batch);
                    }
                }
            }
        }
    })
    .as_collection()
}

/// Demonstrates a cross join built from two broadcast half-joins: each worker
/// arranges both inputs, broadcasts its batches to all peers, and joins each
/// broadcast stream against its local arrangement of the other input.
fn main() {
    timely::execute_from_args(::std::env::args(), move |worker| {
        let worker_idx = worker.index();
        let (mut handle1, mut handle2, probe) = worker.dataflow::<u64, _, _>(|scope| {
            let (handle1, input1) = scope.new_collection();
            let (handle2, input2) = scope.new_collection();

            // Arrange both inputs with the Arc-abomonated spine so batches can be
            // exchanged between workers.
            let arranged1 = input1.arrange::<OrdKeySpineAbomArc<_, _, _>>();
            let arranged2 = input2.arrange::<OrdKeySpineAbomArc<_, _, _>>();

            // Ship every worker's batches to all workers.
            let batches1 = arranged1.stream.broadcast();
            let batches2 = arranged2.stream.broadcast();

            // Changes from input1 need to be joined with the per-worker arrangement state of input2
            let cross1 = half_cross_join(arranged2, &batches1);

            // Changes from input2 need to be joined with the per-worker arrangement state of input1
            let cross2 = half_cross_join(arranged1, &batches2);

            // The final cross join is the combination of these two halves; flip
            // `cross1` so both carry keys in (input1, input2) order.
            let cross_join = cross1.map(|(key1, key2)| (key2, key1)).concat(&cross2);

            let probe = cross_join
                .inspect(move |d| {
                    println!("worker {} produced: {:?}", worker_idx, d);
                })
                .probe();

            (handle1, handle2, probe)
        });

        handle1.insert(1i64);
        handle1.advance_to(1);
        handle1.insert(2);
        handle1.advance_to(2);
        handle1.flush();

        handle2.insert("apple".to_string());
        handle2.advance_to(1);
        handle2.insert("orange".to_string());
        handle2.advance_to(2);
        handle2.flush();

        // Step the worker until the probe has caught up with BOTH inputs.
        // Previously only `handle1` was consulted; had the second input's frontier
        // lagged, the loop could exit before all output was produced.
        while probe.less_than(handle1.time()) || probe.less_than(handle2.time()) {
            worker.step();
        }
    }).unwrap();
}
5 changes: 4 additions & 1 deletion src/trace/implementations/ord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use trace::layers::Builder as TrieBuilder;
use trace::layers::Cursor as TrieCursor;
use trace::layers::ordered::{OrdOffset, OrderedLayer, OrderedBuilder, OrderedCursor};
use trace::layers::ordered_leaf::{OrderedLeaf, OrderedLeafBuilder};
use trace::abomonated_arc_blanket_impls::AbomArc;
use trace::{Batch, BatchReader, Builder, Merger, Cursor};
use trace::description::Description;

Expand All @@ -46,6 +47,8 @@ pub type OrdKeySpine<K, T, R, O=usize> = Spine<K, (), T, R, Rc<OrdKeyBatch<K, T,
/// A trace implementation for empty values using a spine of abomonated ordered lists.
pub type OrdKeySpineAbom<K, T, R, O=usize> = Spine<K, (), T, R, Rc<Abomonated<OrdKeyBatch<K, T, R, O>, Vec<u8>>>>;

/// A trace implementation for empty values using a spine of atomic reference counted abomonated ordered lists.
///
/// Same layout as `OrdKeySpineAbom`, but batches are wrapped in `AbomArc` (an `Arc`
/// that can round-trip through abomonation) instead of `Rc<Abomonated<..>>`.
pub type OrdKeySpineAbomArc<K, T, R, O = usize> = Spine<K, (), T, R, AbomArc<OrdKeyBatch<K, T, R, O>>>;

/// An immutable collection of update tuples, from a contiguous interval of logical times.
#[derive(Debug, Abomonation)]
Expand Down Expand Up @@ -396,7 +399,7 @@ where


/// An immutable collection of update tuples, from a contiguous interval of logical times.
#[derive(Debug, Abomonation)]
#[derive(Debug, Clone, Abomonation)]
pub struct OrdKeyBatch<K, T, R, O=usize>
where
K: Ord,
Expand Down
162 changes: 162 additions & 0 deletions src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,168 @@ pub mod rc_blanket_impls {
}
}

/// Blanket implementations for atomic reference counted batches.
pub mod abomonated_arc_blanket_impls {
use std::io::Write;
use std::mem;
use std::ops::Deref;
use std::sync::Arc;

use abomonation::Abomonation;

/// Wrapper over Arc that can be safely Abomonated.
///
/// Values built normally live behind an `Arc`; values reconstituted from
/// abomonated bytes live behind a `Box`, and the two are distinguished so each
/// can be handled appropriately.
pub enum AbomArc<T> {
    /// An Arc that has been constructed normally
    Owned(Arc<T>),
    /// The result of decoding an abomonated AbomArc
    Abomonated(Box<T>),
}

impl<T> AbomArc<T> {
    /// Wraps `inner` in a freshly allocated `Arc`.
    fn new(inner: T) -> Self {
        AbomArc::Owned(Arc::new(inner))
    }
}

impl<T: Clone> Clone for AbomArc<T> {
    /// Cloning an owned value bumps the reference count; cloning a decoded
    /// value deep-copies it into a fresh `Arc`.
    fn clone(&self) -> Self {
        match *self {
            AbomArc::Owned(ref this) => AbomArc::Owned(this.clone()),
            AbomArc::Abomonated(ref this) => AbomArc::Owned(Arc::new((**this).clone())),
        }
    }
}

impl<T> Deref for AbomArc<T> {
    type Target = T;
    /// Borrows the wrapped value regardless of which variant holds it.
    fn deref(&self) -> &T {
        match *self {
            AbomArc::Owned(ref this) => this,
            AbomArc::Abomonated(ref this) => this,
        }
    }
}

/// Serializes the pointed-to value inline, bypassing the `Arc`/`Box` indirection:
/// `entomb` writes `T`'s bytes followed by its out-of-line data, and `exhume`
/// reconstructs an `Abomonated` variant pointing into the byte buffer.
impl<T: Abomonation> Abomonation for AbomArc<T> {
    unsafe fn entomb<W: Write>(&self, bytes: &mut W) -> std::io::Result<()> {
        // Write the inline bytes of `T` (reached through Deref, so both variants work)...
        bytes.write_all(std::slice::from_raw_parts(mem::transmute(&**self), mem::size_of::<T>()))?;
        // ...then let `T` entomb whatever it points at.
        (**self).entomb(bytes)
    }
    unsafe fn exhume<'a,'b>(&'a mut self, bytes: &'b mut [u8]) -> Option<&'b mut [u8]> {
        // Number of inline bytes to reclaim for `T` itself.
        let binary_len = mem::size_of::<T>();
        if binary_len > bytes.len() {
            None
        } else {
            let (mine, rest) = bytes.split_at_mut(binary_len);
            // NOTE(review): this `Box` aliases `bytes`, which was not allocated by the
            // global allocator as a `T` (and may not even be aligned for `T`). If an
            // `AbomArc::Abomonated` is ever dropped normally, `Box`'s destructor frees
            // memory it does not own — undefined behavior. Confirm the surrounding
            // code guarantees decoded values are never dropped (e.g. via mem::forget).
            let mut value = Box::from_raw(mine.as_mut_ptr() as *mut T);
            let rest = (*value).exhume(rest)?;
            // ptr::write overwrites `self` without running a destructor on the decoded
            // garbage bytes currently occupying it.
            std::ptr::write(self, Self::Abomonated(value));
            Some(rest)
        }
    }
    fn extent(&self) -> usize {
        // Inline size of `T` plus everything the inner value points at.
        mem::size_of::<T>() + (&**self).extent()
    }
}

use timely::progress::{Antichain, frontier::AntichainRef};
use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description};

/// Read access delegated to the wrapped batch: an `AbomArc<B>` is itself a batch reader.
impl<K, V, T, R, B: BatchReader<K,V,T,R>> BatchReader<K,V,T,R> for AbomArc<B> {

    /// The type used to enumerate the batch's contents.
    type Cursor = ArcBatchCursor<K, V, T, R, B>;
    /// Acquires a cursor to the batch's contents.
    fn cursor(&self) -> Self::Cursor {
        // Wrap the inner cursor so its `Storage` type is `AbomArc<B>` rather than `B`.
        ArcBatchCursor::new((&**self).cursor())
    }

    /// The number of updates in the batch.
    fn len(&self) -> usize { (&**self).len() }
    /// Describes the times of the updates in the batch.
    fn description(&self) -> &Description<T> { (&**self).description() }
}

/// Wrapper to provide cursor to nested scope.
///
/// Holds `B`'s own cursor so the `Cursor` implementation below can name
/// `AbomArc<B>` (rather than `B`) as its storage type.
pub struct ArcBatchCursor<K, V, T, R, B: BatchReader<K, V, T, R>> {
    /// Marks the key/value/time/diff parameters as logically used.
    phantom: ::std::marker::PhantomData<(K, V, T, R)>,
    /// The wrapped cursor over the inner batch.
    cursor: B::Cursor,
}

impl<K, V, T, R, B: BatchReader<K, V, T, R>> ArcBatchCursor<K, V, T, R, B> {
    /// Wraps an inner batch cursor.
    fn new(cursor: B::Cursor) -> Self {
        ArcBatchCursor {
            cursor,
            phantom: ::std::marker::PhantomData,
        }
    }
}

/// Cursor navigation: every method derefs the `AbomArc<B>` storage to `&B` and
/// forwards to the inner batch's cursor unchanged.
impl<K, V, T, R, B: BatchReader<K, V, T, R>> Cursor<K, V, T, R> for ArcBatchCursor<K, V, T, R, B> {

    type Storage = AbomArc<B>;

    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }

    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(storage) }
    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(storage) }

    #[inline]
    fn map_times<L: FnMut(&T, &R)>(&mut self, storage: &Self::Storage, logic: L) {
        self.cursor.map_times(storage, logic)
    }

    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(storage, key) }

    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(storage, val) }

    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
}

/// An immutable collection of updates.
impl<K,V,T,R,B: Batch<K,V,T,R>> Batch<K, V, T, R> for AbomArc<B> {
    // Each associated type wraps `B`'s counterpart so produced batches come back
    // as `AbomArc<B>`.
    type Batcher = ArcBatcher<K, V, T, R, B>;
    type Builder = ArcBuilder<K, V, T, R, B>;
    type Merger = ArcMerger<K, V, T, R, B>;
}

/// Wrapper type for batching reference counted batches.
pub struct ArcBatcher<K,V,T,R,B:Batch<K,V,T,R>> { batcher: B::Batcher }

/// Functionality for collecting and batching updates.
///
/// Delegates everything to `B`'s batcher; only `seal` differs, wrapping the
/// sealed batch in an `AbomArc`.
impl<K,V,T,R,B:Batch<K,V,T,R>> Batcher<K, V, T, R, AbomArc<B>> for ArcBatcher<K,V,T,R,B> {
    fn new() -> Self { ArcBatcher { batcher: <B::Batcher as Batcher<K,V,T,R,B>>::new() } }
    fn push_batch(&mut self, batch: &mut Vec<((K, V), T, R)>) { self.batcher.push_batch(batch) }
    fn seal(&mut self, upper: Antichain<T>) -> AbomArc<B> { AbomArc::new(self.batcher.seal(upper)) }
    fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> { self.batcher.frontier() }
}

/// Wrapper type for building reference counted batches.
pub struct ArcBuilder<K,V,T,R,B:Batch<K,V,T,R>> { builder: B::Builder }

/// Functionality for building batches from ordered update sequences.
///
/// Delegates to `B`'s builder; `done` wraps the finished batch in an `AbomArc`.
impl<K,V,T,R,B:Batch<K,V,T,R>> Builder<K, V, T, R, AbomArc<B>> for ArcBuilder<K,V,T,R,B> {
    fn new() -> Self { ArcBuilder { builder: <B::Builder as Builder<K,V,T,R,B>>::new() } }
    fn with_capacity(cap: usize) -> Self { ArcBuilder { builder: <B::Builder as Builder<K,V,T,R,B>>::with_capacity(cap) } }
    fn push(&mut self, element: (K, V, T, R)) { self.builder.push(element) }
    fn done(self, lower: Antichain<T>, upper: Antichain<T>, since: Antichain<T>) -> AbomArc<B> { AbomArc::new(self.builder.done(lower, upper, since)) }
}

/// Wrapper type for merging reference counted batches.
pub struct ArcMerger<K,V,T,R,B:Batch<K,V,T,R>> { merger: B::Merger }

/// Represents a merge in progress.
///
/// The `&AbomArc<B>` sources deref-coerce to `&B`, so `B`'s own merger performs
/// the work; only the final merged batch is re-wrapped in an `AbomArc`.
impl<K,V,T,R,B:Batch<K,V,T,R>> Merger<K, V, T, R, AbomArc<B>> for ArcMerger<K,V,T,R,B> {
    fn new(source1: &AbomArc<B>, source2: &AbomArc<B>, compaction_frontier: Option<AntichainRef<T>>) -> Self { ArcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } }
    fn work(&mut self, source1: &AbomArc<B>, source2: &AbomArc<B>, fuel: &mut isize) { self.merger.work(source1, source2, fuel) }
    fn done(self) -> AbomArc<B> { AbomArc::new(self.merger.done()) }
}
}


/// Blanket implementations for reference counted batches.
pub mod abomonated_blanket_impls {
Expand Down

0 comments on commit 9ba2363

Please sign in to comment.