Skip to content

Commit

Permalink
[apache#1109] fix(tez): Fix the user of remote storage. (apache#1128)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

tez application should use login ugi to  communicate with shuffle sever, but not applicationid ugi.

### Why are the changes needed?

Fix: apache#1109

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

test in real cluster.
  • Loading branch information
zhengchenyu authored Aug 10, 2023
1 parent 2f46358 commit 0bde6ba
Showing 1 changed file with 54 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -34,6 +35,7 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.GetShuffleServerRequest;
Expand Down Expand Up @@ -76,6 +78,7 @@ public class TezRemoteShuffleManager implements ServicePluginLifecycle {
private TezRemoteShuffleUmbilicalProtocolImpl tezRemoteShuffleUmbilical;
private ShuffleWriteClient rssClient;
private String appId;
private UserGroupInformation requestUgi;
private RemoteStorageInfo remoteStorage;

public TezRemoteShuffleManager(
Expand All @@ -84,13 +87,15 @@ public TezRemoteShuffleManager(
Configuration conf,
String appId,
ShuffleWriteClient rssClient,
RemoteStorageInfo remoteStorage) {
RemoteStorageInfo remoteStorage)
throws IOException {
this.tokenIdentifier = tokenIdentifier;
this.sessionToken = sessionToken;
this.conf = conf;
this.appId = appId;
this.rssClient = rssClient;
this.tezRemoteShuffleUmbilical = new TezRemoteShuffleUmbilicalProtocolImpl();
this.requestUgi = UserGroupInformation.getCurrentUser();
this.remoteStorage = remoteStorage;
}

Expand Down Expand Up @@ -197,42 +202,54 @@ private ShuffleAssignmentsInfo getShuffleWorks(int partitionNum, int shuffleId)
try {
shuffleAssignmentsInfo =
RetryUtils.retry(
() -> {
ShuffleAssignmentsInfo shuffleAssignments =
rssClient.getShuffleAssignments(
appId,
shuffleId,
partitionNum,
1,
Sets.newHashSet(assignmentTags),
requiredAssignmentShuffleServersNum,
-1);

Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges =
shuffleAssignments.getServerToPartitionRanges();

if (serverToPartitionRanges == null || serverToPartitionRanges.isEmpty()) {
return null;
}
LOG.info("Start to register shuffle");
long start = System.currentTimeMillis();
serverToPartitionRanges
.entrySet()
.forEach(
entry ->
rssClient.registerShuffle(
entry.getKey(),
appId,
shuffleId,
entry.getValue(),
remoteStorage,
ShuffleDataDistributionType.NORMAL,
RssTezConfig.toRssConf(conf)
.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE)));
LOG.info(
"Finish register shuffle with " + (System.currentTimeMillis() - start) + " ms");
return shuffleAssignments;
},
// When communicate with TezRemoteShuffleUmbilicalProtocol, tez use applicationId
// as ugi name. In security hdfs cluster, if we communicate with shuffle server with
// applicationId ugi, the user of remote storage will be application_xxx_xx
// As we knonw, the max id of hadoop user is 16777215. So we should use execute ugi.
() ->
requestUgi.doAs(
new PrivilegedExceptionAction<ShuffleAssignmentsInfo>() {
@Override
public ShuffleAssignmentsInfo run() throws Exception {
ShuffleAssignmentsInfo shuffleAssignments =
rssClient.getShuffleAssignments(
appId,
shuffleId,
partitionNum,
1,
Sets.newHashSet(assignmentTags),
requiredAssignmentShuffleServersNum,
-1);

Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges =
shuffleAssignments.getServerToPartitionRanges();

if (serverToPartitionRanges == null
|| serverToPartitionRanges.isEmpty()) {
return null;
}
LOG.info("Start to register shuffle");
long start = System.currentTimeMillis();
serverToPartitionRanges
.entrySet()
.forEach(
entry ->
rssClient.registerShuffle(
entry.getKey(),
appId,
shuffleId,
entry.getValue(),
remoteStorage,
ShuffleDataDistributionType.NORMAL,
RssTezConfig.toRssConf(conf)
.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE)));
LOG.info(
"Finish register shuffle with "
+ (System.currentTimeMillis() - start)
+ " ms");
return shuffleAssignments;
}
}),
retryInterval,
retryTimes);
} catch (Throwable throwable) {
Expand Down

0 comments on commit 0bde6ba

Please sign in to comment.