Always create different cascading pipes out of forked cross typed pipe (#1908)

* Test

* Add test without toPipe

* Fix tests

* Use BYTES_WRITTEN in tests

* Add JobTestExt

* Add comment and simplify code a bit
ttim authored and johnynek committed Apr 17, 2019
1 parent f09a7b9 commit e541c3f
Showing 2 changed files with 300 additions and 151 deletions.
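
For context, the pipeline shape this change targets is a `.cross` result consumed by more than one downstream computation. A minimal sketch of such a job (illustrative only: `events` and `dict` are hypothetical sources, while `cross`, `filter`, and `map` are real `TypedPipe` methods):

  import com.twitter.scalding.typed.TypedPipe

  def forkedCross(events: TypedPipe[String], dict: TypedPipe[Int]) = {
    // a big pipe crossed against a small one
    val crossed = events.cross(dict)
    // two forked consumers of the same crossed pipe
    val left = crossed.filter { case (e, _) => e.nonEmpty }.map(_._1)
    val right = crossed.map { case (e, n) => e.length + n }
    (left, right)
  }

Before this commit, the planner memoized every compiled `TypedPipe`, so both forks shared one cascading `Pipe` for `crossed` and the (potentially huge) crossed intermediate could end up materialized. After this commit, each fork of a cross gets its own hash-join plan, as described in the scaladoc on `compile` in the diff below.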
@@ -4,7 +4,7 @@ import cascading.flow.FlowDef
import cascading.operation.Debug
import cascading.pipe.{ CoGroup, Each, Pipe, HashJoin }
import cascading.tuple.{ Fields, Tuple => CTuple }
-import com.stripe.dagon.{ FunctionK, Id, Memoize, Rule, Dag }
+import com.stripe.dagon.{ FunctionK, HCache, Id, Rule, Dag }
import com.twitter.scalding.TupleConverter.{ singleConverter, tuple2Converter }
import com.twitter.scalding.TupleSetter.{ singleSetter, tup2Setter }
import com.twitter.scalding.{
@@ -182,165 +182,220 @@ object CascadingBackend {
}
private[this] val cache = new CompilerCache

-  private def compile[T](mode: Mode): FunctionK[TypedPipe, CascadingPipe] =
-    Memoize.functionK[TypedPipe, CascadingPipe](
-      new Memoize.RecursiveK[TypedPipe, CascadingPipe] {
-        def toFunction[T] = {
-          case (cp@CounterPipe(_), rec) =>
-            def go[A](cp: CounterPipe[A]): CascadingPipe[A] = {
-              val CascadingPipe(pipe0, initF, fd, conv) = rec(cp.pipe)
-              val cpipe = RichPipe(pipe0)
-                .eachTo(initF -> f0)(new IncrementCounters[A](_, TupleConverter.asSuperConverter(conv)))
-              CascadingPipe.single[A](cpipe, fd)
-            }
-            go(cp)
-          case (cp@CrossPipe(_, _), rec) =>
-            rec(cp.viaHashJoin)
-          case (cv@CrossValue(_, _), rec) =>
-            rec(cv.viaHashJoin)
-          case (DebugPipe(p), rec) =>
-            val inner = rec(p)
-            inner.copy(pipe = new Each(inner.pipe, new Debug))
-          case (EmptyTypedPipe, rec) =>
-            // just use an empty iterable pipe.
-            rec(IterablePipe(List.empty[T]))
-          case (fk@FilterKeys(_, _), rec) =>
-            def go[K, V](node: FilterKeys[K, V]): CascadingPipe[(K, V)] = {
-              val rewrite = Filter[(K, V)](node.input, FilterKeysToFilter(node.fn))
-              rec(rewrite)
-            }
-            go(fk)
-          case (f@Filter(_, _), rec) =>
-            // hand holding for type inference
-            def go[T1 <: T](f: Filter[T1]): CascadingPipe[T] = {
-              val Filter(input, fn) = f
-              val CascadingPipe(pipe, initF, fd, conv) = rec(input)
-              // This does not need a setter, which is nice.
-              val fpipe = RichPipe(pipe).filter[T1](initF)(fn)(TupleConverter.asSuperConverter(conv))
-              CascadingPipe[T](fpipe, initF, fd, conv)
-            }
+  /**
+   * Method to compile scalding's `TypedPipe`s to cascading's `Pipe`s.
+   *
+   * Since equal `TypedPipe`s define the same computation, we would like to compile them into referentially the same cascading `Pipe` instance.
+   * This logic breaks if one typed pipe is really big and has two different forked computations, both of which significantly decrease the size of the data.
+   * If we cache the common part of these two computations in the same cascading `Pipe` instance, we end up with the common part being materialized.
+   * Therefore, for some kinds of `TypedPipe`s we want to avoid caching them.
+   *
+   * A `.cross` `TypedPipe` is one example of a `TypedPipe` we never want to materialize and, therefore, never want to cache.
+   *
+   * The `compile` logic is separated into the following functions:
+   * - `transform`, which defines the main transformation logic, without any caching applied.
+   *   This method accepts a `rec` parameter which is called to transform child pipes.
+   * - `withCachePolicy`, which defines the transformation logic with caching applied.
+   * - `notCached`, which supports the `.cross` pipe use case, where the pipe itself shouldn't be cached but its `left` and `right` sides should be.
+   */
+  private def compile(mode: Mode): FunctionK[TypedPipe, CascadingPipe] =
+    new FunctionK[TypedPipe, CascadingPipe] {
+
+      private val cache = HCache.empty[TypedPipe, CascadingPipe]
+
+      override def toFunction[U]: TypedPipe[U] => CascadingPipe[U] = withCachePolicy
+
+      private def withCachePolicy[U]: TypedPipe[U] => CascadingPipe[U] = {
+        // Don't cache `CrossPipe`, but cache the `left` and `right` sides of it
+        case cp@CrossPipe(left, right) =>
+          notCached(excludes = Set(left, right))(cp)
+        // Don't cache `Fork` and `WithDescriptionTypedPipe`,
+        // since if we cache them a `CrossPipe` will end up being cached as well
+        case tp@Fork(_) =>
+          transform(tp, this)
+        case tp@WithDescriptionTypedPipe(_, _) =>
+          transform(tp, this)
+        // Cache all other typed pipes
+        case tp =>
+          cache.getOrElseUpdate(tp, transform(tp, this))
+      }

-            go(f)
-          case (f@FlatMapValues(_, _), rec) =>
-            def go[K, V, U](node: FlatMapValues[K, V, U]): CascadingPipe[T] =
-              rec(FlatMapped[(K, V), (K, U)](node.input, FlatMapValuesToFlatMap(node.fn)))
-
-            go(f)
-          case (fm@FlatMapped(_, _), rec) =>
-            // TODO we can optimize a flatmapped input directly and skip some tupleconverters
-            def go[A, B <: T](fm: FlatMapped[A, B]): CascadingPipe[T] = {
-              val CascadingPipe(pipe, initF, fd, conv) = rec(fm.input)
-              val fmpipe = RichPipe(pipe).flatMapTo[A, T](initF -> f0)(fm.fn)(TupleConverter.asSuperConverter(conv), singleSetter)
-              CascadingPipe.single[B](fmpipe, fd)
-            }
+      private def notCached(excludes: Set[TypedPipe[_]]): FunctionK[TypedPipe, CascadingPipe] =
+        new FunctionK[TypedPipe, CascadingPipe] {
+          override def toFunction[U]: TypedPipe[U] => CascadingPipe[U] = { tp =>
+            if (excludes.contains(tp)) withCachePolicy(tp) else transform(tp, this)
+          }
+        }

-            go(fm)
-          case (ForceToDisk(input), rec) =>
-            val cp = rec(input)
-            cp.copy(pipe = RichPipe(cp.pipe).forceToDisk)
-          case (Fork(input), rec) =>
-            // fork doesn't mean anything here since we are already planning each TypedPipe to
-            // something in cascading. Fork is an optimizer level operation
-            rec(input)
-          case (IterablePipe(iter), _) =>
-            val fd = new FlowDef
-            val pipe = IterableSource[T](iter, f0)(singleSetter, singleConverter).read(fd, mode)
-            CascadingPipe.single[T](pipe, fd)
-          case (f@MapValues(_, _), rec) =>
-            def go[K, A, B](fn: MapValues[K, A, B]): CascadingPipe[_ <: (K, B)] =
-              rec(Mapped[(K, A), (K, B)](fn.input, MapValuesToMap(fn.fn)))
-
-            go(f)
-          case (m@Mapped(_, _), rec) =>
-            def go[A, B <: T](m: Mapped[A, B]): CascadingPipe[T] = {
-              val Mapped(input, fn) = m
-              val CascadingPipe(pipe, initF, fd, conv) = rec(input)
-              val fmpipe = RichPipe(pipe).mapTo[A, T](initF -> f0)(fn)(TupleConverter.asSuperConverter(conv), singleSetter)
-              CascadingPipe.single[B](fmpipe, fd)
-            }
+      private def transform[T](
+        pipe: TypedPipe[T],
+        rec: FunctionK[TypedPipe, CascadingPipe]
+      ): CascadingPipe[T] = pipe match {
+        case cp@CounterPipe(_) =>
+          def go[A](cp: CounterPipe[A]): CascadingPipe[A] = {
+            val CascadingPipe(pipe0, initF, fd, conv) = rec(cp.pipe)
+            val cpipe = RichPipe(pipe0)
+              .eachTo(initF -> f0)(new IncrementCounters[A](_, TupleConverter
+                .asSuperConverter(conv)))
+            CascadingPipe.single[A](cpipe, fd)
+          }

-            go(m)
-
-          case (m@MergedTypedPipe(_, _), rec) =>
-            OptimizationRules.unrollMerge(m) match {
-              case Nil => rec(EmptyTypedPipe)
-              case h :: Nil => rec(h)
-              case nonEmpty =>
-                // TODO: a better optimization is to not materialize this
-                // node at all if there is no fan out since groupBy and cogroupby
-                // can accept multiple inputs
-
-                val flowDef = new FlowDef
-                // if all of the converters are the same, we could skip some work
-                // here, but need to be able to see that correctly
-                val pipes = nonEmpty.map { p => rec(p).toPipe(f0, flowDef, singleSetter) }
-                val merged = new cascading.pipe.Merge(pipes.map(RichPipe.assignName): _*)
-                CascadingPipe.single[T](merged, flowDef)
-            }
-          case (SourcePipe(typedSrc), _) =>
-            val fd = new FlowDef
-            val pipe = typedSrc.read(fd, mode)
-            CascadingPipe[T](pipe, typedSrc.sourceFields, fd, typedSrc.converter[T])
-          case (sblk@SumByLocalKeys(_, _), rec) =>
-            def go[K, V](sblk: SumByLocalKeys[K, V]): CascadingPipe[(K, V)] = {
-              val cp = rec(sblk.input)
-              val localFD = new FlowDef
-              val cpKV: Pipe = cp.toPipe(kvFields, localFD, tup2Setter)
-              val msr = new MapsideReduce(sblk.semigroup, new Fields("key"), valueField, None)(singleConverter[V], singleSetter[V])
-              val kvpipe = RichPipe(cpKV).eachTo(kvFields -> kvFields) { _ => msr }
-              CascadingPipe(kvpipe, kvFields, localFD, tuple2Converter[K, V])
-            }
+          go(cp)
+        case cp@CrossPipe(_, _) =>
+          rec(cp.viaHashJoin)
+        case cv@CrossValue(_, _) =>
+          rec(cv.viaHashJoin)
+        case DebugPipe(p) =>
+          val inner = rec(p)
+          inner.copy(pipe = new Each(inner.pipe, new Debug))
+        case EmptyTypedPipe =>
+          // just use an empty iterable pipe.
+          rec(IterablePipe(List.empty[T]))
+        case fk@FilterKeys(_, _) =>
+          def go[K, V](node: FilterKeys[K, V]): CascadingPipe[(K, V)] = {
+            val rewrite = Filter[(K, V)](node.input, FilterKeysToFilter(node.fn))
+            rec(rewrite)
+          }
+
+          go(fk)
+        case f@Filter(_, _) =>
+          // hand holding for type inference
+          def go[T1 <: T](f: Filter[T1]): CascadingPipe[T] = {
+            val Filter(input, fn) = f
+            val CascadingPipe(pipe, initF, fd, conv) = rec(input)
+            // This does not need a setter, which is nice.
+            val fpipe = RichPipe(pipe).filter[T1](initF)(fn)(TupleConverter.asSuperConverter(conv))
+            CascadingPipe[T](fpipe, initF, fd, conv)
+          }
+
+          go(f)
+        case f@FlatMapValues(_, _) =>
+          def go[K, V, U](node: FlatMapValues[K, V, U]): CascadingPipe[T] =
+            rec(FlatMapped[(K, V), (K, U)](node.input, FlatMapValuesToFlatMap(node.fn)))
+
+          go(f)
+        case fm@FlatMapped(_, _) =>
+          // TODO we can optimize a flatmapped input directly and skip some tupleconverters
+          def go[A, B <: T](fm: FlatMapped[A, B]): CascadingPipe[T] = {
+            val CascadingPipe(pipe, initF, fd, conv) = rec(fm.input)
+            val fmpipe = RichPipe(pipe).flatMapTo[A, T](initF -> f0)(fm.fn)(TupleConverter
+              .asSuperConverter(conv), singleSetter)
+            CascadingPipe.single[B](fmpipe, fd)
+          }
+
+          go(fm)
+        case ForceToDisk(input) =>
+          val cp = rec(input)
+          cp.copy(pipe = RichPipe(cp.pipe).forceToDisk)
+        case Fork(input) =>
+          // fork doesn't mean anything here since we are already planning each TypedPipe to
+          // something in cascading. Fork is an optimizer level operation
+          rec(input)
+        case IterablePipe(iter) =>
+          val fd = new FlowDef
+          val pipe = IterableSource[T](iter, f0)(singleSetter, singleConverter).read(fd, mode)
+          CascadingPipe.single[T](pipe, fd)
+        case f@MapValues(_, _) =>
+          def go[K, A, B](fn: MapValues[K, A, B]): CascadingPipe[_ <: (K, B)] =
+            rec(Mapped[(K, A), (K, B)](fn.input, MapValuesToMap(fn.fn)))
+
+          go(f)
+        case m@Mapped(_, _) =>
+          def go[A, B <: T](m: Mapped[A, B]): CascadingPipe[T] = {
+            val Mapped(input, fn) = m
+            val CascadingPipe(pipe, initF, fd, conv) = rec(input)
+            val fmpipe = RichPipe(pipe).mapTo[A, T](initF -> f0)(fn)(TupleConverter
+              .asSuperConverter(conv), singleSetter)
+            CascadingPipe.single[B](fmpipe, fd)
+          }
+
+          go(m)
+
+        case m@MergedTypedPipe(_, _) =>
+          OptimizationRules.unrollMerge(m) match {
+            case Nil => rec(EmptyTypedPipe)
+            case h :: Nil => rec(h)
+            case nonEmpty =>
+              // TODO: a better optimization is to not materialize this
+              // node at all if there is no fan out since groupBy and cogroupby
+              // can accept multiple inputs
+
+              val flowDef = new FlowDef
+              // if all of the converters are the same, we could skip some work
+              // here, but need to be able to see that correctly
+              val pipes = nonEmpty.map { p => rec(p).toPipe(f0, flowDef, singleSetter) }
+              val merged = new cascading.pipe.Merge(pipes.map(RichPipe.assignName): _*)
+              CascadingPipe.single[T](merged, flowDef)
+          }
+        case SourcePipe(typedSrc) =>
+          val fd = new FlowDef
+          val pipe = typedSrc.read(fd, mode)
+          CascadingPipe[T](pipe, typedSrc.sourceFields, fd, typedSrc.converter[T])
+        case sblk@SumByLocalKeys(_, _) =>
+          def go[K, V](sblk: SumByLocalKeys[K, V]): CascadingPipe[(K, V)] = {
+            val cp = rec(sblk.input)
+            val localFD = new FlowDef
+            val cpKV: Pipe = cp.toPipe(kvFields, localFD, tup2Setter)
+            val msr = new MapsideReduce(sblk
+              .semigroup, new Fields("key"), valueField, None)(singleConverter[V], singleSetter[V])
+            val kvpipe = RichPipe(cpKV).eachTo(kvFields -> kvFields) { _ => msr }
+            CascadingPipe(kvpipe, kvFields, localFD, tuple2Converter[K, V])
+          }
+
+          go(sblk)
+        case trapped: TrappedPipe[u] =>
+          val cp = rec(trapped.input)
+          import trapped._
+          // TODO: with diamonds in the graph, this might not be correct
+          // it seems cascading puts the immediate tuple that
+          // caused the exception, so if you addTrap( ).map(f).map(g)
+          // and f changes the tuple structure, if we don't collapse the
+          // maps into 1 operation, cascading can write two different
+          // schemas into the trap, making it unreadable.
+          // this basically means there can only be one operation in between
+          // a trap and a forceToDisk or a groupBy/cogroupBy (any barrier).
+          val fd = new FlowDef
+          val pp: Pipe = cp.toPipe[u](sink.sinkFields, fd, TupleSetter.asSubSetter(sink.setter))
+          val pipe = RichPipe.assignName(pp)
+          fd.addTrap(pipe, sink.createTap(Write)(mode))
+          CascadingPipe[u](pipe, sink.sinkFields, fd, conv)
+        case WithDescriptionTypedPipe(input, descs) =>
+
+          @annotation.tailrec
+          def loop[A](
+            t: TypedPipe[A],
+            acc: List[(String, Boolean)]
+          ): (TypedPipe[A], List[(String, Boolean)]) =
+            t match {
+              case WithDescriptionTypedPipe(i, descs) =>
+                loop(i, descs ::: acc)
+              case notDescr => (notDescr, acc)
+            }
-            go(sblk)
-          case (trapped: TrappedPipe[u], rec) =>
-            val cp = rec(trapped.input)
-            import trapped._
-            // TODO: with diamonds in the graph, this might not be correct
-            // it seems cascading puts the immediate tuple that
-            // caused the exception, so if you addTrap( ).map(f).map(g)
-            // and f changes the tuple structure, if we don't collapse the
-            // maps into 1 operation, cascading can write two different
-            // schemas into the trap, making it unreadable.
-            // this basically means there can only be one operation in between
-            // a trap and a forceToDisk or a groupBy/cogroupBy (any barrier).
-            val fd = new FlowDef
-            val pp: Pipe = cp.toPipe[u](sink.sinkFields, fd, TupleSetter.asSubSetter(sink.setter))
-            val pipe = RichPipe.assignName(pp)
-            fd.addTrap(pipe, sink.createTap(Write)(mode))
-            CascadingPipe[u](pipe, sink.sinkFields, fd, conv)
-          case (WithDescriptionTypedPipe(input, descs), rec) =>
-
-            @annotation.tailrec
-            def loop[A](t: TypedPipe[A], acc: List[(String, Boolean)]): (TypedPipe[A], List[(String, Boolean)]) =
-              t match {
-                case WithDescriptionTypedPipe(i, descs) =>
-                  loop(i, descs ::: acc)
-                case notDescr => (notDescr, acc)
-              }
-
-            val (root, allDesc) = loop(input, descs)
-            val cp = rec(root)
-            cp.copy(pipe = applyDescriptions(cp.pipe, allDesc))
+          val (root, allDesc) = loop(input, descs)
+          val cp = rec(root)
+          cp.copy(pipe = applyDescriptions(cp.pipe, allDesc))

-          case (WithOnComplete(input, fn), rec) =>
-            val cp = rec(input)
-            val next = new Each(cp.pipe, Fields.ALL, new CleanupIdentityFunction(fn))
-            cp.copy(pipe = next)
+        case WithOnComplete(input, fn) =>
+          val cp = rec(input)
+          val next = new Each(cp.pipe, Fields.ALL, new CleanupIdentityFunction(fn))
+          cp.copy(pipe = next)

-          case (hcg@HashCoGroup(_, _, _), rec) =>
-            def go[K, V1, V2, R](hcg: HashCoGroup[K, V1, V2, R]): CascadingPipe[(K, R)] =
-              planHashJoin(hcg.left,
-                hcg.right,
-                hcg.joiner,
-                rec)
+        case hcg@HashCoGroup(_, _, _) =>
+          def go[K, V1, V2, R](hcg: HashCoGroup[K, V1, V2, R]): CascadingPipe[(K, R)] =
+            planHashJoin(hcg.left,
+              hcg.right,
+              hcg.joiner,
+              rec)

-            go(hcg)
-          case (ReduceStepPipe(rs), rec) =>
-            planReduceStep(rs, rec)
+          go(hcg)
+        case ReduceStepPipe(rs) =>
+          planReduceStep(rs, rec)

-          case (CoGroupedPipe(cg), rec) =>
-            planCoGroup(cg, rec)
-        }
-      })
+        case CoGroupedPipe(cg) =>
+          planCoGroup(cg, rec)
+      }
+    }

private def applyDescriptions(p: Pipe, descriptions: List[(String, Boolean)]): Pipe = {
val ordered = descriptions.collect { case (d, false) => d }.reverse
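
The caching policy in the new `compile` can be condensed into a small, self-contained sketch. This is a toy model rather than scalding's API: `Expr`, `Src`, `Map1`, `Cross`, and `Fork1` are hypothetical stand-ins for `TypedPipe` and its `SourcePipe`, `Mapped`, `CrossPipe`, and `Fork` nodes, and a mutable map stands in for dagon's `HCache`.

  import scala.collection.mutable

  sealed trait Expr
  case class Src(name: String) extends Expr
  case class Map1(in: Expr) extends Expr
  case class Cross(left: Expr, right: Expr) extends Expr
  case class Fork1(in: Expr) extends Expr

  object CachePolicySketch {
    private val cache = mutable.Map.empty[Expr, String]
    private var fresh = 0

    // every compiled node gets a unique id so sharing is visible
    private def plan(label: String): String = { fresh += 1; s"$label#$fresh" }

    def compile(e: Expr): String = e match {
      // never cache a Cross: each consumer gets its own plan,
      // but its left and right sides still go through the cache
      case Cross(l, r) => plan(s"cross(${compile(l)}, ${compile(r)})")
      // don't cache a Fork either, or the Cross it wraps would be shared
      case Fork1(in) => compile(in)
      // cache everything else
      case Map1(in) => cache.getOrElseUpdate(e, plan(s"map(${compile(in)})"))
      case Src(n) => cache.getOrElseUpdate(e, plan(n))
    }

    def main(args: Array[String]): Unit = {
      val crossed = Fork1(Cross(Src("big"), Src("small")))
      println(compile(crossed)) // cross(big#1, small#2)#3
      println(compile(crossed)) // cross(big#1, small#2)#4: a fresh plan for the cross,
                                // while big and small are compiled once and reused
    }
  }

As in the real `withCachePolicy`, the cross node (and the fork wrapping it) is re-planned for every consumer, while everything beneath it is compiled exactly once.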