-
Notifications
You must be signed in to change notification settings - Fork 706
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix the WritePartitioner to exactly match cascading #1805
Conversation
We were failing the laws previously, sometimes taking more than n + 1 steps where cascading took n. By improving the logic, we fix those bugs and reach actually exactly matching cascading. This should allow use batching to bypass any case of cascading taking too long to plan.
@non helped me get the algorithms right here. Thanks! |
fixes #1804 |
@@ -215,80 +215,163 @@ object WritePartitioner { | |||
} | |||
} | |||
|
|||
/** | |||
* If cascading would conside the current pipe as a Logical reduce |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
conside => consider?
@@ -80,7 +81,7 @@ class WritePartitionerTest extends FunSuite with PropertyChecks { | |||
} | |||
} | |||
|
|||
forAll(TypedPipeGen.genWithFakeSources)(afterPartitioningEachStepIsSize1(_)) | |||
//forAll(TypedPipeGen.genWithFakeSources)(afterPartitioningEachStepIsSize1(_)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commented test?
@@ -127,6 +207,6 @@ class WritePartitionerTest extends FunSuite with PropertyChecks { | |||
.waitFor(Config.empty, Local(true)).get.isEmpty) | |||
} | |||
|
|||
forAll(TypedPipeGen.genWithIterableSources)(partitioningDoesNotChange(_)) | |||
//forAll(TypedPipeGen.genWithIterableSources)(partitioningDoesNotChange(_)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commented test?
/** | ||
* This is a lattice value that tracks | ||
* what we have seen below a given TypedPipe as | ||
* we recurse up. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is super neet/nice, could you expand the comments here -- don't think i'd have followed other than I know what your aiming to do.
(Motivation/why we are tracking/what we are aiming to do with it)
rec(src) | ||
case (Fork(src@SourcePipe(_)), rec) => | ||
case ((cp: CounterPipe[a], bs), rec) => | ||
mat.map(rec((cp.pipe, bs | Write)))(CounterPipe(_: TypedPipe[(a, Iterable[((String, String), Long)])])) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is a counter pipe | Write
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it doesn't need to be, Write
is the bottom, so x | Write = x
.
lgtm |
We were failing the laws previously, sometimes taking
more than n + 1 steps where cascading took n. By
improving the logic, we fix those bugs and reach
actually exactly matching cascading.
This should allow use batching to bypass any
case of cascading taking too long to plan.