From 51d49c73057fff256f8cf120321ce49f682abbde Mon Sep 17 00:00:00 2001 From: xiejialong Date: Tue, 21 Feb 2023 22:05:16 +0800 Subject: [PATCH] Limit the initial number of Spark executors --- .../kylin/engine/spark/application/SparkApplication.java | 8 ++++++++ .../apache/kylin/engine/spark/utils/SparkConfHelper.java | 2 ++ .../scala/org/apache/spark/conf/rule/SparkConfRule.scala | 3 ++- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java index 9f8b7dea462..7f8e46c94d3 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java @@ -343,6 +343,14 @@ private void autoSetSparkConf(SparkConf sparkConf) throws Exception { helper.setConf(SparkConfHelper.COUNT_DISTICT, hasCountDistinct().toString()); helper.generateSparkConf(); helper.applySparkConf(sparkConf); + helper.setConf(SparkConfHelper.COUNT_DISTICT, hasCountDistinct().toString()); + if (Boolean.valueOf(configOverride.getOrDefault("spark.dynamicAllocation.enabled", "false"))) { + int maxExecutors = Integer.valueOf(configOverride.getOrDefault("spark.dynamicAllocation.maxExecutors", + String.valueOf(Integer.MAX_VALUE))); + helper.setConf(SparkConfHelper.MAX_EXECUTORS, String.valueOf(maxExecutors)); + } else { + helper.setConf(SparkConfHelper.MAX_EXECUTORS, String.valueOf(Integer.MAX_VALUE)); + } } protected String chooseContentSize(Path shareDir) throws IOException { diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/SparkConfHelper.java 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/SparkConfHelper.java index 5edbaeb3e9f..d9fc10cd932 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/SparkConfHelper.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/SparkConfHelper.java @@ -63,6 +63,8 @@ public class SparkConfHelper { public static final String DRIVER_OVERHEAD = "spark.driver.memoryOverhead"; public static final String DRIVER_CORES = "spark.driver.cores"; public static final String COUNT_DISTICT = "count_distinct"; + public static final String MAX_EXECUTORS = "spark.dynamicAllocation.maxExecutors"; + private static final List EXECUTOR_RULES = ImmutableList.of(new ExecutorMemoryRule(), new ExecutorCoreRule(), new ExecutorOverheadRule(), new ExecutorInstancesRule(), diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/conf/rule/SparkConfRule.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/conf/rule/SparkConfRule.scala index 781d1b012bc..65012f27fc4 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/conf/rule/SparkConfRule.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/conf/rule/SparkConfRule.scala @@ -137,7 +137,8 @@ class ExecutorInstancesRule extends SparkConfRule { val needInstance = Math.max(calculateExecutorInsByLayoutSize.toLong, requiredCores.toInt / executorCore) val instance = Math.min(needInstance, queueAvailableInstance) - val executorInstance = Math.max(instance.toLong, baseExecutorInstances.toLong).toString + val dynamicMaxExecutors = helper.getConf(SparkConfHelper.MAX_EXECUTORS) + val executorInstance = Math.min(Math.max(instance.toLong, baseExecutorInstances.toLong), dynamicMaxExecutors.toLong).toString logInfo(s"Current queueAvailableInstance is $queueAvailableInstance, " + s"needInstance is $needInstance, 
instance is $instance") helper.setConf(SparkConfHelper.EXECUTOR_INSTANCES, executorInstance)