From a6aefcb384395b96add867a2a309d1059a1e918e Mon Sep 17 00:00:00 2001
From: kqhzz
Date: Mon, 9 Sep 2024 19:18:07 +0800
Subject: [PATCH] [#2094] feat(client): Introduce retry mechanism for
 coordinator client (#2095)

### Why are the changes needed?

Fix: #2094

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

UTs

---
 .../mapred/SortWriteBufferManagerTest.java    |   6 +-
 .../mapreduce/task/reduce/FetcherTest.java    |   6 +-
 .../spark/shuffle/RssSparkShuffleUtils.java   |   9 +-
 .../manager/RssShuffleManagerBase.java        | 103 ++++---
 .../manager/RssShuffleManagerBaseTest.java    |   7 +
 .../shuffle/DelegationRssShuffleManager.java  |  76 ++----
 .../spark/shuffle/RssShuffleManager.java      |   5 +-
 .../DelegationRssShuffleManagerTest.java      |  11 +-
 .../shuffle/DelegationRssShuffleManager.java  |  71 ++---
 .../spark/shuffle/RssShuffleManager.java      |   5 +-
 .../shuffle/RssShuffleManagerTestBase.java    |   5 +-
 .../sort/buffer/WriteBufferManagerTest.java   |   6 +-
 .../client/api/ShuffleWriteClient.java        |  15 +-
 .../client/impl/ShuffleWriteClientImpl.java   | 156 ++++-------
 .../factory/CoordinatorClientFactory.java     |  12 +
 .../grpc/CoordinatorGrpcRetryableClient.java  | 256 ++++++++++++++++++
 16 files changed, 486 insertions(+), 263 deletions(-)
 create mode 100644 internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcRetryableClient.java
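Before the file-by-file diff, here is the shape of the change in one place. The new `CoordinatorGrpcRetryableClient` owns the per-coordinator `CoordinatorClient` list and runs each RPC through the existing `RetryUtils.retry` helper, trying every coordinator in the quorum on each attempt. A minimal sketch of that pattern, using only types that appear in this patch (the loop body is simplified relative to the real implementation):

```java
// Sketch: one retry "round" walks the whole quorum; a round that finds no
// healthy coordinator throws, which makes RetryUtils.retry sleep
// retryIntervalMs and run the round again, up to retryTimes times.
RssFetchClientConfResponse response =
    RetryUtils.retry(
        () -> {
          for (CoordinatorClient client : coordinatorClients) {
            RssFetchClientConfResponse r = client.fetchClientConf(request);
            if (r.getStatusCode() == StatusCode.SUCCESS) {
              return r; // first healthy coordinator wins
            }
          }
          throw new RssException("no coordinator available in this round");
        },
        retryIntervalMs,
        retryTimes);
```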
diff --git a/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java b/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index 62f1f86f85..c0da9ec977 100644
--- a/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++ b/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -534,7 +534,7 @@ public boolean sendCommit(
     }
 
     @Override
-    public void registerCoordinators(String coordinators) {}
+    public void registerCoordinators(String coordinators, long retryIntervalMs, int retryTimes) {}
 
     @Override
     public Map<String, String> fetchClientConf(int timeoutMs) {
@@ -578,7 +578,9 @@ public ShuffleAssignmentsInfo getShuffleAssignments(
         Set<String> faultyServerIds,
         int stageId,
         int stageAttemptNumber,
-        boolean reassign) {
+        boolean reassign,
+        long retryIntervalMs,
+        int retryTimes) {
       return null;
     }
 
diff --git a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index 9ca680cb3d..7a58ff45d6 100644
--- a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++ b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -515,7 +515,7 @@ public boolean sendCommit(
     }
 
     @Override
-    public void registerCoordinators(String coordinators) {}
+    public void registerCoordinators(String coordinators, long retryIntervalMs, int retryTimes) {}
 
     @Override
     public Map<String, String> fetchClientConf(int timeoutMs) {
@@ -547,7 +547,9 @@ public ShuffleAssignmentsInfo getShuffleAssignments(
         Set<String> faultyServerIds,
         int stageId,
         int stageAttemptNumber,
-        boolean reassign) {
+        boolean reassign,
+        long retryIntervalMs,
+        int retryTimes) {
       return null;
     }
 
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
index feee2a3312..e47a655c70 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
@@ -40,9 +40,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.uniffle.client.api.CoordinatorClient;
 import org.apache.uniffle.client.api.ShuffleManagerClient;
 import org.apache.uniffle.client.factory.CoordinatorClientFactory;
+import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
 import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
 import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
 import org.apache.uniffle.client.util.ClientUtils;
@@ -103,12 +103,15 @@ public static ShuffleManager loadShuffleManager(String name, SparkConf conf, boo
     return instance;
   }
 
-  public static List<CoordinatorClient> createCoordinatorClients(SparkConf sparkConf) {
+  public static CoordinatorGrpcRetryableClient createCoordinatorClients(SparkConf sparkConf) {
     String clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
     String coordinators = sparkConf.get(RssSparkConfig.RSS_COORDINATOR_QUORUM);
+    long retryIntervalMs = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
+    int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX);
+    int heartbeatThread = sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
     CoordinatorClientFactory coordinatorClientFactory = CoordinatorClientFactory.getInstance();
     return coordinatorClientFactory.createCoordinatorClient(
-        ClientType.valueOf(clientType), coordinators);
+        ClientType.valueOf(clientType), coordinators, retryIntervalMs, retryTimes, heartbeatThread);
   }
 
   public static void applyDynamicClientConf(SparkConf sparkConf, Map<String, String> confItems) {
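As the diff above shows, `createCoordinatorClients` now returns the retry-aware wrapper instead of a raw client list. A hypothetical call to the new factory overload, with illustrative values rather than the config defaults:

```java
// Sketch: the five arguments mirror the values read from RssSparkConfig above.
CoordinatorGrpcRetryableClient client =
    CoordinatorClientFactory.getInstance()
        .createCoordinatorClient(
            ClientType.GRPC,
            "coord1:19999,coord2:19999", // RSS_COORDINATOR_QUORUM
            1000L, // RSS_CLIENT_RETRY_INTERVAL_MAX, in milliseconds
            3, // RSS_CLIENT_RETRY_MAX
            4); // RSS_CLIENT_HEARTBEAT_THREAD_NUM
```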
diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index b3bd138568..077ffe9d92 100644
--- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -60,11 +60,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.uniffle.client.api.CoordinatorClient;
 import org.apache.uniffle.client.api.ShuffleManagerClient;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.factory.CoordinatorClientFactory;
 import org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
+import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
 import org.apache.uniffle.client.request.RssFetchClientConfRequest;
 import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
 import org.apache.uniffle.client.response.RssFetchClientConfResponse;
@@ -399,10 +399,17 @@ protected static long getTaskAttemptIdForBlockId(
   protected static void fetchAndApplyDynamicConf(SparkConf sparkConf) {
     String clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
     String coordinators = sparkConf.get(RssSparkConfig.RSS_COORDINATOR_QUORUM.key());
+    long retryIntervalMs = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
+    int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX);
+    int heartbeatThread = sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
     CoordinatorClientFactory coordinatorClientFactory = CoordinatorClientFactory.getInstance();
-    List<CoordinatorClient> coordinatorClients =
+    CoordinatorGrpcRetryableClient coordinatorClients =
         coordinatorClientFactory.createCoordinatorClient(
-            ClientType.valueOf(clientType), coordinators);
+            ClientType.valueOf(clientType),
+            coordinators,
+            retryIntervalMs,
+            retryTimes,
+            heartbeatThread);
 
     int timeoutMs =
         sparkConf.getInt(
@@ -416,18 +423,11 @@ protected static void fetchAndApplyDynamicConf(SparkConf sparkConf) {
     }
     RssFetchClientConfRequest request =
         new RssFetchClientConfRequest(timeoutMs, user, Collections.emptyMap());
-    for (CoordinatorClient client : coordinatorClients) {
-      RssFetchClientConfResponse response = client.fetchClientConf(request);
-      if (response.getStatusCode() == StatusCode.SUCCESS) {
-        LOG.info("Success to get conf from {}", client.getDesc());
-        RssSparkShuffleUtils.applyDynamicClientConf(sparkConf, response.getClientConf());
-        break;
-      } else {
-        LOG.warn("Fail to get conf from {}", client.getDesc());
-      }
+    RssFetchClientConfResponse response = coordinatorClients.fetchClientConf(request);
+    if (response.getStatusCode() == StatusCode.SUCCESS) {
+      RssSparkShuffleUtils.applyDynamicClientConf(sparkConf, response.getClientConf());
     }
-
-    coordinatorClients.forEach(CoordinatorClient::close);
+    coordinatorClients.close();
   }
 
   @Override
@@ -902,31 +902,28 @@ private Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
     int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
     faultyServerIds.addAll(rssStageResubmitManager.getServerIdBlackList());
     try {
-      return RetryUtils.retry(
-          () -> {
-            ShuffleAssignmentsInfo response =
-                shuffleWriteClient.getShuffleAssignments(
-                    id.get(),
-                    shuffleId,
-                    partitionNum,
-                    partitionNumPerRange,
-                    assignmentTags,
-                    assignmentShuffleServerNumber,
-                    estimateTaskConcurrency,
-                    faultyServerIds,
-                    stageId,
-                    stageAttemptNumber,
-                    reassign);
-            LOG.info("Finished reassign");
-            if (reassignmentHandler != null) {
-              response = reassignmentHandler.apply(response);
-            }
-            registerShuffleServers(
-                id.get(), shuffleId, response.getServerToPartitionRanges(), getRemoteStorageInfo());
-            return response.getPartitionToServers();
-          },
-          retryInterval,
-          retryTimes);
+      ShuffleAssignmentsInfo response =
+          shuffleWriteClient.getShuffleAssignments(
+              id.get(),
+              shuffleId,
+              partitionNum,
+              partitionNumPerRange,
+              assignmentTags,
+              assignmentShuffleServerNumber,
+              estimateTaskConcurrency,
+              faultyServerIds,
+              stageId,
+              stageAttemptNumber,
+              reassign,
+              retryInterval,
+              retryTimes);
+      LOG.info("Finished reassign");
+      if (reassignmentHandler != null) {
+        response = reassignmentHandler.apply(response);
+      }
+      registerShuffleServers(
+          id.get(), shuffleId, response.getServerToPartitionRanges(), getRemoteStorageInfo());
+      return response.getPartitionToServers();
     } catch (Throwable throwable) {
       throw new RssException("registerShuffle failed!", throwable);
     }
@@ -950,21 +947,23 @@ protected Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
     int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
     faultyServerIds.addAll(rssStageResubmitManager.getServerIdBlackList());
     try {
+      ShuffleAssignmentsInfo response =
+          shuffleWriteClient.getShuffleAssignments(
+              appId,
+              shuffleId,
+              partitionNum,
+              partitionNumPerRange,
+              assignmentTags,
+              assignmentShuffleServerNumber,
+              estimateTaskConcurrency,
+              faultyServerIds,
+              stageId,
+              stageAttemptNumber,
+              reassign,
+              retryInterval,
+              retryTimes);
       return RetryUtils.retry(
           () -> {
-            ShuffleAssignmentsInfo response =
-                shuffleWriteClient.getShuffleAssignments(
-                    appId,
-                    shuffleId,
-                    partitionNum,
-                    partitionNumPerRange,
-                    assignmentTags,
-                    assignmentShuffleServerNumber,
-                    estimateTaskConcurrency,
-                    faultyServerIds,
-                    stageId,
-                    stageAttemptNumber,
-                    reassign);
             registerShuffleServers(
                 appId,
                 shuffleId,
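Note how both `requestShuffleAssignment` variants above stop wrapping the assignment RPC in `RetryUtils.retry` themselves; the interval and count now ride along as the two trailing arguments. The resulting call shape (values illustrative):

```java
// Sketch: retry now happens inside shuffleWriteClient, driven by the two
// trailing arguments instead of an outer RetryUtils.retry in the manager.
ShuffleAssignmentsInfo response =
    shuffleWriteClient.getShuffleAssignments(
        appId,
        shuffleId,
        partitionNum,
        partitionNumPerRange,
        assignmentTags,
        assignmentShuffleServerNumber,
        estimateTaskConcurrency,
        faultyServerIds,
        stageId,
        stageAttemptNumber,
        reassign,
        retryInterval, // was RetryUtils.retry's interval
        retryTimes); // was RetryUtils.retry's attempt count
```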
diff --git a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
index 610b42c8c3..80c47ee6d2 100644
--- a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
+++ b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
@@ -35,6 +35,7 @@
 
 import org.apache.uniffle.client.api.CoordinatorClient;
 import org.apache.uniffle.client.factory.CoordinatorClientFactory;
+import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
 import org.apache.uniffle.client.request.RssFetchClientConfRequest;
 import org.apache.uniffle.client.response.RssFetchClientConfResponse;
 import org.apache.uniffle.common.ClientType;
@@ -647,6 +648,12 @@ void testFetchAndApplyDynamicConf() {
     try (MockedStatic<CoordinatorClientFactory> mockFactoryStatic =
         mockStatic(CoordinatorClientFactory.class)) {
       mockFactoryStatic.when(CoordinatorClientFactory::getInstance).thenReturn(mockFactoryInstance);
+      long interval = conf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
+      int retry = conf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX);
+      int num = conf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
+      when(mockFactoryInstance.createCoordinatorClient(
+              clientType, coordinators, interval, retry, num))
+          .thenReturn(new CoordinatorGrpcRetryableClient(mockClients, interval, retry, num));
       RssShuffleManagerBase.fetchAndApplyDynamicConf(conf);
     }
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index cf19877de5..fa09ae8789 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -17,11 +17,9 @@
 
 package org.apache.spark.shuffle;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -31,13 +29,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.uniffle.client.api.CoordinatorClient;
+import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
 import org.apache.uniffle.client.request.RssAccessClusterRequest;
 import org.apache.uniffle.client.response.RssAccessClusterResponse;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.Constants;
-import org.apache.uniffle.common.util.RetryUtils;
 
 import static org.apache.uniffle.common.util.Constants.ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM;
 
@@ -46,7 +43,7 @@ public class DelegationRssShuffleManager implements ShuffleManager {
   private static final Logger LOG = LoggerFactory.getLogger(DelegationRssShuffleManager.class);
 
   private final ShuffleManager delegate;
-  private final List<CoordinatorClient> coordinatorClients;
+  private final CoordinatorGrpcRetryableClient coordinatorClients;
   private final int accessTimeoutMs;
   private final SparkConf sparkConf;
   private String user;
@@ -61,7 +58,7 @@ public DelegationRssShuffleManager(SparkConf sparkConf, boolean isDriver) throws
       coordinatorClients = RssSparkShuffleUtils.createCoordinatorClients(sparkConf);
       delegate = createShuffleManagerInDriver();
     } else {
-      coordinatorClients = Lists.newArrayList();
+      coordinatorClients = null;
       delegate = createShuffleManagerInExecutor();
     }
 
@@ -127,50 +124,31 @@ private boolean tryAccessCluster() {
     extraProperties.put(
         ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum));
 
-    for (CoordinatorClient coordinatorClient : coordinatorClients) {
-      Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
-      boolean canAccess;
-      try {
-        canAccess =
-            RetryUtils.retry(
-                () -> {
-                  RssAccessClusterResponse response =
-                      coordinatorClient.accessCluster(
-                          new RssAccessClusterRequest(
-                              accessId, assignmentTags, accessTimeoutMs, extraProperties, user));
-                  if (response.getStatusCode() == StatusCode.SUCCESS) {
-                    LOG.warn(
-                        "Success to access cluster {} using {}",
-                        coordinatorClient.getDesc(),
-                        accessId);
-                    uuid = response.getUuid();
-                    return true;
-                  } else if (response.getStatusCode() == StatusCode.ACCESS_DENIED) {
-                    throw new RssException(
-                        "Request to access cluster "
-                            + coordinatorClient.getDesc()
-                            + " is denied using "
-                            + accessId
-                            + " for "
-                            + response.getMessage());
-                  } else {
-                    throw new RssException(
-                        "Fail to reach cluster "
-                            + coordinatorClient.getDesc()
-                            + " for "
-                            + response.getMessage());
-                  }
-                },
+    Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
+    try {
+      if (coordinatorClients != null) {
+        RssAccessClusterResponse response =
+            coordinatorClients.accessCluster(
+                new RssAccessClusterRequest(
+                    accessId, assignmentTags, accessTimeoutMs, extraProperties, user),
                 retryInterval,
                 retryTimes);
-        return canAccess;
-      } catch (Throwable e) {
-        LOG.warn(
-            "Fail to access cluster {} using {} for {}",
-            coordinatorClient.getDesc(),
-            accessId,
-            e.getMessage());
+        if (response.getStatusCode() == StatusCode.SUCCESS) {
+          LOG.warn("Success to access cluster using {}", accessId);
+          uuid = response.getUuid();
+          return true;
+        } else if (response.getStatusCode() == StatusCode.ACCESS_DENIED) {
+          throw new RssException(
+              "Request to access cluster is denied using "
+                  + accessId
+                  + " for "
+                  + response.getMessage());
+        } else {
+          throw new RssException("Fail to reach cluster for " + response.getMessage());
+        }
       }
+    } catch (Throwable e) {
+      LOG.warn("Fail to access cluster using {}", accessId, e);
     }
 
     return false;
@@ -227,7 +205,9 @@ public boolean unregisterShuffle(int shuffleId) {
   @Override
   public void stop() {
     delegate.stop();
-    coordinatorClients.forEach(CoordinatorClient::close);
+    if (coordinatorClients != null) {
+      coordinatorClients.close();
+    }
   }
 
   @Override
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 27db614bf7..68f3bd4b70 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -401,7 +401,10 @@ private void startHeartbeat() {
   protected void registerCoordinator() {
     String coordinators = sparkConf.get(RssSparkConfig.RSS_COORDINATOR_QUORUM.key());
     LOG.info("Registering coordinators {}", coordinators);
-    shuffleWriteClient.registerCoordinators(coordinators);
+    shuffleWriteClient.registerCoordinators(
+        coordinators,
+        this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX),
+        this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX));
   }
 
   public CompletableFuture<Long> sendData(AddBlockEvent event) {
diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index 8f24c5e5c9..05586cf4e0 100644
--- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++ b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -30,6 +30,7 @@
 import org.mockito.Mockito;
 
 import org.apache.uniffle.client.api.CoordinatorClient;
+import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
 import org.apache.uniffle.client.response.RssAccessClusterResponse;
 import org.apache.uniffle.storage.util.StorageType;
 
@@ -66,7 +67,7 @@ public void testCreateInDriverDenied() throws Exception {
     coordinatorClients.add(mockCoordinatorClient);
     mockedStaticRssShuffleUtils
         .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
-        .thenReturn(coordinatorClients);
+        .thenReturn(new CoordinatorGrpcRetryableClient(coordinatorClients, 0, 0, 1));
     SparkConf conf = new SparkConf();
     conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
     assertCreateSortShuffleManager(conf);
@@ -81,7 +82,7 @@ public void testCreateInDriver() throws Exception {
     coordinatorClients.add(mockCoordinatorClient);
     mockedStaticRssShuffleUtils
         .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
-        .thenReturn(coordinatorClients);
+        .thenReturn(new CoordinatorGrpcRetryableClient(coordinatorClients, 0, 0, 1));
 
     SparkConf conf = new SparkConf();
     assertCreateSortShuffleManager(conf);
@@ -119,7 +120,7 @@ public void testCreateFallback() throws Exception {
     coordinatorClients.add(mockCoordinatorClient);
     mockedStaticRssShuffleUtils
         .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
-        .thenReturn(coordinatorClients);
+        .thenReturn(new CoordinatorGrpcRetryableClient(coordinatorClients, 0, 0, 1));
 
     SparkConf conf = new SparkConf();
     conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
@@ -153,7 +154,7 @@ public void testTryAccessCluster() throws Exception {
     coordinatorClients.add(mockDeniedCoordinatorClient);
     mockedStaticRssShuffleUtils
         .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
-        .thenReturn(coordinatorClients);
+        .thenReturn(new CoordinatorGrpcRetryableClient(coordinatorClients, 0, 0, 1));
     SparkConf conf = new SparkConf();
     conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
     conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
@@ -173,7 +174,7 @@ public void testTryAccessCluster() throws Exception {
     secondCoordinatorClients.add(mockCoordinatorClient);
     mockedStaticRssShuffleUtils
         .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
-        .thenReturn(secondCoordinatorClients);
+        .thenReturn(new CoordinatorGrpcRetryableClient(secondCoordinatorClients, 0, 0, 1));
     SparkConf secondConf = new SparkConf();
     secondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
     secondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 9978f93816..81c6de7dcc 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -17,11 +17,9 @@
 
 package org.apache.spark.shuffle;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -31,13 +29,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.uniffle.client.api.CoordinatorClient;
+import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
 import org.apache.uniffle.client.request.RssAccessClusterRequest;
 import org.apache.uniffle.client.response.RssAccessClusterResponse;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.Constants;
-import org.apache.uniffle.common.util.RetryUtils;
 
 import static org.apache.uniffle.common.util.Constants.ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM;
 
@@ -46,7 +43,7 @@ public class DelegationRssShuffleManager implements ShuffleManager {
   private static final Logger LOG = LoggerFactory.getLogger(DelegationRssShuffleManager.class);
 
   private final ShuffleManager delegate;
-  private final List<CoordinatorClient> coordinatorClients;
+  private final CoordinatorGrpcRetryableClient coordinatorClients;
   private final int accessTimeoutMs;
   private final SparkConf sparkConf;
   private String user;
@@ -61,7 +58,7 @@ public DelegationRssShuffleManager(SparkConf sparkConf, boolean isDriver) throws
       coordinatorClients = RssSparkShuffleUtils.createCoordinatorClients(sparkConf);
       delegate = createShuffleManagerInDriver();
     } else {
-      coordinatorClients = Lists.newArrayList();
+      coordinatorClients = null;
       delegate = createShuffleManagerInExecutor();
     }
 
@@ -127,47 +124,31 @@ private boolean tryAccessCluster() {
     extraProperties.put(
         ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum));
 
-    for (CoordinatorClient coordinatorClient : coordinatorClients) {
-      Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
-      boolean canAccess;
-      try {
-        canAccess =
-            RetryUtils.retry(
-                () -> {
-                  RssAccessClusterResponse response =
-                      coordinatorClient.accessCluster(
-                          new RssAccessClusterRequest(
-                              accessId, assignmentTags, accessTimeoutMs, extraProperties, user));
-                  if (response.getStatusCode() == StatusCode.SUCCESS) {
-                    LOG.warn(
-                        "Success to access cluster {} using {}",
-                        coordinatorClient.getDesc(),
-                        accessId);
-                    uuid = response.getUuid();
-                    return true;
-                  } else if (response.getStatusCode() == StatusCode.ACCESS_DENIED) {
-                    throw new RssException(
-                        "Request to access cluster "
-                            + coordinatorClient.getDesc()
-                            + " is denied using "
-                            + accessId
-                            + " for "
-                            + response.getMessage());
-                  } else {
-                    throw new RssException(
-                        "Fail to reach cluster "
-                            + coordinatorClient.getDesc()
-                            + " for "
-                            + response.getMessage());
-                  }
-                },
+    Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
+    try {
+      if (coordinatorClients != null) {
+        RssAccessClusterResponse response =
+            coordinatorClients.accessCluster(
+                new RssAccessClusterRequest(
+                    accessId, assignmentTags, accessTimeoutMs, extraProperties, user),
                 retryInterval,
                 retryTimes);
-        return canAccess;
-      } catch (Throwable e) {
-        LOG.warn(
-            "Fail to access cluster {} using {} for ", coordinatorClient.getDesc(), accessId, e);
+        if (response.getStatusCode() == StatusCode.SUCCESS) {
+          LOG.warn("Success to access cluster using {}", accessId);
+          uuid = response.getUuid();
+          return true;
+        } else if (response.getStatusCode() == StatusCode.ACCESS_DENIED) {
+          throw new RssException(
+              "Request to access cluster is denied using "
+                  + accessId
+                  + " for "
+                  + response.getMessage());
+        } else {
+          throw new RssException("Fail to reach cluster for " + response.getMessage());
+        }
       }
+    } catch (Throwable e) {
+      LOG.warn("Fail to access cluster using {}", accessId, e);
     }
 
     return false;
@@ -304,7 +285,9 @@ public boolean unregisterShuffle(int shuffleId) {
   @Override
   public void stop() {
     delegate.stop();
-    coordinatorClients.forEach(CoordinatorClient::close);
+    if (coordinatorClients != null) {
+      coordinatorClients.close();
+    }
   }
 
   @Override
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 1c08dd3b03..d7725113f0 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -891,7 +891,10 @@ public void clearTaskMeta(String taskId) {
   protected void registerCoordinator() {
     String coordinators = sparkConf.get(RssSparkConfig.RSS_COORDINATOR_QUORUM.key());
     LOG.info("Start Registering coordinators {}", coordinators);
-    shuffleWriteClient.registerCoordinators(coordinators);
+    shuffleWriteClient.registerCoordinators(
+        coordinators,
+        this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX),
+        this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX));
   }
 
   @VisibleForTesting
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
index fc04819424..09cf24cf88 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
@@ -26,6 +26,7 @@
 import org.mockito.Mockito;
 
 import org.apache.uniffle.client.api.CoordinatorClient;
+import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
 import org.apache.uniffle.client.response.RssAccessClusterResponse;
 import org.apache.uniffle.common.rpc.StatusCode;
 
@@ -59,8 +60,10 @@ void setupMockedRssShuffleUtils(StatusCode status) {
     CoordinatorClient mockCoordinatorClient = createCoordinatorClient(status);
     List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
     coordinatorClients.add(mockCoordinatorClient);
+    CoordinatorGrpcRetryableClient client =
+        new CoordinatorGrpcRetryableClient(coordinatorClients, 0, 1, 1);
     mockedStaticRssShuffleUtils
         .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
-        .thenReturn(coordinatorClients);
+        .thenReturn(client);
   }
 }
diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index 70c1a8aed8..f6ddad7cf5 100644
--- a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -605,7 +605,7 @@ public boolean sendCommit(
     }
 
     @Override
-    public void registerCoordinators(String coordinators) {}
+    public void registerCoordinators(String coordinators, long retryIntervalMs, int retryTimes) {}
 
     @Override
     public Map<String, String> fetchClientConf(int timeoutMs) {
@@ -649,7 +649,9 @@ public ShuffleAssignmentsInfo getShuffleAssignments(
         Set<String> faultyServerIds,
         int stageId,
         int stageAttemptNumber,
-        boolean reassign) {
+        boolean reassign,
+        long retryIntervalMs,
+        int retryTimes) {
       return null;
     }
 
diff --git a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index db0c914848..548f348f5a 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -87,7 +87,12 @@ void registerShuffle(
   boolean sendCommit(
       Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps);
 
-  void registerCoordinators(String coordinators);
+  @Deprecated
+  default void registerCoordinators(String coordinators) {
+    registerCoordinators(coordinators, 0, 0);
+  }
+
+  void registerCoordinators(String coordinators, long retryIntervalMs, int retryTimes);
 
   Map<String, String> fetchClientConf(int timeoutMs);
 
@@ -111,7 +116,9 @@ ShuffleAssignmentsInfo getShuffleAssignments(
       Set<String> faultyServerIds,
       int stageId,
       int stageAttemptNumber,
-      boolean reassign);
+      boolean reassign,
+      long retryIntervalMs,
+      int retryTimes);
 
   default ShuffleAssignmentsInfo getShuffleAssignments(
       String appId,
@@ -133,7 +140,9 @@ default ShuffleAssignmentsInfo getShuffleAssignments(
         faultyServerIds,
         -1,
         0,
-        false);
+        false,
+        0,
+        0);
   }
 
   default ShuffleAssignmentsInfo getShuffleAssignments(
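The interface change above keeps source compatibility: the old single-argument `registerCoordinators` survives as a deprecated default method that forwards zeros, so legacy callers simply keep the old no-retry behavior:

```java
// Both calls are valid after this patch; only the second one retries
// (values illustrative: 1000 ms between attempts, 3 attempts).
shuffleWriteClient.registerCoordinators("coord1:19999,coord2:19999"); // deprecated, no retry
shuffleWriteClient.registerCoordinators("coord1:19999,coord2:19999", 1000L, 3);
```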
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index b16b88168d..2178f365b4 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -46,12 +46,12 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.PartitionDataReplicaRequirementTracking;
-import org.apache.uniffle.client.api.CoordinatorClient;
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.factory.CoordinatorClientFactory;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
+import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
 import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
 import org.apache.uniffle.client.request.RssApplicationInfoRequest;
 import org.apache.uniffle.client.request.RssFetchClientConfRequest;
@@ -68,9 +68,6 @@ import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
 import org.apache.uniffle.client.response.ClientResponse;
 import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
-import org.apache.uniffle.client.response.RssApplicationInfoResponse;
-import org.apache.uniffle.client.response.RssFetchClientConfResponse;
-import org.apache.uniffle.client.response.RssFetchRemoteStorageResponse;
 import org.apache.uniffle.client.response.RssFinishShuffleResponse;
 import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
 import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
@@ -104,11 +101,12 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
   private String clientType;
   private int retryMax;
   private long retryIntervalMax;
-  private List<CoordinatorClient> coordinatorClients = Lists.newLinkedList();
+  private CoordinatorGrpcRetryableClient coordinatorClients;
   // appId -> shuffleId -> servers
   private Map<String, Map<Integer, Set<ShuffleServerInfo>>> shuffleServerInfoMap =
       JavaUtils.newConcurrentMap();
   private CoordinatorClientFactory coordinatorClientFactory;
+  private int heartBeatThreadNum;
   private ExecutorService heartBeatExecutorService;
   private int replica;
   private int replicaWrite;
@@ -141,8 +139,9 @@ public ShuffleWriteClientImpl(ShuffleClientFactory.WriteClientBuilder builder) {
     this.retryMax = builder.getRetryMax();
     this.retryIntervalMax = builder.getRetryIntervalMax();
     this.coordinatorClientFactory = CoordinatorClientFactory.getInstance();
+    this.heartBeatThreadNum = builder.getHeartBeatThreadNum();
     this.heartBeatExecutorService =
-        ThreadUtils.getDaemonFixedThreadPool(builder.getHeartBeatThreadNum(), "client-heartbeat");
+        ThreadUtils.getDaemonFixedThreadPool(heartBeatThreadNum, "client-heartbeat");
     this.replica = builder.getReplica();
     this.replicaWrite = builder.getReplicaWrite();
     this.replicaRead = builder.getReplicaRead();
@@ -595,44 +594,40 @@ public void registerShuffle(
   }
 
   @Override
-  public void registerCoordinators(String coordinators) {
-    List<CoordinatorClient> clients =
+  public void registerCoordinators(String coordinators, long retryIntervalMs, int retryTimes) {
+    coordinatorClients =
         coordinatorClientFactory.createCoordinatorClient(
-            ClientType.valueOf(this.clientType), coordinators);
-    coordinatorClients.addAll(clients);
+            ClientType.valueOf(this.clientType),
+            coordinators,
+            retryIntervalMs,
+            retryTimes,
+            this.heartBeatThreadNum);
   }
 
   @Override
   public Map<String, String> fetchClientConf(int timeoutMs) {
-    RssFetchClientConfResponse response =
-        new RssFetchClientConfResponse(StatusCode.INTERNAL_ERROR, "Empty coordinator clients");
-    for (CoordinatorClient coordinatorClient : coordinatorClients) {
-      response = coordinatorClient.fetchClientConf(new RssFetchClientConfRequest(timeoutMs));
-      if (response.getStatusCode() == StatusCode.SUCCESS) {
-        LOG.info("Success to get conf from {}", coordinatorClient.getDesc());
-        break;
-      } else {
-        LOG.warn("Fail to get conf from {}", coordinatorClient.getDesc());
-      }
+    if (coordinatorClients == null) {
+      return Maps.newHashMap();
+    }
+    try {
+      return coordinatorClients
+          .fetchClientConf(new RssFetchClientConfRequest(timeoutMs))
+          .getClientConf();
+    } catch (RssException e) {
+      return Maps.newHashMap();
     }
-    return response.getClientConf();
   }
 
   @Override
   public RemoteStorageInfo fetchRemoteStorage(String appId) {
-    RemoteStorageInfo remoteStorage = new RemoteStorageInfo("");
-    for (CoordinatorClient coordinatorClient : coordinatorClients) {
-      RssFetchRemoteStorageResponse response =
-          coordinatorClient.fetchRemoteStorage(new RssFetchRemoteStorageRequest(appId));
-      if (response.getStatusCode() == StatusCode.SUCCESS) {
-        remoteStorage = response.getRemoteStorageInfo();
-        LOG.info("Success to get storage {} from {}", remoteStorage, coordinatorClient.getDesc());
-        break;
-      } else {
-        LOG.warn("Fail to get conf from {}", coordinatorClient.getDesc());
-      }
+    if (coordinatorClients == null) {
+      return new RemoteStorageInfo("");
+    }
+    try {
+      return coordinatorClients.fetchRemoteStorage(new RssFetchRemoteStorageRequest(appId));
+    } catch (RssException e) {
+      return new RemoteStorageInfo("");
     }
-    return remoteStorage;
   }
 
   @Override
@@ -647,7 +642,9 @@ public ShuffleAssignmentsInfo getShuffleAssignments(
       Set<String> faultyServerIds,
       int stageId,
      int stageAttemptNumber,
-      boolean reassign) {
+      boolean reassign,
+      long retryIntervalMs,
+      int retryTimes) {
     RssGetShuffleAssignmentsRequest request =
         new RssGetShuffleAssignmentsRequest(
             appId,
@@ -665,31 +662,26 @@ public ShuffleAssignmentsInfo getShuffleAssignments(
 
     RssGetShuffleAssignmentsResponse response =
         new RssGetShuffleAssignmentsResponse(StatusCode.INTERNAL_ERROR);
-    for (CoordinatorClient coordinatorClient : coordinatorClients) {
-      try {
-        response = coordinatorClient.getShuffleAssignments(request);
-      } catch (Exception e) {
-        LOG.error(e.getMessage());
-      }
-
-      if (response.getStatusCode() == StatusCode.SUCCESS) {
-        LOG.info("Success to get shuffle server assignment from {}", coordinatorClient.getDesc());
-        break;
+    try {
+      if (coordinatorClients != null) {
+        response = coordinatorClients.getShuffleAssignments(request, retryIntervalMs, retryTimes);
       }
+    } catch (RssException e) {
+      String msg =
+          "Error happened when getShuffleAssignments with appId["
+              + appId
+              + "], shuffleId["
+              + shuffleId
+              + "], numMaps["
+              + partitionNum
+              + "], partitionNumPerRange["
+              + partitionNumPerRange
+              + "] to coordinator. "
+              + "Error message: "
+              + e.getMessage();
+      LOG.error(msg);
+      throw new RssException(msg, e);
     }
-    String msg =
-        "Error happened when getShuffleAssignments with appId["
-            + appId
-            + "], shuffleId["
-            + shuffleId
-            + "], numMaps["
-            + partitionNum
-            + "], partitionNumPerRange["
-            + partitionNumPerRange
-            + "] to coordinator. "
-            + "Error message: "
-            + response.getMessage();
-    throwExceptionIfNecessary(response, msg);
 
     return new ShuffleAssignmentsInfo(
         response.getPartitionToServers(), response.getServerToPartitionRanges());
@@ -891,27 +883,9 @@ public Roaring64NavigableMap getShuffleResultForMultiPart(
   @Override
   public void registerApplicationInfo(String appId, long timeoutMs, String user) {
     RssApplicationInfoRequest request = new RssApplicationInfoRequest(appId, timeoutMs, user);
-
-    ThreadUtils.executeTasks(
-        heartBeatExecutorService,
-        coordinatorClients,
-        coordinatorClient -> {
-          try {
-            RssApplicationInfoResponse response =
-                coordinatorClient.registerApplicationInfo(request);
-            if (response.getStatusCode() != StatusCode.SUCCESS) {
-              LOG.error("Failed to send applicationInfo to " + coordinatorClient.getDesc());
-            } else {
-              LOG.info("Successfully send applicationInfo to " + coordinatorClient.getDesc());
-            }
-          } catch (Exception e) {
-            LOG.warn(
-                "Error happened when send applicationInfo to " + coordinatorClient.getDesc(), e);
-          }
-          return null;
-        },
-        timeoutMs,
-        "register application");
+    if (coordinatorClients != null) {
+      coordinatorClients.registerApplicationInfo(request, timeoutMs);
+    }
   }
 
   @Override
@@ -938,31 +912,17 @@ public void sendAppHeartbeat(String appId, long timeoutMs) {
         },
         timeoutMs,
         "send heartbeat to shuffle server");
-
-    ThreadUtils.executeTasks(
-        heartBeatExecutorService,
-        coordinatorClients,
-        coordinatorClient -> {
-          try {
-            RssAppHeartBeatResponse response = coordinatorClient.sendAppHeartBeat(request);
-            if (response.getStatusCode() != StatusCode.SUCCESS) {
-              LOG.warn("Failed to send heartbeat to " + coordinatorClient.getDesc());
-            } else {
-              LOG.info("Successfully send heartbeat to " + coordinatorClient.getDesc());
-            }
-          } catch (Exception e) {
-            LOG.warn("Error happened when send heartbeat to " + coordinatorClient.getDesc(), e);
-          }
-          return null;
-        },
-        timeoutMs,
-        "send heartbeat to coordinator");
+    if (coordinatorClients != null) {
+      coordinatorClients.sendAppHeartBeat(request, timeoutMs);
+    }
   }
 
   @Override
   public void close() {
     heartBeatExecutorService.shutdownNow();
-    coordinatorClients.forEach(CoordinatorClient::close);
+    if (coordinatorClients != null) {
+      coordinatorClients.close();
+    }
     dataTransferPool.shutdownNow();
   }
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
index c01cb8d764..c04cac8a1f 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
@@ -27,6 +27,7 @@
 
 import org.apache.uniffle.client.api.CoordinatorClient;
 import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcClient;
+import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.exception.RssException;
 
@@ -85,4 +86,15 @@ public synchronized List<CoordinatorClient> createCoordinatorClient(
             .collect(Collectors.joining(", ")));
     return coordinatorClients;
   }
+
+  public synchronized CoordinatorGrpcRetryableClient createCoordinatorClient(
+      ClientType clientType,
+      String coordinators,
+      long retryIntervalMs,
+      int retryTimes,
+      int heartBeatThreadNum) {
+    List<CoordinatorClient> coordinatorClients = createCoordinatorClient(clientType, coordinators);
+    return new CoordinatorGrpcRetryableClient(
+        coordinatorClients, retryIntervalMs, retryTimes, heartBeatThreadNum);
+  }
 }
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcRetryableClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcRetryableClient.java
new file mode 100644
index 0000000000..12663badd8
--- /dev/null
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcRetryableClient.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.impl.grpc;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.api.CoordinatorClient;
+import org.apache.uniffle.client.request.RssAccessClusterRequest;
+import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
+import org.apache.uniffle.client.request.RssApplicationInfoRequest;
+import org.apache.uniffle.client.request.RssFetchClientConfRequest;
+import org.apache.uniffle.client.request.RssFetchRemoteStorageRequest;
+import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
+import org.apache.uniffle.client.request.RssSendHeartBeatRequest;
+import org.apache.uniffle.client.response.RssAccessClusterResponse;
+import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
+import org.apache.uniffle.client.response.RssApplicationInfoResponse;
+import org.apache.uniffle.client.response.RssFetchClientConfResponse;
+import org.apache.uniffle.client.response.RssFetchRemoteStorageResponse;
+import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.RetryUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+public class CoordinatorGrpcRetryableClient {
+  private static final Logger LOG = LoggerFactory.getLogger(CoordinatorGrpcRetryableClient.class);
+  private List<CoordinatorClient> coordinatorClients;
+  private long retryIntervalMs;
+  private int retryTimes;
+  private ExecutorService heartBeatExecutorService;
+
+  public CoordinatorGrpcRetryableClient(
+      List<CoordinatorClient> coordinatorClients,
+      long retryIntervalMs,
+      int retryTimes,
+      int heartBeatThreadNum) {
+    this.coordinatorClients = coordinatorClients;
+    this.retryIntervalMs = retryIntervalMs;
+    this.retryTimes = retryTimes;
+    this.heartBeatExecutorService =
+        ThreadUtils.getDaemonFixedThreadPool(heartBeatThreadNum, "client-heartbeat");
+  }
+
+  public void sendAppHeartBeat(RssAppHeartBeatRequest request, long timeoutMs) {
+    ThreadUtils.executeTasks(
+        heartBeatExecutorService,
+        coordinatorClients,
+        coordinatorClient -> {
+          try {
+            RssAppHeartBeatResponse response = coordinatorClient.sendAppHeartBeat(request);
+            if (response.getStatusCode() != StatusCode.SUCCESS) {
+              LOG.warn("Failed to send heartbeat to " + coordinatorClient.getDesc());
+            } else {
+              LOG.info("Successfully send heartbeat to " + coordinatorClient.getDesc());
+            }
+          } catch (Exception e) {
+            LOG.warn("Error happened when send heartbeat to " + coordinatorClient.getDesc(), e);
+          }
+          return null;
+        },
+        timeoutMs,
+        "send heartbeat to coordinator");
+  }
+
+  public void registerApplicationInfo(RssApplicationInfoRequest request, long timeoutMs) {
+    ThreadUtils.executeTasks(
+        heartBeatExecutorService,
+        coordinatorClients,
+        coordinatorClient -> {
+          try {
+            RssApplicationInfoResponse response =
+                coordinatorClient.registerApplicationInfo(request);
+            if (response.getStatusCode() != StatusCode.SUCCESS) {
+              LOG.error("Failed to send applicationInfo to " + coordinatorClient.getDesc());
+            } else {
+              LOG.info("Successfully send applicationInfo to " + coordinatorClient.getDesc());
+            }
+          } catch (Exception e) {
+            LOG.warn(
+                "Error happened when send applicationInfo to " + coordinatorClient.getDesc(), e);
+          }
+          return null;
+        },
+        timeoutMs,
+        "register application");
+  }
+
+  public boolean sendHeartBeat(RssSendHeartBeatRequest request) {
+    AtomicBoolean sendSuccessfully = new AtomicBoolean(false);
+    ThreadUtils.executeTasks(
+        heartBeatExecutorService,
+        coordinatorClients,
+        client -> client.sendHeartBeat(request),
+        request.getTimeout() * 2,
+        "send heartbeat",
+        future -> {
+          try {
+            if (future.get(request.getTimeout() * 2, TimeUnit.MILLISECONDS).getStatusCode()
+                == StatusCode.SUCCESS) {
+              sendSuccessfully.set(true);
+            }
+          } catch (Exception e) {
+            LOG.error(e.getMessage());
+          }
+          return null;
+        });
+
+    return sendSuccessfully.get();
+  }
+
+  public RssGetShuffleAssignmentsResponse getShuffleAssignments(
+      RssGetShuffleAssignmentsRequest request, long retryIntervalMs, int retryTimes) {
+    try {
+      return RetryUtils.retry(
+          () -> {
+            RssGetShuffleAssignmentsResponse response = null;
+            for (CoordinatorClient coordinatorClient : this.coordinatorClients) {
+              try {
+                response = coordinatorClient.getShuffleAssignments(request);
+              } catch (Exception e) {
+                LOG.error(e.getMessage());
+              }
+              if (response != null && response.getStatusCode() == StatusCode.SUCCESS) {
+                LOG.info(
+                    "Success to get shuffle server assignment from {}",
+                    coordinatorClient.getDesc());
+                break;
+              }
+            }
+            if (response == null || response.getStatusCode() != StatusCode.SUCCESS) {
+              throw new RssException(
+                  response == null ? "Empty coordinator clients" : response.getMessage());
+            }
+            return response;
+          },
+          retryIntervalMs,
+          retryTimes);
+    } catch (Throwable throwable) {
+      throw new RssException("getShuffleAssignments failed!", throwable);
+    }
+  }
+
+  public RssAccessClusterResponse accessCluster(
+      RssAccessClusterRequest request, long retryIntervalMs, int retryTimes) {
+    try {
+      return RetryUtils.retry(
+          () -> {
+            RssAccessClusterResponse response = null;
+            for (CoordinatorClient coordinatorClient : this.coordinatorClients) {
+              response = coordinatorClient.accessCluster(request);
+              if (response.getStatusCode() == StatusCode.SUCCESS) {
+                LOG.warn(
+                    "Success to access cluster {} using {}",
+                    coordinatorClient.getDesc(),
+                    request.getAccessId());
+                break;
+              }
+            }
+            if (response == null) {
+              throw new RssException("Empty coordinator clients");
+            } else if (response.getStatusCode() == StatusCode.SUCCESS) {
+              return response;
+            } else if (response.getStatusCode() == StatusCode.ACCESS_DENIED) {
+              throw new RssException(
+                  "Request to access cluster is denied using "
+                      + request.getAccessId()
+                      + " for "
+                      + response.getMessage());
+            } else {
+              throw new RssException("Fail to reach cluster for " + response.getMessage());
+            }
+          },
+          retryIntervalMs,
+          retryTimes);
+    } catch (Throwable throwable) {
+      throw new RssException("accessCluster failed!", throwable);
+    }
+  }
+
+  public RssFetchClientConfResponse fetchClientConf(RssFetchClientConfRequest request) {
+    try {
+      return RetryUtils.retry(
+          () -> {
+            RssFetchClientConfResponse response = null;
+            for (CoordinatorClient coordinatorClient : this.coordinatorClients) {
+              response = coordinatorClient.fetchClientConf(request);
+              if (response.getStatusCode() == StatusCode.SUCCESS) {
+                LOG.info("Success to get conf from {}", coordinatorClient.getDesc());
+                break;
+              }
+            }
+            if (response == null || response.getStatusCode() != StatusCode.SUCCESS) {
+              throw new RssException(
+                  response == null ? "Empty coordinator clients" : response.getMessage());
+            }
+            return response;
+          },
+          this.retryIntervalMs,
+          this.retryTimes);
+    } catch (Throwable throwable) {
+      throw new RssException("Fail to get conf", throwable);
+    }
+  }
+
+  public RemoteStorageInfo fetchRemoteStorage(RssFetchRemoteStorageRequest request) {
+    try {
+      return RetryUtils.retry(
+          () -> {
+            RssFetchRemoteStorageResponse response = null;
+            for (CoordinatorClient coordinatorClient : this.coordinatorClients) {
+              response = coordinatorClient.fetchRemoteStorage(request);
+              if (response.getStatusCode() == StatusCode.SUCCESS) {
+                LOG.info(
+                    "Success to get storage {} from {}",
+                    response.getRemoteStorageInfo(),
+                    coordinatorClient.getDesc());
+                break;
+              }
+            }
+            if (response == null || response.getStatusCode() != StatusCode.SUCCESS) {
+              throw new RssException(
+                  response == null ? "Empty coordinator clients" : response.getMessage());
+            }
+            return response.getRemoteStorageInfo();
+          },
+          this.retryIntervalMs,
+          this.retryTimes);
+    } catch (Throwable throwable) {
+      throw new RssException("Fail to get remote storage", throwable);
+    }
+  }
+
+  public void close() {
+    heartBeatExecutorService.shutdownNow();
+    coordinatorClients.forEach(CoordinatorClient::close);
+  }
+}
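End to end, a driver-side caller of the new wrapper looks roughly like this; a sketch assuming only the factory overload and request types introduced in this patch (addresses and retry values are illustrative):

```java
CoordinatorGrpcRetryableClient client =
    CoordinatorClientFactory.getInstance()
        .createCoordinatorClient(
            ClientType.GRPC, "coord1:19999,coord2:19999", 1000L, 3, 4);
try {
  // Retries across the quorum happen inside the call.
  RssFetchClientConfResponse conf = client.fetchClientConf(new RssFetchClientConfRequest(5000));
  // ... apply conf.getClientConf() ...
} finally {
  client.close(); // closes each underlying CoordinatorClient and the heartbeat pool
}
```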