Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the WritePartitioner to exactly match cascading #1805

Merged
merged 3 commits into from
Feb 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -621,13 +621,31 @@ object OptimizationRules {
object RemoveDuplicateForceFork extends PartialRule[TypedPipe] {
def applyWhere[T](on: Dag[TypedPipe]) = {
case ForceToDisk(ForceToDisk(t)) => ForceToDisk(t)
case ForceToDisk(WithDescriptionTypedPipe(ForceToDisk(t), desc)) =>
// we might as well only do one force to disk in this case
WithDescriptionTypedPipe(ForceToDisk(t), desc)
case ForceToDisk(Fork(t)) => ForceToDisk(t)
case Fork(Fork(t)) => Fork(t)
case Fork(ForceToDisk(t)) => ForceToDisk(t)
case Fork(t) if on.contains(ForceToDisk(t)) => ForceToDisk(t)
}
}

/**
* If a fork has no fan-out when planned, it serves no purpose
* and is safe to remove. Likewise, there is no reason
* to put a forceToDisk immediatle after a source
*/
object RemoveUselessFork extends PartialRule[TypedPipe] {
def applyWhere[T](on: Dag[TypedPipe]) = {
case fork@Fork(t) if on.hasSingleDependent(fork) => t
case Fork(src@SourcePipe(_)) => src
case Fork(iter@IterablePipe(_)) => iter
case ForceToDisk(src@SourcePipe(_)) => src
case ForceToDisk(iter@IterablePipe(_)) => iter
}
}

/**
* We ignore .group if there are is no setting of reducers
*
Expand Down Expand Up @@ -930,6 +948,7 @@ object OptimizationRules {
List(
// phase 0, add explicit forks to not duplicate pipes on fanout below
AddExplicitForks,
RemoveUselessFork,
// phase 1, compose flatMap/map, move descriptions down, defer merge, filter pushup etc...
composeSame.orElse(DescribeLater).orElse(FilterKeysEarly).orElse(DeferMerge),
// phase 2, combine different kinds of mapping operations into flatMaps, including redundant merges
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ object WritePartitioner {
}
// Now apply the rules:
logger.info(s"applying rules to graph of size: ${finalDag.allNodes.size}")
val optDag = finalDag.applySeq(phases)
val optDag = finalDag.applySeq(phases :+ OptimizationRules.RemoveUselessFork)
logger.info(s"optimized graph hash size: ${optDag.allNodes.size}")

import TypedPipe.{ReduceStepPipe, HashCoGroup}
Expand Down Expand Up @@ -215,80 +215,171 @@ object WritePartitioner {
}
}

// Now we convert
val fn = Memoize.functionK[TypedPipe, mat.TP](
new Memoize.RecursiveK[TypedPipe, mat.TP] {
/**
* If cascading would consider the current pipe as a Logical reduce
* we can avoid some forces below. This method returns true
* if the pipe is ending on a reduce (not potentially a map-only job)
*/
@annotation.tailrec
def isLogicalReduce(tp: TypedPipe[Any]): Boolean = {
import TypedPipe._
tp match {
case EmptyTypedPipe | IterablePipe(_) | SourcePipe(_) => false
case CounterPipe(a) => isLogicalReduce(a)
case cp@CrossPipe(_, _) => isLogicalReduce(cp.viaHashJoin)
case cp@CrossValue(_, _) => isLogicalReduce(cp.viaHashJoin)
case DebugPipe(p) => isLogicalReduce(p)
case FilterKeys(p, _) => isLogicalReduce(p)
case Filter(p, _) => isLogicalReduce(p)
case FlatMapValues(p, _) => isLogicalReduce(p)
case FlatMapped(p, _) => isLogicalReduce(p)
case ForceToDisk(_) => false // not reducers for sure, could be a map-only job
case Fork(_) => false // TODO, not super clear
case HashCoGroup(left, _, _) => isLogicalReduce(left)
case MapValues(p, _) => isLogicalReduce(p)
case Mapped(p, _) => isLogicalReduce(p)
case MergedTypedPipe(_, _) => false
case ReduceStepPipe(_) => true
case SumByLocalKeys(p, _) => isLogicalReduce(p)
case TrappedPipe(p, _, _) => isLogicalReduce(p)
case CoGroupedPipe(_) => true
case WithOnComplete(p, _) => isLogicalReduce(p)
case WithDescriptionTypedPipe(p, _) => isLogicalReduce(p)
}
}

/**
* We use this state to track where we are as we recurse up the graph.
* Since we know at the very end we will write, we can avoid, for instance
* forcing a reduce operation that is followed only by a map and a write.
*
* Coupled with the isLogicalReduce above, we can emulate the behavior
* of the cascading planner as we recurse up.
*/
sealed abstract class BelowState {
def |(that: BelowState): BelowState =
(this, that) match {
case (BelowState.Write, later) => later
case (BelowState.OnlyMapping, BelowState.Write) => BelowState.OnlyMapping
case (BelowState.OnlyMapping, mapOrMater) => mapOrMater
case (BelowState.Materialized, _) => BelowState.Materialized
}
}
object BelowState {
case object Write extends BelowState
case object OnlyMapping extends BelowState
case object Materialized extends BelowState
}
type P[a] = (TypedPipe[a], BelowState)
/**
* Given a pipe, and the state below it, return the materialized
* version of that pipe. This should cause no more materializations
* than cascading would do, and indeed we test for this property
*/
val fn = Memoize.functionK[P, mat.TP](
new Memoize.RecursiveK[P, mat.TP] {
import TypedPipe._
import BelowState._

def toFunction[A] = {
case (cp: CounterPipe[a], rec) =>
mat.map(rec(cp.pipe))(CounterPipe(_: TypedPipe[(a, Iterable[((String, String), Long)])]))
case (c: CrossPipe[a, b], rec) =>
rec(c.viaHashJoin)
case (cv@CrossValue(_, _), rec) =>
rec(cv.viaHashJoin)
case (p: DebugPipe[a], rec) =>
mat.map(rec(p.input))(DebugPipe(_: TypedPipe[a]))
case (p: FilterKeys[a, b], rec) =>
mat.map(rec(p.input))(FilterKeys(_: TypedPipe[(a, b)], p.fn))
case (p: Filter[a], rec) =>
mat.map(rec(p.input))(Filter(_: TypedPipe[a], p.fn))
case (Fork(src@IterablePipe(_)), rec) =>
// no need to put a checkpoint here:
rec(src)
case (Fork(src@SourcePipe(_)), rec) =>
case ((cp: CounterPipe[a], bs), rec) =>
mat.map(rec((cp.pipe, bs)))(CounterPipe(_: TypedPipe[(a, Iterable[((String, String), Long)])]))
case ((c: CrossPipe[a, b], bs), rec) =>
rec((c.viaHashJoin, bs))
case ((cv@CrossValue(_, _), bs), rec) =>
rec((cv.viaHashJoin, bs))
case ((p: DebugPipe[a], bs), rec) =>
mat.map(rec((p.input, bs)))(DebugPipe(_: TypedPipe[a]))
case ((p: FilterKeys[a, b], bs), rec) =>
mat.map(rec((p.input, bs | OnlyMapping)))(FilterKeys(_: TypedPipe[(a, b)], p.fn))
case ((p: Filter[a], bs), rec) =>
mat.map(rec((p.input, bs | OnlyMapping)))(Filter(_: TypedPipe[a], p.fn))
case ((Fork(of), bs), rec) =>
// Treat forks as forceToDisk after
// optimizations (which should have removed unneeded forks
rec((ForceToDisk(of), bs))
case ((p: FlatMapValues[a, b, c], bs), rec) =>
mat.map(rec((p.input, bs | OnlyMapping)))(FlatMapValues(_: TypedPipe[(a, b)], p.fn))
case ((p: FlatMapped[a, b], bs), rec) =>
mat.map(rec((p.input, bs | OnlyMapping)))(FlatMapped(_: TypedPipe[a], p.fn))
case ((ForceToDisk(src@IterablePipe(_)), bs), rec) =>
// no need to put a checkpoint here:
rec(src)
case (p: Fork[a], rec) =>
mat.materialize(rec(p.input))
case (p: FlatMapValues[a, b, c], rec) =>
mat.map(rec(p.input))(FlatMapValues(_: TypedPipe[(a, b)], p.fn))
case (p: FlatMapped[a, b], rec) =>
mat.map(rec(p.input))(FlatMapped(_: TypedPipe[a], p.fn))
case (ForceToDisk(src@IterablePipe(_)), rec) =>
rec((src, bs))
case ((ForceToDisk(src@SourcePipe(_)), bs), rec) =>
// no need to put a checkpoint here:
rec(src)
case (ForceToDisk(src@SourcePipe(_)), rec) =>
// no need to put a checkpoint here:
rec(src)
case (p: ForceToDisk[a], rec) =>
mat.materialize(rec(p.input))
case (it@IterablePipe(_), _) =>
rec((src, bs))
case ((p: ForceToDisk[a], bs), rec) =>
val newBs =
if (isLogicalReduce(p.input)) OnlyMapping
else Materialized
val matP = rec((p.input, newBs))
bs match {
case Write =>
// there is no need force to disk immediately before a write
matP
case _ => mat.materialize(matP)
}
case ((it@IterablePipe(_), _), _) =>
mat.pure(it)
case (p: MapValues[a, b, c], rec) =>
mat.map(rec(p.input))(MapValues(_: TypedPipe[(a, b)], p.fn))
case (p: Mapped[a, b], rec) =>
mat.map(rec(p.input))(Mapped(_: TypedPipe[a], p.fn))
case (p: MergedTypedPipe[a], rec) =>
val mleft = rec(p.left)
val mright = rec(p.right)
case ((p: MapValues[a, b, c], bs), rec) =>
mat.map(rec((p.input, bs | OnlyMapping)))(MapValues(_: TypedPipe[(a, b)], p.fn))
case ((p: Mapped[a, b], bs), rec) =>
mat.map(rec((p.input, bs | OnlyMapping)))(Mapped(_: TypedPipe[a], p.fn))
case ((p: MergedTypedPipe[a], bs), rec) =>
val mleft = rec((p.left, bs))
val mright = rec((p.right, bs))
val both = mat.zip(mleft, mright)
mat.map(both) { case (l, r) => MergedTypedPipe(l, r) }
case (src@SourcePipe(_), _) =>
case ((src@SourcePipe(_), _), _) =>
mat.pure(src)
case (p: SumByLocalKeys[a, b], rec) =>
mat.map(rec(p.input))(SumByLocalKeys(_: TypedPipe[(a, b)], p.semigroup))
case (p: TrappedPipe[a], rec) =>
mat.map(rec(p.input))(TrappedPipe[a](_: TypedPipe[a], p.sink, p.conv))
case (p: WithDescriptionTypedPipe[a], rec) =>
mat.map(rec(p.input))(WithDescriptionTypedPipe(_: TypedPipe[a], p.descriptions))
case (p: WithOnComplete[a], rec) =>
mat.map(rec(p.input))(WithOnComplete(_: TypedPipe[a], p.fn))
case (EmptyTypedPipe, _) =>
case ((p: SumByLocalKeys[a, b], bs), rec) =>
mat.map(rec((p.input, bs | OnlyMapping)))(SumByLocalKeys(_: TypedPipe[(a, b)], p.semigroup))
case ((p: TrappedPipe[a], bs), rec) =>
// TODO: it is a bit unclear if a trap is allowed on the back of a reduce?
mat.map(rec((p.input, bs)))(TrappedPipe[a](_: TypedPipe[a], p.sink, p.conv))
case ((p: WithDescriptionTypedPipe[a], bs), rec) =>
mat.map(rec((p.input, bs)))(WithDescriptionTypedPipe(_: TypedPipe[a], p.descriptions))
case ((p: WithOnComplete[a], bs), rec) =>
mat.map(rec((p.input, bs)))(WithOnComplete(_: TypedPipe[a], p.fn))
case ((EmptyTypedPipe, _), _) =>
mat.pure(EmptyTypedPipe)
case (hg: HashCoGroup[a, b, c, d], rec) =>
handleHashCoGroup(hg, rec)
case (CoGroupedPipe(cg), f) =>
// simple version puts a checkpoint here
mat.materialize(handleCoGrouped(cg, f))
case (ReduceStepPipe(rs), f) =>
// simple version puts a checkpoint here
mat.materialize(handleReduceStep(rs, f))
case ((hg: HashCoGroup[a, b, c, d], bs), rec) =>
val withBs = new FunctionK[TypedPipe, P] {
def toFunction[A] = { tp => (tp, bs | OnlyMapping) }
}
// TODO: hashJoins may not be allowed in a reduce step in cascading,
// not clear
val recHG = FunctionK.andThen[TypedPipe, P, mat.TP](withBs, rec)
handleHashCoGroup(hg, recHG)
case ((CoGroupedPipe(cg), bs), rec) =>
val withBs = new FunctionK[TypedPipe, P] {
def toFunction[A] = { tp => (tp, bs | Materialized) }
}
// TODO: hashJoins may not be allowed in a reduce step in cascading,
// not clear
val recHG = FunctionK.andThen[TypedPipe, P, mat.TP](withBs, rec)
val hcg = handleCoGrouped(cg, recHG)
bs match {
case BelowState.Materialized => mat.materialize(hcg)
case _ => hcg
}
case ((ReduceStepPipe(rs), bs), rec) =>
val withBs = new FunctionK[TypedPipe, P] {
def toFunction[A] = { tp => (tp, bs | BelowState.Materialized) }
}
// TODO: hashJoins may not be allowed in a reduce step in cascading,
// not clear
val recHG = FunctionK.andThen[TypedPipe, P, mat.TP](withBs, rec)
val hrs = handleReduceStep(rs, recHG)
bs match {
case BelowState.Materialized => mat.materialize(hrs)
case _ => hrs
}
}
})

def write[A](p: PairK[Id, S, A]): (M[TypedPipe[A]], S[A]) = {
val materialized: M[TypedPipe[A]] = fn(optDag.evaluate(p._1))
val materialized: M[TypedPipe[A]] = fn((optDag.evaluate(p._1), BelowState.Write))
(materialized, p._2)
}

Expand Down
Loading