
Always create different cascading pipes out of forked cross typed pipe #1908

Merged (6 commits), Apr 17, 2019

Conversation

@ttim (Collaborator) commented on Apr 3, 2019

As described in #1905, scalding can currently create forks in the cascading graph after cross nodes. This is almost never what you want, and it broke one of our production jobs at Twitter: the job started producing petabytes of intermediate data.

In this PR:

  • I've added two tests for the issue: one using the old .toPipe API and one with two different maps after a cross.
  • Changed CascadingBackend so that it never caches the cascading pipe created from a cross operation (a minimal sketch of the problematic shape follows below).
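
For illustration, a minimal sketch of the shape that triggers the issue, written as a hedged, hypothetical job (source and sink paths are made up): one crossed pipe is consumed by two different downstream maps.

```scala
import com.twitter.scalding._

// Hypothetical job illustrating the shape described above; paths are made up.
class CrossForkExample(args: Args) extends Job(args) {
  val left = TypedPipe.from(TypedTsv[String]("left"))
  val right = TypedPipe.from(TypedTsv[Int]("right"))

  // One cross, consumed by two different maps. Before this PR, the backend
  // cached the cascading pipe built for `crossed`, so both branches hung off
  // a cascading fork of the (potentially huge) cross product.
  val crossed = left.cross(right)

  crossed.map { case (s, i) => s + ":" + i }.write(TypedTsv[String]("out1"))
  crossed.map { case (_, i) => i * 2 }.write(TypedTsv[Int]("out2"))
}
```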

@ttim (Collaborator, Author) commented on Apr 3, 2019

I've tried to write an optimization rule to ensure the same thing, but:

  • It's not going to cover the .toPipe use case.
  • I don't know how to write it properly given the current Rule abstraction. Basically we want to optimize cross <- fork <- op1 & cross <- fork <- op2 into fork <- cross <- op1 & fork <- cross <- op2, so this rule cannot be over the fork or cross nodes and should be over the op1 & op2 nodes. Any ideas? cc/ @johnynek

@johnynek (Collaborator) commented on Apr 3, 2019

I thought a rule like:

case Fork(Cross(a, b)) => Cross(Fork(a), Fork(b))

might be all that is needed: we always push Fork above a cross. Will that not work?

Note: we do apply optimization rules to toPipe:
https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala#L380
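
For reference, the proposed rewrite could look like the following sketch over a toy pipe ADT (the names here are stand-ins, not scalding's actual TypedPipe nodes or the dagon Rule machinery):

```scala
// Toy ADT standing in for the planner's node types.
sealed trait Pipe
final case class Source(name: String) extends Pipe
final case class Fork(of: Pipe) extends Pipe
final case class Cross(left: Pipe, right: Pipe) extends Pipe

// The proposed rule: whenever a Fork sits directly on a Cross, push the fork
// above the cross so we fork the (smaller) inputs instead of the cross product.
def pushForkAboveCross(p: Pipe): Pipe = p match {
  case Fork(Cross(a, b)) => Cross(Fork(a), Fork(b))
  case other             => other
}
```

In the real planner this would be phrased as a rule over TypedPipe and applied repeatedly along with the other optimization rules.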

@ttim (Collaborator, Author) commented on Apr 3, 2019

> Note: we do apply optimization rules to toPipe

Yes, we do, but that is not going to help in this case. The issue arises because a) we cache the TypedPipe => CascadingPipe mapping, and b) we optimize the two .toPipe calls independently.
The first test case (https://github.com/twitter/scalding/pull/1908/files#diff-14795c9a6959be6381e99caaafba874dR1569) covers this. In that test, .toPipe is called twice, and after optimization the only common part of the two calls is the cross; since it is cached, it ends up being a cascading fork.

> I thought a rule like:
> case Fork(Cross(a, b)) => Cross(Fork(a), Fork(b))

I might be misunderstanding this, but I don't see how it helps. If you do this, subsequent maps (like in the second test, https://github.com/twitter/scalding/pull/1908/files#diff-14795c9a6959be6381e99caaafba874dR1615) will depend on the same Cross(Fork(a), Fork(b)), so it will still end up as the same fork in the cascading graph.
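
For concreteness, the .toPipe scenario from the first test boils down to something like the following (a hedged sketch with made-up paths and field names): the two .toPipe calls are optimized and planned independently, and the cross is their only shared node.

```scala
import com.twitter.scalding._

// Hypothetical job mirroring the first test case: the same cross is lowered
// to a cascading pipe twice via the fields API.
class TwoToPipeExample(args: Args) extends Job(args) {
  val crossed = TypedPipe.from(TypedTsv[String]("left"))
    .cross(TypedPipe.from(TypedTsv[Int]("right")))

  // The two .toPipe calls are optimized separately; after optimization the
  // only node they share is the cross, and caching its cascading pipe turns
  // that shared node into a cascading fork of the full cross product.
  crossed.toPipe('s, 'i).write(Tsv("out1"))
  crossed.map { case (s, i) => (i, s) }.toPipe('i, 's).write(Tsv("out2"))
}
```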

@ttim (Collaborator, Author) commented on Apr 3, 2019

And a related question: is there any way right now to force scalding to compute the same typed pipe twice? Is it always going to be squashed? I.e., is it true that:

TypedPipe.from(source).map(fn1).map(fn2).write(sink1)
TypedPipe.from(source).map(fn1).map(fn3).write(sink2)

always ends up with a fork after .map(fn1), with no way to force other behaviour (imagine a use case where fn1 is very cheap and doesn't shrink the data)?

@johnynek (Collaborator) commented on Apr 3, 2019

Currently no, there is no way to force a rerun. In this case you probably don't want to materialize. We could choose not to materialize pure map operations if they could be helped by moving the fork above the Map operations; then map fusion should do the thing we want.
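
To make the map-fusion half of this concrete (plain function composition, nothing scalding-specific; fn1/fn2/fn3 echo the example above): once the fork moves above the maps, each branch carries fn1 composed with its own downstream function, so fn1 is simply recomputed per element instead of having its output materialized.

```scala
// Minimal illustration of map fusion: adjacent maps collapse into one
// composed function, so re-running a cheap fn1 on each branch costs an extra
// function call per element instead of a materialized intermediate dataset.
object MapFusionSketch extends App {
  val fn1: Int => Int = _ * 2            // hypothetical cheap map shared by both branches
  val fn2: Int => String = i => "a:" + i
  val fn3: Int => String = i => "b:" + i

  val branch1: Int => String = fn1.andThen(fn2) // fused map for branch 1
  val branch2: Int => String = fn1.andThen(fn3) // fused map for branch 2

  assert(branch1(3) == "a:6")
  assert(branch2(3) == "b:6")
}
```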

@johnynek (Collaborator) commented on Apr 3, 2019

flatMap is more dubious since it is often used for filtering as well.

@ttim (Collaborator, Author) commented on Apr 3, 2019

> Currently no, there is no way to force a rerun. In this case you probably don't want to materialize. We could choose not to materialize pure map operations if they could be helped by moving the fork above the Map operations; then map fusion should do the thing we want.

I see. I think in the future we might want to have .opaque, which is in a way the opposite of .forceToDisk.

@ttim (Collaborator, Author) commented on Apr 4, 2019

@johnynek tests are passing now, so I think it's good to go. Wdyt?

As a side note, I found that some counters (like org.apache.hadoop.mapreduce.FileSystemCounter#FILE_BYTES_WRITTEN) aren't properly reset between runs with runHadoop =(.

Review thread on this line in CascadingBackend:

private val transform: RecursiveK[TypedPipe, CascadingPipe] = new RecursiveK[TypedPipe, CascadingPipe] {
johnynek (Collaborator) commented:

I'm worried this is causing nothing below here to be cached. Note the use of rec here, which used to cache. Now it has been disabled for everything in here.

I think this is a pretty large difference from the current behavior: basically only the outermost pipe will be cached. I don't think that's what we want.

@ttim (Collaborator, Author) replied on Apr 5, 2019:

I guess the code isn't 100% clear, but everything apart from the Cross implementation (the cross itself, not its left and right arguments), Fork, and WithDescriptionTypedPipe will be cached.

transform here is the pure implementation of the cascading backend transformation, without any caching logic. On top of that, caching logic is applied (in cached), which is (a rough sketch follows the list below):

  • don't cache Fork and WithDescriptionTypedPipe
  • don't cache Cross, but cache left and right branches of it
  • cache everything else
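
A rough sketch of that policy (stand-in node and method names; the real code works on TypedPipe / CascadingPipe inside CascadingBackend):

```scala
import scala.collection.mutable

// Stand-in node types: ForkNode / DescribedNode / CrossNode mirror Fork,
// WithDescriptionTypedPipe and the cross case; everything else is OtherNode.
sealed trait Node
final case class ForkNode(of: Node) extends Node
final case class DescribedNode(of: Node, desc: String) extends Node
final case class CrossNode(left: Node, right: Node) extends Node
final case class OtherNode(id: Int, inputs: List[Node]) extends Node

// `plan` is the pure, uncached transformation (the `transform` above); its
// second argument is the function used to recurse into child nodes.
final class CachingPolicy(plan: (Node, Node => String) => String) {
  private val cache = mutable.Map.empty[Node, String]

  def cached(n: Node): String = n match {
    // don't cache forks or description wrappers
    case ForkNode(_) | DescribedNode(_, _) => plan(n, cached)
    // don't cache the cross itself; its left/right inputs still go through
    // `cached` when `plan` recurses, so they can be cached
    case CrossNode(_, _)                   => plan(n, cached)
    // cache everything else
    case other                             => cache.getOrElseUpdate(other, plan(other, cached))
  }
}
```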

johnynek (Collaborator) commented:

Yeah, I think adding comments will help. There are two layers of caching here, which is confusing:

  1. The first cache is the explicit HCache.
  2. The second cache is the RecursiveK, which caches each call.

So I think you are right that the transform is caching, but I think a nested Cross will be cached by transform.

So for a.cross(b).cross(c), I think the inner cross will be hit inside transform and will be cached.

Do you think that's not right?

@ttim (Collaborator, Author) replied on Apr 5, 2019:

I might know where your confusion comes from: before, this code was using Memoize over a RecursiveK. Now this transform function isn't caching at all, and I basically reimplemented its caching behaviour explicitly with a slightly different policy. And since the core transforming function is still essentially a RecursiveK, I reused it from Memoize.

In the case you wrote, the first cross will not be cached; then it will use notCached(Set(a.cross(b), c)), which leads to cached(a.cross(b)), which leads to not caching the inner cross either. I guess I should rename .cached to .withCachePolicy or something like that.

johnynek (Collaborator) commented:

Okay, yeah. I was getting confused by the new RecursiveK, thinking it was caching. It isn't; I see it is not controlling how the recursion is done.

Instead, you call it with transform(tp, this), which recurses using the entire outer strategy.

I now think I understand and think it is correct, but it is a bit subtle. Can we add a comment that describes and summarizes the points of confusion and clears them up? We are going to be confused later:

  1. Why are we doing this? We are avoiding caching certain node types because they produce plans that are usually worse.
  2. How are we achieving this? We wire up this kind of co-recursive check so that the recursion has a branch and is sure not to cache certain types of nodes...

something like this.

@ttim (Collaborator, Author) replied:

Removed the RecursiveK usage. I think it's more straightforward and simpler this way. Also added a comment explaining the motivation for this logic.

@ttim (Collaborator, Author) commented on Apr 17, 2019

@johnynek do you mind taking a look at the updated version?

@johnynek (Collaborator) left a review:

Sorry for the latency. The new version is clearer to me now, thanks.

It would really be ideal to have a better algorithm for estimating whether or not we should materialize a particular pipe.

@johnynek merged commit e541c3f into twitter:develop on Apr 17, 2019
@ttim (Collaborator, Author) commented on Apr 17, 2019

@johnynek I have a feeling that at least all hashJoins should end up not being materialized. And at most, everything?

@johnynek (Collaborator) commented:

I don't think it is clear. HashJoins can often be used to filter, and I think we generally would want to materialize filters that fan out.

I think we certainly want to materialize joins that fan out, no? It would be crazy to recompute joins, wouldn't it?
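
For example (a public-API sketch with hypothetical sources and sinks): a reduce-side join whose result fans out to two consumers is exactly the case where materializing, i.e. forking, the join output avoids running the shuffle and join twice.

```scala
import com.twitter.scalding._

// Hypothetical job: the joined pipe fans out to two sinks. Recomputing the
// join (and its shuffle) on each branch would cost far more than
// materializing / forking its output once.
class JoinFanOutExample(args: Args) extends Job(args) {
  val users  = TypedPipe.from(TypedTsv[(Long, String)]("users"))  // (userId, name)
  val counts = TypedPipe.from(TypedTsv[(Long, Int)]("counts"))    // (userId, count)

  val joined = users.group.join(counts.group).toTypedPipe         // (userId, (name, count))

  joined.map { case (_, (name, n)) => (name, n) }.write(TypedTsv[(String, Int)]("byName"))
  joined.map { case (id, (_, n)) => (id, n) }.write(TypedTsv[(Long, Int)]("byId"))
}
```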

@johnynek (Collaborator) commented:

Maybe the rule is as simple as never materialize map-only operations.
