Skip to content

Commit

Permalink
[apache#1115] improvement: Unregister shuffle explicitly when applica…
Browse files Browse the repository at this point in the history
…tion is stopped.
  • Loading branch information
zhengchenyu committed Aug 11, 2023
1 parent 0bde6ba commit 7fb0f91
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,16 @@ public class RssMRAppMaster extends MRAppMaster {
private final int rssNmHttpPort;
private final ContainerId rssContainerID;
private RssContainerAllocatorRouter rssContainerAllocator;
private ShuffleWriteClient shuffleWriteClient;

public RssMRAppMaster(
ApplicationAttemptId applicationAttemptId,
ContainerId containerId,
String nmHost,
int nmPort,
int nmHttpPort,
long appSubmitTime) {
long appSubmitTime,
ShuffleWriteClient client) {
super(
applicationAttemptId,
containerId,
Expand All @@ -111,20 +113,32 @@ public RssMRAppMaster(
rssNmHttpPort = nmHttpPort;
rssContainerID = containerId;
rssContainerAllocator = null;
shuffleWriteClient = client;
}

private static final Logger LOG = LoggerFactory.getLogger(RssMRAppMaster.class);

@Override
protected void serviceStop() throws Exception {
LOG.info("Unregister shuffle for app {}", this.getAttemptID());
if (shuffleWriteClient != null) {
shuffleWriteClient.unregisterShuffle(getAttemptID().toString(), 0);
}
super.serviceStop();
}

public static void main(String[] args) {

JobConf conf = new JobConf(new YarnConfiguration());
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));

ShuffleWriteClient shuffleWriteClient = null;
int numReduceTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
if (numReduceTasks > 0) {
String coordinators = conf.get(RssMRConfig.RSS_COORDINATOR_QUORUM);

ShuffleWriteClient client = RssMRUtils.createShuffleClient(conf);
shuffleWriteClient = client;

LOG.info("Registering coordinators {}", coordinators);
client.registerCoordinators(coordinators);
Expand Down Expand Up @@ -369,7 +383,8 @@ public Thread newThread(Runnable r) {
nodeHostString,
Integer.parseInt(nodePortString),
Integer.parseInt(nodeHttpPortString),
appSubmitTime);
appSubmitTime,
shuffleWriteClient);
ShutdownHookManager.get().addShutdownHook(new RssMRAppMasterShutdownHook(appMaster), 30);
MRWebAppUtil.initialize(conf);
String systemPropsToLog = MRApps.getSystemPropertiesToLog(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,6 @@ static class RssDAGAppMasterShutdownHook implements Runnable {

@Override
public void run() {
releaseRssResources(appMaster);

LOG.info(
"RssDAGAppMaster received a signal. Signaling RMCommunicator and JobHistoryEventHandler.");
this.appMaster.stop();
Expand All @@ -301,20 +299,14 @@ static void releaseRssResources(RssDAGAppMaster appMaster) {
try {
LOG.info("RssDAGAppMaster releaseRssResources invoked");
appMaster.heartBeatExecutorService.shutdownNow();

if (appMaster.tezRemoteShuffleManager != null) {
appMaster.tezRemoteShuffleManager.shutdown();
appMaster.tezRemoteShuffleManager = null;
}
if (appMaster.shuffleWriteClient != null) {
appMaster.shuffleWriteClient.close();
appMaster.shuffleWriteClient = null;
}
appMaster.shuffleWriteClient = null;

if (appMaster.tezRemoteShuffleManager != null) {
try {
appMaster.tezRemoteShuffleManager.shutdown();
} catch (Exception e) {
LOG.info("Failed to shutdown TezRemoteShuffleManager.", e);
}
}
appMaster.tezRemoteShuffleManager = null;
} catch (Throwable t) {
LOG.error("Failed to release Rss resources.", t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ public void start() throws Exception {

@Override
public void shutdown() throws Exception {
if (rssClient != null) {
LOG.info("unregister all shuffle for appid {}", appId);
Map<Integer, ShuffleAssignmentsInfo> infos =
tezRemoteShuffleUmbilical.getShuffleIdToShuffleAssignsInfo();
for (Map.Entry<Integer, ShuffleAssignmentsInfo> entry : infos.entrySet()) {
rssClient.unregisterShuffle(appId, entry.getKey());
}
}
server.stop();
}

Expand Down Expand Up @@ -173,6 +181,10 @@ public GetShuffleServerResponse getShuffleAssignments(GetShuffleServerRequest re

return response;
}

Map<Integer, ShuffleAssignmentsInfo> getShuffleIdToShuffleAssignsInfo() {
return shuffleIdToShuffleAssignsInfo;
}
}

private ShuffleAssignmentsInfo getShuffleWorks(int partitionNum, int shuffleId) {
Expand Down

0 comments on commit 7fb0f91

Please sign in to comment.