diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java index 33e08c4742..e494300013 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java @@ -537,7 +537,8 @@ private ServerNode toServerNode(ShuffleServerHeartBeatRequest request) { request.getServerId().getJettyPort(), request.getStartTimeMs(), request.getVersion(), - request.getGitCommitId()); + request.getGitCommitId(), + request.getApplicationInfoList()); } /** diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java index ad992f0bc3..356c4bfe9d 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java @@ -17,14 +17,18 @@ package org.apache.uniffle.coordinator; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.uniffle.common.ServerStatus; import org.apache.uniffle.common.storage.StorageInfo; +import org.apache.uniffle.proto.RssProtos; import org.apache.uniffle.proto.RssProtos.ShuffleServerId; public class ServerNode implements Comparable { @@ -46,6 +50,7 @@ public class ServerNode implements Comparable { private long startTime = -1; private String version; private String gitCommitId; + Map appIdToInfos; public ServerNode(String id) { this(id, "", 0, 0, 0, 0, 0, Sets.newHashSet(), ServerStatus.EXCLUDED); @@ -181,7 +186,8 @@ public ServerNode( jettyPort, startTime, "", - ""); + "", + Collections.EMPTY_LIST); } public ServerNode( @@ -199,7 +205,8 @@ public ServerNode( int jettyPort, long startTime, String version, - String gitCommitId) { + String gitCommitId, + List appInfos) { this.id = id; this.ip = ip; this.grpcPort = grpcPort; @@ -221,6 +228,8 @@ public ServerNode( this.startTime = startTime; this.version = version; this.gitCommitId = gitCommitId; + this.appIdToInfos = new ConcurrentHashMap<>(); + appInfos.forEach(appInfo -> appIdToInfos.put(appInfo.getAppId(), appInfo)); } public ShuffleServerId convertToGrpcProto() { diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java index 8583e952e1..fbfe578247 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java @@ -127,7 +127,8 @@ public ShuffleServerHeartBeatResponse doSendHeartBeat( Map storageInfo, int nettyPort, int jettyPort, - long startTimeMs) { + long startTimeMs, + List appInfos) { ShuffleServerId serverId = ShuffleServerId.newBuilder() .setId(id) @@ -149,6 +150,7 @@ public ShuffleServerHeartBeatResponse doSendHeartBeat( .setStartTimeMs(startTimeMs) .setVersion(Constants.VERSION) .setGitCommitId(Constants.REVISION_SHORT) + .addAllApplicationInfo(appInfos) .build(); RssProtos.StatusCode status; @@ -225,7 +227,8 @@ public RssSendHeartBeatResponse sendHeartBeat(RssSendHeartBeatRequest request) { request.getStorageInfo(), request.getNettyPort(), request.getJettyPort(), - request.getStartTimeMs()); + request.getStartTimeMs(), + request.getAppInfos()); RssSendHeartBeatResponse response; RssProtos.StatusCode statusCode = rpcResponse.getStatus(); diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java index a31164195d..a4f23ba738 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java @@ -17,11 +17,13 @@ package org.apache.uniffle.client.request; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.uniffle.common.ServerStatus; import org.apache.uniffle.common.storage.StorageInfo; +import org.apache.uniffle.proto.RssProtos; public class RssSendHeartBeatRequest { @@ -39,6 +41,7 @@ public class RssSendHeartBeatRequest { private final int nettyPort; private final int jettyPort; private final long startTimeMs; + private final List appInfos; public RssSendHeartBeatRequest( String shuffleServerId, @@ -54,7 +57,8 @@ public RssSendHeartBeatRequest( Map storageInfo, int nettyPort, int jettyPort, - long startTimeMs) { + long startTimeMs, + List appInfos) { this.shuffleServerId = shuffleServerId; this.shuffleServerIp = shuffleServerIp; this.shuffleServerPort = shuffleServerPort; @@ -69,6 +73,7 @@ public RssSendHeartBeatRequest( this.nettyPort = nettyPort; this.jettyPort = jettyPort; this.startTimeMs = startTimeMs; + this.appInfos = appInfos; } public String getShuffleServerId() { @@ -126,4 +131,8 @@ public int getJettyPort() { public long getStartTimeMs() { return startTimeMs; } + + public List getAppInfos() { + return appInfos; + } } diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto index 06d781e134..7e4b19696b 100644 --- a/proto/src/main/proto/Rss.proto +++ b/proto/src/main/proto/Rss.proto @@ -273,6 +273,17 @@ enum ServerStatus { // todo: more status, such as UPGRADING } +message ApplicationInfo { + string appId = 1; + int64 partitionNum = 2; + int64 memorySize = 3; + int64 localFileNum = 4; + int64 localTotalSize = 5; + int64 hadoopFileNum = 6; + int64 hadoopTotalSize = 7; + int64 totalSize = 8; +} + message ShuffleServerHeartBeatRequest { ShuffleServerId serverId = 1; int64 usedMemory = 2; @@ -286,6 +297,7 @@ message ShuffleServerHeartBeatRequest { optional string version = 22; optional string gitCommitId = 23; optional int64 startTimeMs = 24; + repeated ApplicationInfo applicationInfo = 25; } message ShuffleServerHeartBeatResponse { diff --git a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java index 4b2d4607ad..5a44728b59 100644 --- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java +++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java @@ -17,6 +17,7 @@ package org.apache.uniffle.server; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; @@ -33,6 +34,7 @@ import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.storage.StorageInfo; import org.apache.uniffle.common.util.ThreadUtils; +import org.apache.uniffle.proto.RssProtos; public class RegisterHeartBeat { @@ -84,7 +86,8 @@ public void startHeartBeat() { shuffleServer.getStorageManager().getStorageInfo(), shuffleServer.getNettyPort(), shuffleServer.getJettyPort(), - shuffleServer.getStartTimeMs()); + shuffleServer.getStartTimeMs(), + shuffleServer.getAppInfos()); } catch (Exception e) { LOG.warn("Error happened when send heart beat to coordinator"); } @@ -107,7 +110,8 @@ public boolean sendHeartBeat( Map localStorageInfo, int nettyPort, int jettyPort, - long startTimeMs) { + long startTimeMs, + List appInfos) { // use `rss.server.heartbeat.interval` as the timeout option RssSendHeartBeatRequest request = new RssSendHeartBeatRequest( @@ -124,7 +128,8 @@ public boolean sendHeartBeat( localStorageInfo, nettyPort, jettyPort, - startTimeMs); + startTimeMs, + appInfos); if (coordinatorClient.sendHeartBeat(request).getStatusCode() == StatusCode.SUCCESS) { return true; diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java index 92fa6b36bc..59f53c97a2 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java @@ -17,8 +17,10 @@ package org.apache.uniffle.server; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -57,6 +59,7 @@ import org.apache.uniffle.common.util.ThreadUtils; import org.apache.uniffle.common.web.CoalescedCollectorRegistry; import org.apache.uniffle.common.web.JettyServer; +import org.apache.uniffle.proto.RssProtos; import org.apache.uniffle.server.buffer.ShuffleBufferManager; import org.apache.uniffle.server.buffer.ShuffleBufferType; import org.apache.uniffle.server.merge.ShuffleMergeManager; @@ -584,6 +587,26 @@ public long getStartTimeMs() { return startTimeMs; } + public List getAppInfos() { + List appInfos = new ArrayList<>(); + Map taskInfos = getShuffleTaskManager().getShuffleTaskInfos(); + taskInfos.forEach( + (appId, taskInfo) -> { + RssProtos.ApplicationInfo applicationInfo = + RssProtos.ApplicationInfo.newBuilder() + .setAppId(appId) + .setPartitionNum(taskInfo.getPartitionNum()) + .setMemorySize(taskInfo.getInMemoryDataSize()) + .setLocalTotalSize(taskInfo.getOnLocalFileDataSize()) + .setHadoopTotalSize(taskInfo.getOnHadoopDataSize()) + .setTotalSize(taskInfo.getTotalDataSize()) + .build(); + + appInfos.add(applicationInfo); + }); + return appInfos; + } + @VisibleForTesting public void sendHeartbeat() { ShuffleServer shuffleServer = this; @@ -600,7 +623,8 @@ public void sendHeartbeat() { shuffleServer.getStorageManager().getStorageInfo(), shuffleServer.getNettyPort(), shuffleServer.getJettyPort(), - shuffleServer.getStartTimeMs()); + shuffleServer.getStartTimeMs(), + shuffleServer.getAppInfos()); } public ShuffleMergeManager getShuffleMergeManager() { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java index d4e6eeb326..94987d6614 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java @@ -283,6 +283,10 @@ public ShuffleDetailInfo getShuffleDetailInfo(int shuffleId) { return shuffleDetailInfos.get(shuffleId); } + public long getPartitionNum() { + return partitionDataSizes.values().stream().mapToLong(Map::size).sum(); + } + @Override public String toString() { return "ShuffleTaskInfo{"