From d6c323956aec186cdd1056131cd4c223e378df73 Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Fri, 29 May 2020 11:57:55 +0200 Subject: [PATCH] [HOPSWORKS-1818] Kafka broker env variable not available in jobs (#577) --- .../common/jobs/erasureCode/ErasureCodeJob.java | 2 +- .../common/jobs/flink/FlinkController.java | 5 +++-- .../hops/hopsworks/common/jobs/flink/FlinkJob.java | 11 +++++------ .../common/jobs/spark/SparkController.java | 14 ++++++-------- .../hops/hopsworks/common/jobs/spark/SparkJob.java | 7 +++---- .../hops/hopsworks/common/jobs/yarn/YarnJob.java | 10 +++++----- 6 files changed, 23 insertions(+), 26 deletions(-) diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/erasureCode/ErasureCodeJob.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/erasureCode/ErasureCodeJob.java index 3710d90181..147f495b95 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/erasureCode/ErasureCodeJob.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/erasureCode/ErasureCodeJob.java @@ -60,7 +60,7 @@ public class ErasureCodeJob extends YarnJob { public ErasureCodeJob(Jobs job, AsynchronousJobExecutor services, Users user, String jobUser, String hadoopDir, Settings settings) { - super(job, services, user, jobUser, hadoopDir, settings); + super(job, services, user, jobUser, hadoopDir, settings, null); if (!(job.getJobConfig() instanceof ErasureCodeJobConfiguration)) { throw new IllegalArgumentException( diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkController.java index 367670d020..01c73cbdef 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkController.java @@ -132,8 +132,9 @@ public Execution startJob(final Jobs job, final Users user)
throws GenericExcept try { UserGroupInformation proxyUser = ugiService.getProxyUser(username); try { - flinkjob = proxyUser.doAs((PrivilegedExceptionAction) () -> new FlinkJob(job, submitter, user, - hdfsUsersBean.getHdfsUserName(job.getProject(), job.getCreator()), settings)); + flinkjob = proxyUser.doAs((PrivilegedExceptionAction) () -> + new FlinkJob(job, submitter, user, hdfsUsersBean.getHdfsUserName(job.getProject(), job.getCreator()), + settings, kafkaBrokers.getKafkaBrokersString())); } catch (InterruptedException ex) { LOGGER.log(Level.SEVERE, null, ex); } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkJob.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkJob.java index 49d0bab78e..ed0239c70c 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkJob.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkJob.java @@ -64,13 +64,12 @@ public class FlinkJob extends YarnJob { FlinkJob.class.getName()); private FlinkYarnRunnerBuilder flinkBuilder; - FlinkJob(Jobs job, AsynchronousJobExecutor services, - Users user, String jobUser, - Settings settings) { - super(job, services, user, jobUser, settings.getHadoopSymbolicLinkDir(), settings); + FlinkJob(Jobs job, AsynchronousJobExecutor services, Users user, String jobUser, + Settings settings, String kafkaBrokersString) { + super(job, services, user, jobUser, settings.getHadoopSymbolicLinkDir(), settings, kafkaBrokersString); + if (!(job.getJobConfig() instanceof FlinkJobConfiguration)) { - throw new IllegalArgumentException( - "Job must contain a FlinkJobConfiguration object. Received: " + throw new IllegalArgumentException("Job must contain a FlinkJobConfiguration object. 
Received: " + job.getJobConfig().getClass()); } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkController.java index 1200fb3597..7821ac05eb 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkController.java @@ -40,6 +40,7 @@ package io.hops.hopsworks.common.jobs.spark; import com.google.common.base.Strings; +import io.hops.hopsworks.common.kafka.KafkaBrokers; import io.hops.hopsworks.persistence.entity.jobs.history.Execution; import io.hops.hopsworks.persistence.entity.jobs.configuration.ExperimentType; import io.hops.hopsworks.persistence.entity.jobs.configuration.spark.SparkJobConfiguration; @@ -96,6 +97,8 @@ public class SparkController { private JupyterController jupyterController; @EJB private DistributedFsService dfs; + @EJB + private KafkaBrokers kafkaBrokers; /** * Start the Spark job as the given user. 
@@ -136,14 +139,9 @@ public Execution startJob(final Jobs job, String args, final Users user) try { UserGroupInformation proxyUser = ugiService.getProxyUser(username); try { - sparkjob = proxyUser.doAs(new PrivilegedExceptionAction() { - @Override - public SparkJob run() { - return new SparkJob(job, submitter, user, settings.getHadoopSymbolicLinkDir(), - job.getProject().getName() + "__" - + user.getUsername(), settings); - } - }); + sparkjob = proxyUser.doAs((PrivilegedExceptionAction) () -> + new SparkJob(job, submitter, user, settings.getHadoopSymbolicLinkDir(), + hdfsUsersBean.getHdfsUserName(job.getProject(), user), settings, kafkaBrokers.getKafkaBrokersString())); } catch (InterruptedException ex) { LOGGER.log(Level.SEVERE, null, ex); } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkJob.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkJob.java index 8377779a40..88fed8424d 100755 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkJob.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkJob.java @@ -62,10 +62,9 @@ public class SparkJob extends YarnJob { private static final Logger LOG = Logger.getLogger(SparkJob.class.getName()); private SparkYarnRunnerBuilder runnerbuilder; - SparkJob(Jobs job, AsynchronousJobExecutor services, - Users user, final String hadoopDir, - String jobUser, Settings settings) { - super(job, services, user, jobUser, hadoopDir, settings); + SparkJob(Jobs job, AsynchronousJobExecutor services, Users user, final String hadoopDir, + String jobUser, Settings settings, String kafkaBrokersString) { + super(job, services, user, jobUser, hadoopDir, settings, kafkaBrokersString); if (!(job.getJobConfig() instanceof SparkJobConfiguration)) { throw new IllegalArgumentException( "JobDescription must contain a SparkJobConfiguration object. 
Received: " diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJob.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJob.java index d9b7177312..15b8515b38 100755 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJob.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJob.java @@ -93,15 +93,15 @@ public abstract class YarnJob extends HopsJob { * @throws IllegalArgumentException If the Jobs does not contain a YarnJobConfiguration object. */ - public YarnJob(Jobs job, AsynchronousJobExecutor services, - Users user, String jobUser, String hadoopDir, - Settings settings) { + public YarnJob(Jobs job, AsynchronousJobExecutor services, Users user, String jobUser, + String hadoopDir, Settings settings, String kafkaBrokersString) { super(job, services, user, hadoopDir); + if (!(job.getJobConfig() instanceof YarnJobConfiguration)) { - throw new IllegalArgumentException( - "Job must be a YarnJobConfiguration object. Received class: " + throw new IllegalArgumentException("Job must be a YarnJobConfiguration object. Received class: " + job.getJobConfig().getClass()); } + LOG.log(Level.INFO, "Instantiating Yarn job as user: {0}", hdfsUser); this.jobSystemProperties = new HashMap<>(); this.projectLocalResources = new ArrayList<>();