From e541c3fbe4c9abca04518bfa4a700608490338bd Mon Sep 17 00:00:00 2001
From: Timur
Date: Tue, 16 Apr 2019 18:15:54 -0700
Subject: [PATCH] Always create different cascading pipes out of forked cross
 typed pipe (#1908)

* Test
* Add test without toPipe
* Fix tests
* Use BYTES_WRITTEN in tests
* Add JobTestExt
* Add comment and simplify code a bit
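The shape of job this change targets, as a minimal sketch (the job, sources
and sinks below are illustrative, not taken from the patch): one large pipe
is crossed with a small one, and the result is forked into two consumers,
each of which shrinks the data. Before this change the forked cross could be
compiled into a single shared cascading pipe, forcing the full crossed data
to be materialized; after it, each consumer plans its own HashJoin.

    import com.twitter.scalding._

    class ForkedCrossJob(args: Args) extends Job(args) {
      val big: TypedPipe[Int] = TypedPipe.from(TypedTsv[Int]("big"))
      val tiny: TypedPipe[Map[Int, Int]] = TypedPipe.from(List(Map(1 -> 1)))

      // one cross, two forked consumers, both of which shrink the data
      val crossed = big.cross(tiny)
      crossed.map { case (v, m) => m.getOrElse(v, 0) }.write(TypedTsv[Int]("out1"))
      crossed.map { case (v, m) => m.size }.write(TypedTsv[Int]("out2"))
    }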
---
 .../cascading_backend/CascadingBackend.scala  | 357 ++++++++++--------
 .../com/twitter/scalding/TypedPipeTest.scala  |  94 +++++
 2 files changed, 300 insertions(+), 151 deletions(-)

diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala
index 3ac2af4c83..894fe7d4d2 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala
@@ -4,7 +4,7 @@ import cascading.flow.FlowDef
 import cascading.operation.Debug
 import cascading.pipe.{ CoGroup, Each, Pipe, HashJoin }
 import cascading.tuple.{ Fields, Tuple => CTuple }
-import com.stripe.dagon.{ FunctionK, Id, Memoize, Rule, Dag }
+import com.stripe.dagon.{ FunctionK, HCache, Id, Rule, Dag }
 import com.twitter.scalding.TupleConverter.{ singleConverter, tuple2Converter }
 import com.twitter.scalding.TupleSetter.{ singleSetter, tup2Setter }
 import com.twitter.scalding.{
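The import swap above replaces dagon's `Memoize.functionK` with an explicit
`HCache`, letting the backend decide per node whether to reuse an already
compiled pipe; the policy itself is documented in the hunk below. Reuse is
keyed on plain value equality of the `TypedPipe` case classes, which a small
REPL-style sketch (not part of the patch) makes concrete:

    import com.twitter.scalding.typed.TypedPipe

    val inc: Int => Int = _ + 1
    val a = TypedPipe.from(List(1, 2, 3)).map(inc)
    val b = TypedPipe.from(List(1, 2, 3)).map(inc)
    // a == b: both are Mapped(IterablePipe(List(1, 2, 3)), inc), so a cache
    // keyed on the TypedPipe can compile them to one cascading Pipe. Note
    // the shared `inc`: two syntactically identical lambdas are different
    // function instances and would not compare equal.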
@@ -182,165 +182,220 @@ object CascadingBackend {
   }
 
   private[this] val cache = new CompilerCache
-  private def compile[T](mode: Mode): FunctionK[TypedPipe, CascadingPipe] =
-    Memoize.functionK[TypedPipe, CascadingPipe](
-      new Memoize.RecursiveK[TypedPipe, CascadingPipe] {
-        def toFunction[T] = {
-          case (cp@CounterPipe(_), rec) =>
-            def go[A](cp: CounterPipe[A]): CascadingPipe[A] = {
-              val CascadingPipe(pipe0, initF, fd, conv) = rec(cp.pipe)
-              val cpipe = RichPipe(pipe0)
-                .eachTo(initF -> f0)(new IncrementCounters[A](_, TupleConverter.asSuperConverter(conv)))
-              CascadingPipe.single[A](cpipe, fd)
-            }
-            go(cp)
-          case (cp@CrossPipe(_, _), rec) =>
-            rec(cp.viaHashJoin)
-          case (cv@CrossValue(_, _), rec) =>
-            rec(cv.viaHashJoin)
-          case (DebugPipe(p), rec) =>
-            val inner = rec(p)
-            inner.copy(pipe = new Each(inner.pipe, new Debug))
-          case (EmptyTypedPipe, rec) =>
-            // just use an empty iterable pipe.
-            rec(IterablePipe(List.empty[T]))
-          case (fk@FilterKeys(_, _), rec) =>
-            def go[K, V](node: FilterKeys[K, V]): CascadingPipe[(K, V)] = {
-              val rewrite = Filter[(K, V)](node.input, FilterKeysToFilter(node.fn))
-              rec(rewrite)
-            }
-            go(fk)
-          case (f@Filter(_, _), rec) =>
-            // hand holding for type inference
-            def go[T1 <: T](f: Filter[T1]): CascadingPipe[T] = {
-              val Filter(input, fn) = f
-              val CascadingPipe(pipe, initF, fd, conv) = rec(input)
-              // This does not need a setter, which is nice.
-              val fpipe = RichPipe(pipe).filter[T1](initF)(fn)(TupleConverter.asSuperConverter(conv))
-              CascadingPipe[T](fpipe, initF, fd, conv)
-            }
+  /**
+   * Method to compile scalding's `TypedPipe`s to cascading's `Pipe`s.
+   *
+   * Since equal `TypedPipe`s define the same computation, we would like to compile them into referentially the same cascading `Pipe` instance.
+   * This logic breaks if one typed pipe is really big and has two different forked computations, both of which significantly decrease the size of the data.
+   * If we cache the common part of these two computations in the same cascading `Pipe` instance, we end up with the common part being materialized.
+   * Therefore, for some kinds of `TypedPipe`s we want to avoid caching.
+   *
+   * A `.cross` `TypedPipe` is one example of a `TypedPipe` we never want to materialize and, therefore, never want to cache.
+   *
+   * The `compile` logic is separated into the following functions:
+   * - `transform`, which defines the main transformation logic, without any caching applied.
+   *   This method accepts a `rec` parameter which is called to transform child pipes.
+   * - `withCachePolicy`, which defines the transformation logic with caching applied.
+   * - `notCached`, which supports the `.cross` use case, where the pipe itself shouldn't be cached but its `left` and `right` sides should be.
+   */
+  private def compile(mode: Mode): FunctionK[TypedPipe, CascadingPipe] =
+    new FunctionK[TypedPipe, CascadingPipe] {
+
+      private val cache = HCache.empty[TypedPipe, CascadingPipe]
+
+      override def toFunction[U]: TypedPipe[U] => CascadingPipe[U] = withCachePolicy
+
+      private def withCachePolicy[U]: TypedPipe[U] => CascadingPipe[U] = {
+        // Don't cache `CrossPipe`, but cache the `left` and `right` sides of it
+        case cp@CrossPipe(left, right) =>
+          notCached(excludes = Set(left, right))(cp)
+        // Don't cache `Fork` and `WithDescriptionTypedPipe`,
+        // since if we cache them `CrossPipe` will end up being cached as well
+        case tp@Fork(_) =>
+          transform(tp, this)
+        case tp@WithDescriptionTypedPipe(_, _) =>
+          transform(tp, this)
+        // Cache all other typed pipes
+        case tp =>
+          cache.getOrElseUpdate(tp, transform(tp, this))
+      }
 
-            go(f)
-          case (f@FlatMapValues(_, _), rec) =>
-            def go[K, V, U](node: FlatMapValues[K, V, U]): CascadingPipe[T] =
-              rec(FlatMapped[(K, V), (K, U)](node.input, FlatMapValuesToFlatMap(node.fn)))
-
-            go(f)
-          case (fm@FlatMapped(_, _), rec) =>
-            // TODO we can optimize a flatmapped input directly and skip some tupleconverters
-            def go[A, B <: T](fm: FlatMapped[A, B]): CascadingPipe[T] = {
-              val CascadingPipe(pipe, initF, fd, conv) = rec(fm.input)
-              val fmpipe = RichPipe(pipe).flatMapTo[A, T](initF -> f0)(fm.fn)(TupleConverter.asSuperConverter(conv), singleSetter)
-              CascadingPipe.single[B](fmpipe, fd)
-            }
+      private def notCached(excludes: Set[TypedPipe[_]]): FunctionK[TypedPipe, CascadingPipe] =
+        new FunctionK[TypedPipe, CascadingPipe] {
+          override def toFunction[U]: TypedPipe[U] => CascadingPipe[U] = { tp =>
+            if (excludes.contains(tp)) withCachePolicy(tp) else transform(tp, this)
+          }
+        }
 
-            go(fm)
-          case (ForceToDisk(input), rec) =>
-            val cp = rec(input)
-            cp.copy(pipe = RichPipe(cp.pipe).forceToDisk)
-          case (Fork(input), rec) =>
-            // fork doesn't mean anything here since we are already planning each TypedPipe to
-            // something in cascading. Fork is an optimizer level operation
-            rec(input)
-          case (IterablePipe(iter), _) =>
-            val fd = new FlowDef
-            val pipe = IterableSource[T](iter, f0)(singleSetter, singleConverter).read(fd, mode)
-            CascadingPipe.single[T](pipe, fd)
-          case (f@MapValues(_, _), rec) =>
-            def go[K, A, B](fn: MapValues[K, A, B]): CascadingPipe[_ <: (K, B)] =
-              rec(Mapped[(K, A), (K, B)](fn.input, MapValuesToMap(fn.fn)))
-
-            go(f)
-          case (m@Mapped(_, _), rec) =>
-            def go[A, B <: T](m: Mapped[A, B]): CascadingPipe[T] = {
-              val Mapped(input, fn) = m
-              val CascadingPipe(pipe, initF, fd, conv) = rec(input)
-              val fmpipe = RichPipe(pipe).mapTo[A, T](initF -> f0)(fn)(TupleConverter.asSuperConverter(conv), singleSetter)
-              CascadingPipe.single[B](fmpipe, fd)
-            }
-
-            go(m)
-
-          case (m@MergedTypedPipe(_, _), rec) =>
-            OptimizationRules.unrollMerge(m) match {
-              case Nil => rec(EmptyTypedPipe)
-              case h :: Nil => rec(h)
-              case nonEmpty =>
-                // TODO: a better optimization is to not materialize this
-                // node at all if there is no fan out since groupBy and cogroupby
-                // can accept multiple inputs
-
-                val flowDef = new FlowDef
-                // if all of the converters are the same, we could skip some work
-                // here, but need to be able to see that correctly
-                val pipes = nonEmpty.map { p => rec(p).toPipe(f0, flowDef, singleSetter) }
-                val merged = new cascading.pipe.Merge(pipes.map(RichPipe.assignName): _*)
-                CascadingPipe.single[T](merged, flowDef)
-            }
-          case (SourcePipe(typedSrc), _) =>
-            val fd = new FlowDef
-            val pipe = typedSrc.read(fd, mode)
-            CascadingPipe[T](pipe, typedSrc.sourceFields, fd, typedSrc.converter[T])
-          case (sblk@SumByLocalKeys(_, _), rec) =>
-            def go[K, V](sblk: SumByLocalKeys[K, V]): CascadingPipe[(K, V)] = {
-              val cp = rec(sblk.input)
-              val localFD = new FlowDef
-              val cpKV: Pipe = cp.toPipe(kvFields, localFD, tup2Setter)
-              val msr = new MapsideReduce(sblk.semigroup, new Fields("key"), valueField, None)(singleConverter[V], singleSetter[V])
-              val kvpipe = RichPipe(cpKV).eachTo(kvFields -> kvFields) { _ => msr }
-              CascadingPipe(kvpipe, kvFields, localFD, tuple2Converter[K, V])
+      private def transform[T](
+        pipe: TypedPipe[T],
+        rec: FunctionK[TypedPipe, CascadingPipe]
+      ): CascadingPipe[T] = pipe match {
+        case cp@CounterPipe(_) =>
+          def go[A](cp: CounterPipe[A]): CascadingPipe[A] = {
+            val CascadingPipe(pipe0, initF, fd, conv) = rec(cp.pipe)
+            val cpipe = RichPipe(pipe0)
+              .eachTo(initF -> f0)(new IncrementCounters[A](_, TupleConverter
+                .asSuperConverter(conv)))
+            CascadingPipe.single[A](cpipe, fd)
+          }
+
+          go(cp)
+        case cp@CrossPipe(_, _) =>
+          rec(cp.viaHashJoin)
+        case cv@CrossValue(_, _) =>
+          rec(cv.viaHashJoin)
+        case DebugPipe(p) =>
+          val inner = rec(p)
+          inner.copy(pipe = new Each(inner.pipe, new Debug))
+        case EmptyTypedPipe =>
+          // just use an empty iterable pipe.
+          rec(IterablePipe(List.empty[T]))
+        case fk@FilterKeys(_, _) =>
+          def go[K, V](node: FilterKeys[K, V]): CascadingPipe[(K, V)] = {
+            val rewrite = Filter[(K, V)](node.input, FilterKeysToFilter(node.fn))
+            rec(rewrite)
+          }
+
+          go(fk)
+        case f@Filter(_, _) =>
+          // hand holding for type inference
+          def go[T1 <: T](f: Filter[T1]): CascadingPipe[T] = {
+            val Filter(input, fn) = f
+            val CascadingPipe(pipe, initF, fd, conv) = rec(input)
+            // This does not need a setter, which is nice.
+            val fpipe = RichPipe(pipe).filter[T1](initF)(fn)(TupleConverter.asSuperConverter(conv))
+            CascadingPipe[T](fpipe, initF, fd, conv)
+          }
+
+          go(f)
+        case f@FlatMapValues(_, _) =>
+          def go[K, V, U](node: FlatMapValues[K, V, U]): CascadingPipe[T] =
+            rec(FlatMapped[(K, V), (K, U)](node.input, FlatMapValuesToFlatMap(node.fn)))
+
+          go(f)
+        case fm@FlatMapped(_, _) =>
+          // TODO we can optimize a flatmapped input directly and skip some tupleconverters
+          def go[A, B <: T](fm: FlatMapped[A, B]): CascadingPipe[T] = {
+            val CascadingPipe(pipe, initF, fd, conv) = rec(fm.input)
+            val fmpipe = RichPipe(pipe).flatMapTo[A, T](initF -> f0)(fm.fn)(TupleConverter
+              .asSuperConverter(conv), singleSetter)
+            CascadingPipe.single[B](fmpipe, fd)
+          }
+
+          go(fm)
+        case ForceToDisk(input) =>
+          val cp = rec(input)
+          cp.copy(pipe = RichPipe(cp.pipe).forceToDisk)
+        case Fork(input) =>
+          // fork doesn't mean anything here since we are already planning each TypedPipe to
+          // something in cascading. Fork is an optimizer level operation
+          rec(input)
+        case IterablePipe(iter) =>
+          val fd = new FlowDef
+          val pipe = IterableSource[T](iter, f0)(singleSetter, singleConverter).read(fd, mode)
+          CascadingPipe.single[T](pipe, fd)
+        case f@MapValues(_, _) =>
+          def go[K, A, B](fn: MapValues[K, A, B]): CascadingPipe[_ <: (K, B)] =
+            rec(Mapped[(K, A), (K, B)](fn.input, MapValuesToMap(fn.fn)))
+
+          go(f)
+        case m@Mapped(_, _) =>
+          def go[A, B <: T](m: Mapped[A, B]): CascadingPipe[T] = {
+            val Mapped(input, fn) = m
+            val CascadingPipe(pipe, initF, fd, conv) = rec(input)
+            val fmpipe = RichPipe(pipe).mapTo[A, T](initF -> f0)(fn)(TupleConverter
+              .asSuperConverter(conv), singleSetter)
+            CascadingPipe.single[B](fmpipe, fd)
+          }
+
+          go(m)
+
+        case m@MergedTypedPipe(_, _) =>
+          OptimizationRules.unrollMerge(m) match {
+            case Nil => rec(EmptyTypedPipe)
+            case h :: Nil => rec(h)
+            case nonEmpty =>
+              // TODO: a better optimization is to not materialize this
+              // node at all if there is no fan out since groupBy and cogroupby
+              // can accept multiple inputs
+
+              val flowDef = new FlowDef
+              // if all of the converters are the same, we could skip some work
+              // here, but need to be able to see that correctly
+              val pipes = nonEmpty.map { p => rec(p).toPipe(f0, flowDef, singleSetter) }
+              val merged = new cascading.pipe.Merge(pipes.map(RichPipe.assignName): _*)
+              CascadingPipe.single[T](merged, flowDef)
+          }
+        case SourcePipe(typedSrc) =>
+          val fd = new FlowDef
+          val pipe = typedSrc.read(fd, mode)
+          CascadingPipe[T](pipe, typedSrc.sourceFields, fd, typedSrc.converter[T])
+        case sblk@SumByLocalKeys(_, _) =>
+          def go[K, V](sblk: SumByLocalKeys[K, V]): CascadingPipe[(K, V)] = {
+            val cp = rec(sblk.input)
+            val localFD = new FlowDef
+            val cpKV: Pipe = cp.toPipe(kvFields, localFD, tup2Setter)
+            val msr = new MapsideReduce(sblk
+              .semigroup, new Fields("key"), valueField, None)(singleConverter[V], singleSetter[V])
+            val kvpipe = RichPipe(cpKV).eachTo(kvFields -> kvFields) { _ => msr }
+            CascadingPipe(kvpipe, kvFields, localFD, tuple2Converter[K, V])
+          }
+
+          go(sblk)
+        case trapped: TrappedPipe[u] =>
+          val cp = rec(trapped.input)
+          import trapped._
+          // TODO: with diamonds in the graph, this might not be correct
+          // it seems cascading puts the immediate tuple that
+          // caused the exception, so if you addTrap( ).map(f).map(g)
+          // and f changes the tuple structure, if we don't collapse the
+          // maps into 1 operation, cascading can write two different
+          // schemas into the trap, making it unreadable.
+          // this basically means there can only be one operation in between
+          // a trap and a forceToDisk or a groupBy/cogroupBy (any barrier).
+          val fd = new FlowDef
+          val pp: Pipe = cp.toPipe[u](sink.sinkFields, fd, TupleSetter.asSubSetter(sink.setter))
+          val pipe = RichPipe.assignName(pp)
+          fd.addTrap(pipe, sink.createTap(Write)(mode))
+          CascadingPipe[u](pipe, sink.sinkFields, fd, conv)
+        case WithDescriptionTypedPipe(input, descs) =>
+
+          @annotation.tailrec
+          def loop[A](
+            t: TypedPipe[A],
+            acc: List[(String, Boolean)]
+          ): (TypedPipe[A], List[(String, Boolean)]) =
+            t match {
+              case WithDescriptionTypedPipe(i, descs) =>
+                loop(i, descs ::: acc)
+              case notDescr => (notDescr, acc)
+            }
 
-            go(sblk)
-          case (trapped: TrappedPipe[u], rec) =>
-            val cp = rec(trapped.input)
-            import trapped._
-            // TODO: with diamonds in the graph, this might not be correct
-            // it seems cascading puts the immediate tuple that
-            // caused the exception, so if you addTrap( ).map(f).map(g)
-            // and f changes the tuple structure, if we don't collapse the
-            // maps into 1 operation, cascading can write two different
-            // schemas into the trap, making it unreadable.
-            // this basically means there can only be one operation in between
-            // a trap and a forceToDisk or a groupBy/cogroupBy (any barrier).
-            val fd = new FlowDef
-            val pp: Pipe = cp.toPipe[u](sink.sinkFields, fd, TupleSetter.asSubSetter(sink.setter))
-            val pipe = RichPipe.assignName(pp)
-            fd.addTrap(pipe, sink.createTap(Write)(mode))
-            CascadingPipe[u](pipe, sink.sinkFields, fd, conv)
-          case (WithDescriptionTypedPipe(input, descs), rec) =>
-
-            @annotation.tailrec
-            def loop[A](t: TypedPipe[A], acc: List[(String, Boolean)]): (TypedPipe[A], List[(String, Boolean)]) =
-              t match {
-                case WithDescriptionTypedPipe(i, descs) =>
-                  loop(i, descs ::: acc)
-                case notDescr => (notDescr, acc)
-              }
-
-            val (root, allDesc) = loop(input, descs)
-            val cp = rec(root)
-            cp.copy(pipe = applyDescriptions(cp.pipe, allDesc))
-
-          case (WithOnComplete(input, fn), rec) =>
-            val cp = rec(input)
-            val next = new Each(cp.pipe, Fields.ALL, new CleanupIdentityFunction(fn))
-            cp.copy(pipe = next)
-
-          case (hcg@HashCoGroup(_, _, _), rec) =>
-            def go[K, V1, V2, R](hcg: HashCoGroup[K, V1, V2, R]): CascadingPipe[(K, R)] =
-              planHashJoin(hcg.left,
-                hcg.right,
-                hcg.joiner,
-                rec)
-
-            go(hcg)
-          case (ReduceStepPipe(rs), rec) =>
-            planReduceStep(rs, rec)
-
-          case (CoGroupedPipe(cg), rec) =>
-            planCoGroup(cg, rec)
-        }
-      })
+          val (root, allDesc) = loop(input, descs)
+          val cp = rec(root)
+          cp.copy(pipe = applyDescriptions(cp.pipe, allDesc))
+
+        case WithOnComplete(input, fn) =>
+          val cp = rec(input)
+          val next = new Each(cp.pipe, Fields.ALL, new CleanupIdentityFunction(fn))
+          cp.copy(pipe = next)
+
+        case hcg@HashCoGroup(_, _, _) =>
+          def go[K, V1, V2, R](hcg: HashCoGroup[K, V1, V2, R]): CascadingPipe[(K, R)] =
+            planHashJoin(hcg.left,
+              hcg.right,
+              hcg.joiner,
+              rec)
+
+          go(hcg)
+        case ReduceStepPipe(rs) =>
+          planReduceStep(rs, rec)
+
+        case CoGroupedPipe(cg) =>
+          planCoGroup(cg, rec)
+      }
+    }
 
   private def applyDescriptions(p: Pipe, descriptions: List[(String, Boolean)]): Pipe = {
     val ordered = descriptions.collect { case (d, false) => d }.reverse
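The net effect of the new policy, walked through on a forked cross (pipe,
function and sink names invented for illustration):

    val crossed = left.cross(right) // CrossPipe(left, right)
    crossed.map(f).write(sink1)     // consumer 1
    crossed.map(g).write(sink2)     // consumer 2

`withCachePolicy` sends `CrossPipe(left, right)` to
`notCached(excludes = Set(left, right))`, so the cross itself (and the
HashJoin it becomes via `viaHashJoin`) is planned once per consumer, while
`left` and `right` fall through to the `HCache` and are compiled exactly
once. `Fork` and `WithDescriptionTypedPipe` are also left uncached so that a
`CrossPipe` wrapped in them cannot end up cached indirectly.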
diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala
index c74bcd5db2..fcd21bbcb7 100644
--- a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala
+++ b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala
@@ -30,6 +30,13 @@ object TUtil {
   def printStack(fn: => Unit): Unit = {
     try { fn } catch { case e: Throwable => e.printStackTrace; throw e }
   }
+
+  implicit class JobTestExt(test: JobTest) {
+    def writesLessDataThan(limitInBytes: Int): JobTest = test
+      .counter("BYTES_WRITTEN", group = "org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter") {
+        value => assert(value < limitInBytes, s"Job wrote $value bytes of data with limit $limitInBytes")
+      }
+  }
 }
 
 class TupleAdderJob(args: Args) extends Job(args) {
@@ -1551,3 +1558,90 @@ class TypedPipeConverterTest extends FunSuite {
     assert(result == expected)
   }
 }
+
+object TypedPipeCrossWithMapWithToPipeTest {
+  val source = TypedText.tsv[Int]("source")
+
+  val sink1 = TypedText.tsv[Int]("sink1")
+  val sink2 = TypedText.tsv[Int]("sink2")
+
+  class TestJob(args: Args) extends Job(args) {
+    val mapPipe: TypedPipe[Map[Int, Int]] = TypedPipe.from(source)
+      .groupAll
+      .toList
+      .mapValues(values => values.map(v => (v, v)).toMap)
+      .values
+
+    val crossedMapped = TypedPipe.from(source)
+      .cross(mapPipe)
+      .map { case (value, map) => map(value) }
+
+    crossedMapped.toPipe('value).write(sink1)
+    crossedMapped.map(identity).toPipe('value).write(sink2)
+  }
+}
+
+class TypedPipeCrossWithMapWithToPipeTest extends FunSuite {
+  import TypedPipeCrossWithMapWithToPipeTest._
+  import TUtil._
+
+  test("data between cross and subsequent map shouldn't be materialized") {
+    val n = 3000
+    val bytesPerElement = 100 // we shouldn't write more than 100 bytes per element
+    val values = 1 to n
+
+    JobTest(new TestJob(_))
+      .source(source, values)
+      .typedSink(sink1) { outBuf =>
+        assert(outBuf.toSet == values.toSet)
+      }
+      .typedSink(sink2) { outBuf =>
+        assert(outBuf.toSet == values.toSet)
+      }
+      .writesLessDataThan(bytesPerElement * n)
+      .runHadoop
+      .finish()
+  }
+}
+
+object TypedPipeCrossWithDifferentMapsAfterTest {
+  val source = TypedText.tsv[Int]("source")
+
+  val sink1 = TypedText.tsv[Int]("sink1")
+  val sink2 = TypedText.tsv[Int]("sink2")
+
+  class TestJob(args: Args) extends Job(args) {
+    val mapPipe: TypedPipe[Map[Int, Int]] = TypedPipe.from(source)
+      .groupAll
+      .toList
+      .mapValues(values => values.map(v => (v, v)).toMap)
+      .values
+
+    val crossed = TypedPipe.from(source).cross(mapPipe)
+    crossed.map { case (value, map) => map(value) }.write(sink1)
+    crossed.map { case (value, map) => map(identity(value)) }.write(sink2)
+  }
+}
+
+class TypedPipeCrossWithDifferentMapsAfterTest extends FunSuite {
+  import TypedPipeCrossWithDifferentMapsAfterTest._
+  import TUtil._
+
+  test("cross data shouldn't be materialized") {
+    val n = 3000
+    val bytesPerElement = 100 // we shouldn't write more than 100 bytes per element
+    val values = 1 to n
+
+    JobTest(new TestJob(_))
+      .source(source, values)
+      .typedSink(sink1) { outBuf =>
+        assert(outBuf.toSet == values.toSet)
+      }
+      .typedSink(sink2) { outBuf =>
+        assert(outBuf.toSet == values.toSet)
+      }
+      .writesLessDataThan(bytesPerElement * n)
+      .runHadoop
+      .finish()
+  }
+}
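For scale: the bound in these tests is 100 bytes x 3000 elements = 300,000
bytes for the whole job, while a materialized cross would carry the full
3000-entry map on every record and exceed it by orders of magnitude. The
helper composes with any JobTest; a sketch of reusing it (the job, source
and sink are hypothetical, and the counter is only populated when running
with runHadoop, as in the tests above):

    import TUtil._

    JobTest(new SomeOtherJob(_))
      .source(input, 1 to 100)
      .typedSink(output) { outBuf => assert(outBuf.nonEmpty) }
      .writesLessDataThan(10 * 1024) // fail if 10 KiB or more is written
      .runHadoop
      .finish()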