diff --git a/core/src/main/java/com/cloudera/labs/envelope/run/Runner.java b/core/src/main/java/com/cloudera/labs/envelope/run/Runner.java index 04c86b5..d80d142 100644 --- a/core/src/main/java/com/cloudera/labs/envelope/run/Runner.java +++ b/core/src/main/java/com/cloudera/labs/envelope/run/Runner.java @@ -222,7 +222,6 @@ private void runBatch(Set steps) throws Exception { LOG.debug("Looking into step: " + step.getName()); if (step instanceof BatchStep) { - LOG.debug("Step is batch"); BatchStep batchStep = (BatchStep)step; if (batchStep.getState() == StepState.WAITING) { @@ -248,12 +247,7 @@ private void runBatch(Set steps) throws Exception { // retrieve those and add them in. newSteps.addAll(batchStep.loadNewBatchSteps()); } - else if (step instanceof StreamingStep) { - LOG.debug("Step is streaming"); - } else if (step instanceof RefactorStep) { - LOG.debug("Step is a refactor step"); - RefactorStep refactorStep = (RefactorStep)step; if (refactorStep.getState() == StepState.WAITING) { @@ -271,8 +265,6 @@ else if (step instanceof RefactorStep) { } } else if (step instanceof TaskStep) { - LOG.debug("Step is a task"); - TaskStep taskStep = (TaskStep)step; if (taskStep.getState() == StepState.WAITING) { @@ -288,12 +280,6 @@ else if (step instanceof TaskStep) { } } } - else if (step instanceof StreamingStep) { - LOG.debug("Step is streaming"); - } - else { - throw new RuntimeException("Unknown step class type: " + step.getClass().getName()); - } LOG.debug("Finished looking into step: " + step.getName()); } diff --git a/core/src/main/java/com/cloudera/labs/envelope/spark/Contexts.java b/core/src/main/java/com/cloudera/labs/envelope/spark/Contexts.java index 3a11cc5..3168773 100644 --- a/core/src/main/java/com/cloudera/labs/envelope/spark/Contexts.java +++ b/core/src/main/java/com/cloudera/labs/envelope/spark/Contexts.java @@ -83,7 +83,7 @@ public static String getApplicationID() { } public static synchronized SparkSession getSparkSession() { - if (INSTANCE.ss == null) { + if (!hasSparkSession()) { startSparkSession(); } @@ -126,6 +126,7 @@ public static void initialize(Config config, ExecutionMode mode) { INSTANCE.config = config.hasPath(APPLICATION_SECTION_PREFIX) ? config.getConfig(APPLICATION_SECTION_PREFIX) : ConfigFactory.empty(); INSTANCE.mode = mode; + closeSparkSession(); getSparkSession(); } diff --git a/core/src/main/java/com/cloudera/labs/envelope/utils/StepUtils.java b/core/src/main/java/com/cloudera/labs/envelope/utils/StepUtils.java index c3ce301..88a6d89 100644 --- a/core/src/main/java/com/cloudera/labs/envelope/utils/StepUtils.java +++ b/core/src/main/java/com/cloudera/labs/envelope/utils/StepUtils.java @@ -358,7 +358,7 @@ public static Set mergeLoadedSteps(Set baseSteps, Step parentStep, C return mergedSteps; } else { - return baseSteps; + return Sets.newHashSet(baseSteps); } } diff --git a/core/src/test/java/com/cloudera/labs/envelope/configuration/TestConfigLoader.java b/core/src/test/java/com/cloudera/labs/envelope/configuration/TestConfigLoader.java index a428aa4..e7febf8 100644 --- a/core/src/test/java/com/cloudera/labs/envelope/configuration/TestConfigLoader.java +++ b/core/src/test/java/com/cloudera/labs/envelope/configuration/TestConfigLoader.java @@ -118,7 +118,7 @@ public static class StreamDummyTask implements Task { @Override public void configure(Config config) { - loadCount = config.getInt("load-count"); + loadCount = ConfigUtils.getOrElse(config, "load-count", 0); } @Override diff --git a/core/src/test/java/com/cloudera/labs/envelope/run/TestRunner.java b/core/src/test/java/com/cloudera/labs/envelope/run/TestRunner.java index 1ac63d7..e7e332b 100644 --- a/core/src/test/java/com/cloudera/labs/envelope/run/TestRunner.java +++ b/core/src/test/java/com/cloudera/labs/envelope/run/TestRunner.java @@ -26,6 +26,7 @@ import com.cloudera.labs.envelope.plan.BulkPlanner; import com.cloudera.labs.envelope.plan.MutationType; import com.cloudera.labs.envelope.spark.Contexts; +import com.cloudera.labs.envelope.task.Task; import com.cloudera.labs.envelope.utils.ConfigUtils; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -47,6 +48,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class TestRunner { @@ -218,5 +220,45 @@ public void testExceptionEvent() { assertTrue(eventTypes.contains(CoreEventTypes.PIPELINE_EXCEPTION_OCCURRED)); } + + public static class CheckRefactoredStepsContainedTask implements Task { + private static int runCount = 0; + + @Override + public void configure(Config config) { } + + @Override + public void run(Map> dependencies) { + if (dependencies.size() > 2) { + throw new RuntimeException("Too many loop iterations!"); + } + + if (++runCount == 2) { + throw new RuntimeException("End of stream"); + } + } + } + + @Test + public void testRefactoredStepsContainedToStreamBatch() throws Exception { + Config config = ConfigUtils.configFromResource("/run/refactored_contained.conf"); + + try { + new Runner().run(config); + } + catch (Exception e) { + if (e.getMessage().equals("End of stream")) { + return; + } + else if (e.getMessage().equals("Too many loop iterations!")) { + fail("Micro-batch loop ran too many iterations"); + } + else { + throw e; + } + } + + fail("Pipeline expected to throw an exception"); + } } diff --git a/core/src/test/java/com/cloudera/labs/envelope/spark/TestContexts.java b/core/src/test/java/com/cloudera/labs/envelope/spark/TestContexts.java index 97c1583..bd0465a 100644 --- a/core/src/test/java/com/cloudera/labs/envelope/spark/TestContexts.java +++ b/core/src/test/java/com/cloudera/labs/envelope/spark/TestContexts.java @@ -18,7 +18,6 @@ import com.cloudera.labs.envelope.utils.ConfigUtils; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; - import org.apache.spark.SparkConf; import org.apache.spark.sql.AnalysisException; import org.junit.Before; diff --git a/core/src/test/resources/run/refactored_contained.conf b/core/src/test/resources/run/refactored_contained.conf new file mode 100644 index 0000000..bcf3fd6 --- /dev/null +++ b/core/src/test/resources/run/refactored_contained.conf @@ -0,0 +1,43 @@ +application { + batch.milliseconds = 1000 +} + +steps { + stream { + input { + type = "com.cloudera.labs.envelope.configuration.TestConfigLoader$DummyStreamInput" + translator { + type = "com.cloudera.labs.envelope.configuration.TestConfigLoader$DummyTranslator" + } + } + } + + loop { + dependencies = [stream] + type = loop + mode = serial + source = range + range { + start = 1 + end = 2 + } + parameter = not_used + } + + iteration { + dependencies = [loop] + deriver { + type = "com.cloudera.labs.envelope.run.TestRunner$TestingSQLDeriver" + query.literal = "SELECT 1" + } + } + + // Runs after the loop is finished + check_refactored { + dependencies = [iteration] + type = task + task { + type = "com.cloudera.labs.envelope.run.TestRunner$CheckRefactoredStepsContainedTask" + } + } +} \ No newline at end of file