Skip to content

Commit

Permalink
[ENV-428] Prevent refactored steps being basis for new micro-batches …
Browse files Browse the repository at this point in the history
…(#298)
  • Loading branch information
Jeremy Beard authored and Ian Buss committed Apr 30, 2019
1 parent 5587388 commit 2832c35
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 18 deletions.
14 changes: 0 additions & 14 deletions core/src/main/java/com/cloudera/labs/envelope/run/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ private void runBatch(Set<Step> steps) throws Exception {
LOG.debug("Looking into step: " + step.getName());

if (step instanceof BatchStep) {
LOG.debug("Step is batch");
BatchStep batchStep = (BatchStep)step;

if (batchStep.getState() == StepState.WAITING) {
Expand All @@ -248,12 +247,7 @@ private void runBatch(Set<Step> steps) throws Exception {
// retrieve those and add them in.
newSteps.addAll(batchStep.loadNewBatchSteps());
}
else if (step instanceof StreamingStep) {
LOG.debug("Step is streaming");
}
else if (step instanceof RefactorStep) {
LOG.debug("Step is a refactor step");

RefactorStep refactorStep = (RefactorStep)step;

if (refactorStep.getState() == StepState.WAITING) {
Expand All @@ -271,8 +265,6 @@ else if (step instanceof RefactorStep) {
}
}
else if (step instanceof TaskStep) {
LOG.debug("Step is a task");

TaskStep taskStep = (TaskStep)step;

if (taskStep.getState() == StepState.WAITING) {
Expand All @@ -288,12 +280,6 @@ else if (step instanceof TaskStep) {
}
}
}
else if (step instanceof StreamingStep) {
LOG.debug("Step is streaming");
}
else {
throw new RuntimeException("Unknown step class type: " + step.getClass().getName());
}

LOG.debug("Finished looking into step: " + step.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static String getApplicationID() {
}

public static synchronized SparkSession getSparkSession() {
if (INSTANCE.ss == null) {
if (!hasSparkSession()) {
startSparkSession();
}

Expand Down Expand Up @@ -126,6 +126,7 @@ public static void initialize(Config config, ExecutionMode mode) {
INSTANCE.config = config.hasPath(APPLICATION_SECTION_PREFIX) ?
config.getConfig(APPLICATION_SECTION_PREFIX) : ConfigFactory.empty();
INSTANCE.mode = mode;
closeSparkSession();
getSparkSession();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ public static Set<Step> mergeLoadedSteps(Set<Step> baseSteps, Step parentStep, C
return mergedSteps;
}
else {
return baseSteps;
return Sets.newHashSet(baseSteps);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public static class StreamDummyTask implements Task {

@Override
public void configure(Config config) {
loadCount = config.getInt("load-count");
loadCount = ConfigUtils.getOrElse(config, "load-count", 0);
}

@Override
Expand Down
42 changes: 42 additions & 0 deletions core/src/test/java/com/cloudera/labs/envelope/run/TestRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.cloudera.labs.envelope.plan.BulkPlanner;
import com.cloudera.labs.envelope.plan.MutationType;
import com.cloudera.labs.envelope.spark.Contexts;
import com.cloudera.labs.envelope.task.Task;
import com.cloudera.labs.envelope.utils.ConfigUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand All @@ -47,6 +48,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class TestRunner {

Expand Down Expand Up @@ -218,5 +220,45 @@ public void testExceptionEvent() {

assertTrue(eventTypes.contains(CoreEventTypes.PIPELINE_EXCEPTION_OCCURRED));
}

public static class CheckRefactoredStepsContainedTask implements Task {
private static int runCount = 0;

@Override
public void configure(Config config) { }

@Override
public void run(Map<String, Dataset<Row>> dependencies) {
if (dependencies.size() > 2) {
throw new RuntimeException("Too many loop iterations!");
}

if (++runCount == 2) {
throw new RuntimeException("End of stream");
}
}
}

@Test
public void testRefactoredStepsContainedToStreamBatch() throws Exception {
Config config = ConfigUtils.configFromResource("/run/refactored_contained.conf");

try {
new Runner().run(config);
}
catch (Exception e) {
if (e.getMessage().equals("End of stream")) {
return;
}
else if (e.getMessage().equals("Too many loop iterations!")) {
fail("Micro-batch loop ran too many iterations");
}
else {
throw e;
}
}

fail("Pipeline expected to throw an exception");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.cloudera.labs.envelope.utils.ConfigUtils;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.AnalysisException;
import org.junit.Before;
Expand Down
43 changes: 43 additions & 0 deletions core/src/test/resources/run/refactored_contained.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
application {
batch.milliseconds = 1000
}

steps {
stream {
input {
type = "com.cloudera.labs.envelope.configuration.TestConfigLoader$DummyStreamInput"
translator {
type = "com.cloudera.labs.envelope.configuration.TestConfigLoader$DummyTranslator"
}
}
}

loop {
dependencies = [stream]
type = loop
mode = serial
source = range
range {
start = 1
end = 2
}
parameter = not_used
}

iteration {
dependencies = [loop]
deriver {
type = "com.cloudera.labs.envelope.run.TestRunner$TestingSQLDeriver"
query.literal = "SELECT 1"
}
}

// Runs after the loop is finished
check_refactored {
dependencies = [iteration]
type = task
task {
type = "com.cloudera.labs.envelope.run.TestRunner$CheckRefactoredStepsContainedTask"
}
}
}

0 comments on commit 2832c35

Please sign in to comment.