diff --git a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java index 4b4c6eca09..d1998eeb70 100644 --- a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java +++ b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -34,6 +35,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.token.Token; import org.apache.tez.common.GetShuffleServerRequest; @@ -76,6 +78,7 @@ public class TezRemoteShuffleManager implements ServicePluginLifecycle { private TezRemoteShuffleUmbilicalProtocolImpl tezRemoteShuffleUmbilical; private ShuffleWriteClient rssClient; private String appId; + private UserGroupInformation requestUgi; private RemoteStorageInfo remoteStorage; public TezRemoteShuffleManager( @@ -84,13 +87,15 @@ public TezRemoteShuffleManager( Configuration conf, String appId, ShuffleWriteClient rssClient, - RemoteStorageInfo remoteStorage) { + RemoteStorageInfo remoteStorage) + throws IOException { this.tokenIdentifier = tokenIdentifier; this.sessionToken = sessionToken; this.conf = conf; this.appId = appId; this.rssClient = rssClient; this.tezRemoteShuffleUmbilical = new TezRemoteShuffleUmbilicalProtocolImpl(); + this.requestUgi = UserGroupInformation.getCurrentUser(); this.remoteStorage = remoteStorage; } @@ -197,42 +202,54 @@ private ShuffleAssignmentsInfo getShuffleWorks(int partitionNum, int shuffleId) try { shuffleAssignmentsInfo = RetryUtils.retry( - () -> { - ShuffleAssignmentsInfo shuffleAssignments = - 
rssClient.getShuffleAssignments( - appId, - shuffleId, - partitionNum, - 1, - Sets.newHashSet(assignmentTags), - requiredAssignmentShuffleServersNum, - -1); - - Map> serverToPartitionRanges = - shuffleAssignments.getServerToPartitionRanges(); - - if (serverToPartitionRanges == null || serverToPartitionRanges.isEmpty()) { - return null; - } - LOG.info("Start to register shuffle"); - long start = System.currentTimeMillis(); - serverToPartitionRanges - .entrySet() - .forEach( - entry -> - rssClient.registerShuffle( - entry.getKey(), - appId, - shuffleId, - entry.getValue(), - remoteStorage, - ShuffleDataDistributionType.NORMAL, - RssTezConfig.toRssConf(conf) - .get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE))); - LOG.info( - "Finish register shuffle with " + (System.currentTimeMillis() - start) + " ms"); - return shuffleAssignments; - }, + // When communicating with TezRemoteShuffleUmbilicalProtocol, tez uses applicationId + // as ugi name. In a secure hdfs cluster, if we communicate with shuffle server with + // applicationId ugi, the user of remote storage will be application_xxx_xx. + // As we know, the max id of hadoop user is 16777215. So we should use execute ugi.
+ () -> + requestUgi.doAs( + new PrivilegedExceptionAction() { + @Override + public ShuffleAssignmentsInfo run() throws Exception { + ShuffleAssignmentsInfo shuffleAssignments = + rssClient.getShuffleAssignments( + appId, + shuffleId, + partitionNum, + 1, + Sets.newHashSet(assignmentTags), + requiredAssignmentShuffleServersNum, + -1); + + Map> serverToPartitionRanges = + shuffleAssignments.getServerToPartitionRanges(); + + if (serverToPartitionRanges == null + || serverToPartitionRanges.isEmpty()) { + return null; + } + LOG.info("Start to register shuffle"); + long start = System.currentTimeMillis(); + serverToPartitionRanges + .entrySet() + .forEach( + entry -> + rssClient.registerShuffle( + entry.getKey(), + appId, + shuffleId, + entry.getValue(), + remoteStorage, + ShuffleDataDistributionType.NORMAL, + RssTezConfig.toRssConf(conf) + .get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE))); + LOG.info( + "Finish register shuffle with " + + (System.currentTimeMillis() - start) + + " ms"); + return shuffleAssignments; + } + }), retryInterval, retryTimes); } catch (Throwable throwable) {