Skip to content

Commit

Permalink
[HOPSWORKS-1818] Kafka broker env variable not available in jobs (#577)
Browse files Browse the repository at this point in the history
  • Loading branch information
SirOibaf authored May 29, 2020
1 parent aca2eba commit d6c3239
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class ErasureCodeJob extends YarnJob {
public ErasureCodeJob(Jobs job, AsynchronousJobExecutor services,
Users user, String jobUser, String hadoopDir, Settings settings) {

super(job, services, user, jobUser, hadoopDir, settings);
super(job, services, user, jobUser, hadoopDir, settings, null);

if (!(job.getJobConfig() instanceof ErasureCodeJobConfiguration)) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,9 @@ public Execution startJob(final Jobs job, final Users user) throws GenericExcept
try {
UserGroupInformation proxyUser = ugiService.getProxyUser(username);
try {
flinkjob = proxyUser.doAs((PrivilegedExceptionAction<FlinkJob>) () -> new FlinkJob(job, submitter, user,
hdfsUsersBean.getHdfsUserName(job.getProject(), job.getCreator()), settings));
flinkjob = proxyUser.doAs((PrivilegedExceptionAction<FlinkJob>) () ->
new FlinkJob(job, submitter, user, hdfsUsersBean.getHdfsUserName(job.getProject(), job.getCreator()),
settings, kafkaBrokers.getKafkaBrokersString()));
} catch (InterruptedException ex) {
LOGGER.log(Level.SEVERE, null, ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,12 @@ public class FlinkJob extends YarnJob {
FlinkJob.class.getName());
private FlinkYarnRunnerBuilder flinkBuilder;

FlinkJob(Jobs job, AsynchronousJobExecutor services,
Users user, String jobUser,
Settings settings) {
super(job, services, user, jobUser, settings.getHadoopSymbolicLinkDir(), settings);
FlinkJob(Jobs job, AsynchronousJobExecutor services, Users user, String jobUser,
Settings settings, String kafkaBrokersString) {
super(job, services, user, jobUser, settings.getHadoopSymbolicLinkDir(), settings, kafkaBrokersString);

if (!(job.getJobConfig() instanceof FlinkJobConfiguration)) {
throw new IllegalArgumentException(
"Job must contain a FlinkJobConfiguration object. Received: "
throw new IllegalArgumentException("Job must contain a FlinkJobConfiguration object. Received: "
+ job.getJobConfig().getClass());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
package io.hops.hopsworks.common.jobs.spark;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.kafka.KafkaBrokers;
import io.hops.hopsworks.persistence.entity.jobs.history.Execution;
import io.hops.hopsworks.persistence.entity.jobs.configuration.ExperimentType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.spark.SparkJobConfiguration;
Expand Down Expand Up @@ -96,6 +97,8 @@ public class SparkController {
private JupyterController jupyterController;
@EJB
private DistributedFsService dfs;
@EJB
private KafkaBrokers kafkaBrokers;

/**
* Start the Spark job as the given user.
Expand Down Expand Up @@ -136,14 +139,9 @@ public Execution startJob(final Jobs job, String args, final Users user)
try {
UserGroupInformation proxyUser = ugiService.getProxyUser(username);
try {
sparkjob = proxyUser.doAs(new PrivilegedExceptionAction<SparkJob>() {
@Override
public SparkJob run() {
return new SparkJob(job, submitter, user, settings.getHadoopSymbolicLinkDir(),
job.getProject().getName() + "__"
+ user.getUsername(), settings);
}
});
sparkjob = proxyUser.doAs((PrivilegedExceptionAction<SparkJob>) () ->
new SparkJob(job, submitter, user, settings.getHadoopSymbolicLinkDir(),
hdfsUsersBean.getHdfsUserName(job.getProject(), user), settings, kafkaBrokers.getKafkaBrokersString()));
} catch (InterruptedException ex) {
LOGGER.log(Level.SEVERE, null, ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,9 @@ public class SparkJob extends YarnJob {
private static final Logger LOG = Logger.getLogger(SparkJob.class.getName());
private SparkYarnRunnerBuilder runnerbuilder;

SparkJob(Jobs job, AsynchronousJobExecutor services,
Users user, final String hadoopDir,
String jobUser, Settings settings) {
super(job, services, user, jobUser, hadoopDir, settings);
SparkJob(Jobs job, AsynchronousJobExecutor services, Users user, final String hadoopDir,
String jobUser, Settings settings, String kafkaBrokersString) {
super(job, services, user, jobUser, hadoopDir, settings, kafkaBrokersString);
if (!(job.getJobConfig() instanceof SparkJobConfiguration)) {
throw new IllegalArgumentException(
"JobDescription must contain a SparkJobConfiguration object. Received: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ public abstract class YarnJob extends HopsJob {
* @throws IllegalArgumentException If the Jobs does not contain a
YarnJobConfiguration object.
*/
public YarnJob(Jobs job, AsynchronousJobExecutor services,
Users user, String jobUser, String hadoopDir,
Settings settings) {
public YarnJob(Jobs job, AsynchronousJobExecutor services, Users user, String jobUser,
String hadoopDir, Settings settings, String kafkaBrokersString) {
super(job, services, user, hadoopDir);

if (!(job.getJobConfig() instanceof YarnJobConfiguration)) {
throw new IllegalArgumentException(
"Job must be a YarnJobConfiguration object. Received class: "
throw new IllegalArgumentException("Job must be a YarnJobConfiguration object. Received class: "
+ job.getJobConfig().getClass());
}

LOG.log(Level.INFO, "Instantiating Yarn job as user: {0}", hdfsUser);
this.jobSystemProperties = new HashMap<>();
this.projectLocalResources = new ArrayList<>();
Expand Down

0 comments on commit d6c3239

Please sign in to comment.