Skip to content

Commit

Permalink
[#2207] feat(dashboard): Add the write information of appinfo in Shuf…
Browse files Browse the repository at this point in the history
…lle Server heartbeat (#2208)

### What changes were proposed in this pull request?

Add the write information of appinfo in Shuflle Server heartbeat

### Why are the changes needed?

Fix: #2207 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Locally
Co-authored-by: wenlongwlli <[email protected]>
  • Loading branch information
lwllvyb and wenlongwlli authored Oct 21, 2024
1 parent 152ea7a commit 3a35b0f
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,8 @@ private ServerNode toServerNode(ShuffleServerHeartBeatRequest request) {
request.getServerId().getJettyPort(),
request.getStartTimeMs(),
request.getVersion(),
request.getGitCommitId());
request.getGitCommitId(),
request.getApplicationInfoList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@

package org.apache.uniffle.coordinator;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.ShuffleServerId;

public class ServerNode implements Comparable<ServerNode> {
Expand All @@ -46,6 +50,7 @@ public class ServerNode implements Comparable<ServerNode> {
private long startTime = -1;
private String version;
private String gitCommitId;
Map<String, RssProtos.ApplicationInfo> appIdToInfos;

public ServerNode(String id) {
this(id, "", 0, 0, 0, 0, 0, Sets.newHashSet(), ServerStatus.EXCLUDED);
Expand Down Expand Up @@ -181,7 +186,8 @@ public ServerNode(
jettyPort,
startTime,
"",
"");
"",
Collections.EMPTY_LIST);
}

public ServerNode(
Expand All @@ -199,7 +205,8 @@ public ServerNode(
int jettyPort,
long startTime,
String version,
String gitCommitId) {
String gitCommitId,
List<RssProtos.ApplicationInfo> appInfos) {
this.id = id;
this.ip = ip;
this.grpcPort = grpcPort;
Expand All @@ -221,6 +228,8 @@ public ServerNode(
this.startTime = startTime;
this.version = version;
this.gitCommitId = gitCommitId;
this.appIdToInfos = new ConcurrentHashMap<>();
appInfos.forEach(appInfo -> appIdToInfos.put(appInfo.getAppId(), appInfo));
}

public ShuffleServerId convertToGrpcProto() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public ShuffleServerHeartBeatResponse doSendHeartBeat(
Map<String, StorageInfo> storageInfo,
int nettyPort,
int jettyPort,
long startTimeMs) {
long startTimeMs,
List<RssProtos.ApplicationInfo> appInfos) {
ShuffleServerId serverId =
ShuffleServerId.newBuilder()
.setId(id)
Expand All @@ -149,6 +150,7 @@ public ShuffleServerHeartBeatResponse doSendHeartBeat(
.setStartTimeMs(startTimeMs)
.setVersion(Constants.VERSION)
.setGitCommitId(Constants.REVISION_SHORT)
.addAllApplicationInfo(appInfos)
.build();

RssProtos.StatusCode status;
Expand Down Expand Up @@ -225,7 +227,8 @@ public RssSendHeartBeatResponse sendHeartBeat(RssSendHeartBeatRequest request) {
request.getStorageInfo(),
request.getNettyPort(),
request.getJettyPort(),
request.getStartTimeMs());
request.getStartTimeMs(),
request.getAppInfos());

RssSendHeartBeatResponse response;
RssProtos.StatusCode statusCode = rpcResponse.getStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package org.apache.uniffle.client.request;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.proto.RssProtos;

public class RssSendHeartBeatRequest {

Expand All @@ -39,6 +41,7 @@ public class RssSendHeartBeatRequest {
private final int nettyPort;
private final int jettyPort;
private final long startTimeMs;
private final List<RssProtos.ApplicationInfo> appInfos;

public RssSendHeartBeatRequest(
String shuffleServerId,
Expand All @@ -54,7 +57,8 @@ public RssSendHeartBeatRequest(
Map<String, StorageInfo> storageInfo,
int nettyPort,
int jettyPort,
long startTimeMs) {
long startTimeMs,
List<RssProtos.ApplicationInfo> appInfos) {
this.shuffleServerId = shuffleServerId;
this.shuffleServerIp = shuffleServerIp;
this.shuffleServerPort = shuffleServerPort;
Expand All @@ -69,6 +73,7 @@ public RssSendHeartBeatRequest(
this.nettyPort = nettyPort;
this.jettyPort = jettyPort;
this.startTimeMs = startTimeMs;
this.appInfos = appInfos;
}

public String getShuffleServerId() {
Expand Down Expand Up @@ -126,4 +131,8 @@ public int getJettyPort() {
public long getStartTimeMs() {
return startTimeMs;
}

public List<RssProtos.ApplicationInfo> getAppInfos() {
return appInfos;
}
}
12 changes: 12 additions & 0 deletions proto/src/main/proto/Rss.proto
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,17 @@ enum ServerStatus {
// todo: more status, such as UPGRADING
}

message ApplicationInfo {
string appId = 1;
int64 partitionNum = 2;
int64 memorySize = 3;
int64 localFileNum = 4;
int64 localTotalSize = 5;
int64 hadoopFileNum = 6;
int64 hadoopTotalSize = 7;
int64 totalSize = 8;
}

message ShuffleServerHeartBeatRequest {
ShuffleServerId serverId = 1;
int64 usedMemory = 2;
Expand All @@ -286,6 +297,7 @@ message ShuffleServerHeartBeatRequest {
optional string version = 22;
optional string gitCommitId = 23;
optional int64 startTimeMs = 24;
repeated ApplicationInfo applicationInfo = 25;
}

message ShuffleServerHeartBeatResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.uniffle.server;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -33,6 +34,7 @@
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.proto.RssProtos;

public class RegisterHeartBeat {

Expand Down Expand Up @@ -84,7 +86,8 @@ public void startHeartBeat() {
shuffleServer.getStorageManager().getStorageInfo(),
shuffleServer.getNettyPort(),
shuffleServer.getJettyPort(),
shuffleServer.getStartTimeMs());
shuffleServer.getStartTimeMs(),
shuffleServer.getAppInfos());
} catch (Exception e) {
LOG.warn("Error happened when send heart beat to coordinator");
}
Expand All @@ -107,7 +110,8 @@ public boolean sendHeartBeat(
Map<String, StorageInfo> localStorageInfo,
int nettyPort,
int jettyPort,
long startTimeMs) {
long startTimeMs,
List<RssProtos.ApplicationInfo> appInfos) {
// use `rss.server.heartbeat.interval` as the timeout option
RssSendHeartBeatRequest request =
new RssSendHeartBeatRequest(
Expand All @@ -124,7 +128,8 @@ public boolean sendHeartBeat(
localStorageInfo,
nettyPort,
jettyPort,
startTimeMs);
startTimeMs,
appInfos);

if (coordinatorClient.sendHeartBeat(request).getStatusCode() == StatusCode.SUCCESS) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.uniffle.server;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -57,6 +59,7 @@
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.common.web.CoalescedCollectorRegistry;
import org.apache.uniffle.common.web.JettyServer;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.buffer.ShuffleBufferType;
import org.apache.uniffle.server.merge.ShuffleMergeManager;
Expand Down Expand Up @@ -584,6 +587,26 @@ public long getStartTimeMs() {
return startTimeMs;
}

public List<RssProtos.ApplicationInfo> getAppInfos() {
List<RssProtos.ApplicationInfo> appInfos = new ArrayList<>();
Map<String, ShuffleTaskInfo> taskInfos = getShuffleTaskManager().getShuffleTaskInfos();
taskInfos.forEach(
(appId, taskInfo) -> {
RssProtos.ApplicationInfo applicationInfo =
RssProtos.ApplicationInfo.newBuilder()
.setAppId(appId)
.setPartitionNum(taskInfo.getPartitionNum())
.setMemorySize(taskInfo.getInMemoryDataSize())
.setLocalTotalSize(taskInfo.getOnLocalFileDataSize())
.setHadoopTotalSize(taskInfo.getOnHadoopDataSize())
.setTotalSize(taskInfo.getTotalDataSize())
.build();

appInfos.add(applicationInfo);
});
return appInfos;
}

@VisibleForTesting
public void sendHeartbeat() {
ShuffleServer shuffleServer = this;
Expand All @@ -600,7 +623,8 @@ public void sendHeartbeat() {
shuffleServer.getStorageManager().getStorageInfo(),
shuffleServer.getNettyPort(),
shuffleServer.getJettyPort(),
shuffleServer.getStartTimeMs());
shuffleServer.getStartTimeMs(),
shuffleServer.getAppInfos());
}

public ShuffleMergeManager getShuffleMergeManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ public ShuffleDetailInfo getShuffleDetailInfo(int shuffleId) {
return shuffleDetailInfos.get(shuffleId);
}

public long getPartitionNum() {
return partitionDataSizes.values().stream().mapToLong(Map::size).sum();
}

@Override
public String toString() {
return "ShuffleTaskInfo{"
Expand Down

0 comments on commit 3a35b0f

Please sign in to comment.