Skip to content

Commit

Permalink
[apache#1110] improvement: the value of remote storage conf should su…
Browse files Browse the repository at this point in the history
…pport comma.
  • Loading branch information
zhengchenyu committed Aug 10, 2023
1 parent 91d94e0 commit a2d1589
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public RemoteStorageInfo(String path, String confString) {
this.path = path;
this.confItems = Maps.newHashMap();
if (!StringUtils.isEmpty(confString)) {
String[] items = confString.split(Constants.COMMA_SPLIT_CHAR);
String[] items = confString.split(Constants.SEMICOLON_SPLIT_CHAR);
if (!ArrayUtils.isEmpty(items)) {
for (String item : items) {
String[] kv = item.split(Constants.EQUAL_SPLIT_CHAR);
Expand Down Expand Up @@ -81,7 +81,7 @@ public String getConfString() {
}
return confItems.entrySet().stream()
.map(e -> String.join("=", e.getKey(), e.getValue()))
.collect(Collectors.joining(","));
.collect(Collectors.joining(Constants.SEMICOLON_SPLIT_CHAR));
}

@Override
Expand Down Expand Up @@ -128,7 +128,7 @@ public String toString() {
return String.join(
Constants.COMMA_SPLIT_CHAR,
path,
Joiner.on(",").withKeyValueSeparator("=").join(confItems));
Joiner.on(Constants.SEMICOLON_SPLIT_CHAR).withKeyValueSeparator("=").join(confItems));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

public class RemoteStorageInfoTest {
private static final String TEST_PATH = "hdfs://test";
private static final String CONF_STRING = "k1=v1,k2=v2";
private static final String CONF_STRING = "k1=v1;k2=v2";
private static final Map<String, String> confMap = ImmutableMap.of("k1", "v1", "k2", "v2");

private static Stream<Arguments> confItemsParams() {
Expand Down Expand Up @@ -85,7 +85,7 @@ public void testRemoteStorageInfo(
}

@ParameterizedTest
@ValueSource(strings = {",", "=,", ",="})
@ValueSource(strings = {";", "=;", ";="})
public void testUncommonConfString(String confString) {
RemoteStorageInfo info = new RemoteStorageInfo(TEST_PATH, confString);
assertEquals(TEST_PATH, info.getPath());
Expand All @@ -106,7 +106,7 @@ public void testEquals() {
}

@ParameterizedTest
@ValueSource(strings = {"k1=v1", "k1=v1,k2=v3", "k1=v1,k3=v2"})
@ValueSource(strings = {"k1=v1", "k1=v1;k2=v3", "k1=v1;k3=v2"})
public void testNotEquals(String confString) {
RemoteStorageInfo info = new RemoteStorageInfo(TEST_PATH, CONF_STRING);
RemoteStorageInfo info2 = new RemoteStorageInfo(TEST_PATH, confString);
Expand All @@ -119,4 +119,14 @@ public void testHashCode() {
RemoteStorageInfo info1 = new RemoteStorageInfo(TEST_PATH, CONF_STRING);
assertEquals(info.hashCode(), info1.hashCode());
}

@ParameterizedTest
@ValueSource(strings = {"k1=v1,v11;k2=v2,v22"})
public void testParseReservedSymbol(String confString) {
RemoteStorageInfo info = new RemoteStorageInfo(TEST_PATH, confString);
assertEquals(TEST_PATH, info.getPath());
assertEquals("v1,v11", info.getConfItems().get("k1"));
assertEquals("v2,v22", info.getConfItems().get("k2"));
assertEquals("k1=v1,v11;k2=v2,v22", info.getConfString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -148,6 +149,13 @@ public void refreshAppId(String appId) {
}

public void refreshRemoteStorage(String remoteStoragePath, String remoteStorageConf) {
refreshRemoteStorage(remoteStoragePath, remoteStorageConf, new HashMap<>());
}

public void refreshRemoteStorage(
String remoteStoragePath,
String remoteStorageConf,
Map<String, String> remoteStorageConfByCluster) {
if (!StringUtils.isEmpty(remoteStoragePath)) {
LOG.info("Refresh remote storage with {} {}", remoteStoragePath, remoteStorageConf);
Set<String> paths = Sets.newHashSet(remoteStoragePath.split(Constants.COMMA_SPLIT_CHAR));
Expand All @@ -166,8 +174,16 @@ public void refreshRemoteStorage(String remoteStoragePath, String remoteStorageC
});
}
String storageHost = getStorageHost(path);
RemoteStorageInfo rsInfo =
new RemoteStorageInfo(path, confKVs.getOrDefault(storageHost, Maps.newHashMap()));
RemoteStorageInfo rsInfo;
String newRemoteStoragePath = remoteStorageConfByCluster.get(storageHost);
// rss.coordinator.remote.storage.conf.{cluster} have higher priority than
// cluster conf in rss.coordinator.remote.storage.cluster.conf
if (StringUtils.isNotBlank(newRemoteStoragePath)) {
rsInfo = new RemoteStorageInfo(path, newRemoteStoragePath);
} else {
rsInfo =
new RemoteStorageInfo(path, confKVs.getOrDefault(storageHost, Maps.newHashMap()));
}
availableRemoteStorageInfo.put(path, rsInfo);
}
// remove unused remote path if exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -111,6 +112,7 @@ private void updateClientConfInternal() {
boolean hasRemoteStorageConf = false;
String remoteStoragePath = "";
String remoteStorageConf = "";
Map<String, String> remoteStorageConfByCluster = new HashMap<>();
for (String item : content.split(IOUtils.LINE_SEPARATOR_UNIX)) {
String confItem = item.trim();
if (!StringUtils.isEmpty(confItem)) {
Expand All @@ -123,14 +125,22 @@ private void updateClientConfInternal() {
.key()
.equals(confKV[0])) {
remoteStorageConf = confKV[1];
} else if (confKV[0].startsWith(
CoordinatorConf.COORDINATOR_REMOTE_STORAGE_CONF_BY_CLUSTER.key())) {
hasRemoteStorageConf = true;
remoteStorageConfByCluster.put(
confKV[0].substring(
CoordinatorConf.COORDINATOR_REMOTE_STORAGE_CONF_BY_CLUSTER.key().length() + 1),
confKV[1]);
} else {
newClientConf.put(confKV[0], confKV[1]);
}
}
}
}
if (hasRemoteStorageConf) {
applicationManager.refreshRemoteStorage(remoteStoragePath, remoteStorageConf);
applicationManager.refreshRemoteStorage(
remoteStoragePath, remoteStorageConf, remoteStorageConfByCluster);
}

clientConf = newClientConf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ public class CoordinatorConf extends RssBaseConf {
.noDefaultValue()
.withDescription(
"Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
public static final ConfigOption<String> COORDINATOR_REMOTE_STORAGE_CONF_BY_CLUSTER =
ConfigOptions.key("rss.coordinator.remote.storage.conf")
.stringType()
.noDefaultValue()
.withDescription(
"Remote Storage related conf in specific cluster. This config allow ',' in remote storage config. "
+ "For example: rss.coordinator.remote.storage.conf.$cluster $key1=$value1;$key2=$value21,$value22\n");
public static final ConfigOption<ApplicationManager.StrategyName>
COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY =
ConfigOptions.key("rss.coordinator.remote.storage.select.strategy")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.util.Map;
import java.util.Set;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -38,10 +40,10 @@ public class ApplicationManagerTest {

private ApplicationManager applicationManager;
private long appExpiredTime = 2000L;
private String remotePath1 = "hdfs://path1";
private String remotePath2 = "hdfs://path2";
private String remotePath3 = "hdfs://path3";
private String remoteStorageConf = "path1,k1=v1,k2=v2;path2,k3=v3";
private String remotePath1 = "hdfs://ns1/path1";
private String remotePath2 = "hdfs://ns2/path2";
private String remotePath3 = "hdfs://ns3/path3";
private String remoteStorageConf = "ns1,k1=v1,k2=v2;ns2,k3=v3";

@BeforeAll
public static void setup() {
Expand Down Expand Up @@ -104,6 +106,62 @@ public void refreshTest() {
assertEquals(
expectedAvailablePath, applicationManager.getRemoteStoragePathRankValue().keySet());
assertFalse(applicationManager.hasErrorInStatusCheck());

// Load the remote storage conf from rss.coordinator.remote.storage.conf.{cluster}
remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2;
expectedAvailablePath = Sets.newHashSet(remotePath1, remotePath2);
Map<String, String> remoteStorageConfByCluster =
ImmutableMap.of("ns1", "k1=v1;k2=v2,v22", "ns2", "k3=v3,v33;k4=v4");
applicationManager.refreshRemoteStorage(remoteStoragePath, "", remoteStorageConfByCluster);
storages = applicationManager.getAvailableRemoteStorageInfo();
Assertions.assertEquals(2, storages.size());
remoteStorageInfo = storages.get(remotePath1);
assertEquals(2, remoteStorageInfo.getConfItems().size());
assertEquals("v1", remoteStorageInfo.getConfItems().get("k1"));
assertEquals("v2,v22", remoteStorageInfo.getConfItems().get("k2"));
remoteStorageInfo = storages.get(remotePath2);
assertEquals(2, remoteStorageInfo.getConfItems().size());
assertEquals("v3,v33", remoteStorageInfo.getConfItems().get("k3"));
assertEquals("v4", remoteStorageInfo.getConfItems().get("k4"));
assertEquals(
expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet());
assertEquals(
expectedAvailablePath, applicationManager.getRemoteStoragePathRankValue().keySet());
assertFalse(applicationManager.hasErrorInStatusCheck());

// Load the remote storage conf from rss.coordinator.remote.storage.conf.{cluster} and
// rss.coordinator.remote.storage.cluster.conf. Notice:
// rss.coordinator.remote.storage.conf.{cluster}
// have a higher priority.
remoteStoragePath =
remotePath1
+ Constants.COMMA_SPLIT_CHAR
+ remotePath2
+ Constants.COMMA_SPLIT_CHAR
+ remotePath3;
expectedAvailablePath = Sets.newHashSet(remotePath1, remotePath2, remotePath3);
remoteStorageConfByCluster =
ImmutableMap.of("ns1", "k1=v1;k2=v2,v22", "ns3", "k3=v3,v33;k4=v4");
applicationManager.refreshRemoteStorage(
remoteStoragePath, remoteStorageConf, remoteStorageConfByCluster);
storages = applicationManager.getAvailableRemoteStorageInfo();
Assertions.assertEquals(3, storages.size());
remoteStorageInfo = storages.get(remotePath1);
assertEquals(2, remoteStorageInfo.getConfItems().size());
assertEquals("v1", remoteStorageInfo.getConfItems().get("k1"));
assertEquals("v2,v22", remoteStorageInfo.getConfItems().get("k2"));
remoteStorageInfo = storages.get(remotePath2);
assertEquals(1, remoteStorageInfo.getConfItems().size());
assertEquals("v3", remoteStorageInfo.getConfItems().get("k3"));
remoteStorageInfo = storages.get(remotePath3);
assertEquals(2, remoteStorageInfo.getConfItems().size());
assertEquals("v3,v33", remoteStorageInfo.getConfItems().get("k3"));
assertEquals("v4", remoteStorageInfo.getConfItems().get("k4"));
assertEquals(
expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet());
assertEquals(
expectedAvailablePath, applicationManager.getRemoteStoragePathRankValue().keySet());
assertFalse(applicationManager.hasErrorInStatusCheck());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand Down Expand Up @@ -138,6 +139,36 @@ public void test(@TempDir File tempDir) throws Exception {
assertEquals(2, clientConf.size());
assertFalse(clientConf.containsKey("spark.mock.6"));
assertFalse(clientConf.containsKey("spark.mock.7"));

// load remote storage config
cfgFileTmp = new File(cfgFileName + ".tmp");
fileWriter = new FileWriter(cfgFileTmp);
printWriter = new PrintWriter(fileWriter);
printWriter.println(
"rss.coordinator.remote.storage.path hdfs://ns1/path1,hdfs://ns2/path2,hdfs://ns3/path3");
printWriter.println("rss.coordinator.remote.storage.conf.ns1 k1=v1;k2=v2,v22");
printWriter.println("rss.coordinator.remote.storage.conf.ns3 k3=v3,v33;k4=v4");
printWriter.println("rss.coordinator.remote.storage.cluster.conf ns1,k1=v1,k2=v2;ns2,k3=v3");
printWriter.close();
assertTrue(cfgFile.delete());
FileUtils.moveFile(cfgFileTmp, cfgFile);
Set expectedAvailablePath =
Sets.newHashSet("hdfs://ns1/path1", "hdfs://ns2/path2", "hdfs://ns3/path3");
waitForUpdate(expectedAvailablePath, applicationManager);
Map<String, RemoteStorageInfo> storages = applicationManager.getAvailableRemoteStorageInfo();
Assertions.assertEquals(3, storages.size());
RemoteStorageInfo remoteStorageInfo = storages.get("hdfs://ns1/path1");
assertEquals(2, remoteStorageInfo.getConfItems().size());
assertEquals("v1", remoteStorageInfo.getConfItems().get("k1"));
assertEquals("v2,v22", remoteStorageInfo.getConfItems().get("k2"));
remoteStorageInfo = storages.get("hdfs://ns2/path2");
assertEquals(1, remoteStorageInfo.getConfItems().size());
assertEquals("v3", remoteStorageInfo.getConfItems().get("k3"));
remoteStorageInfo = storages.get("hdfs://ns3/path3");
assertEquals(2, remoteStorageInfo.getConfItems().size());
assertEquals("v3,v33", remoteStorageInfo.getConfItems().get("k3"));
assertEquals("v4", remoteStorageInfo.getConfItems().get("k4"));
assertFalse(applicationManager.hasErrorInStatusCheck());
clientConfManager.close();
}

Expand Down
1 change: 1 addition & 0 deletions docs/coordinator_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ This document will introduce how to deploy Uniffle coordinators.
|rss.coordinator.dynamicClientConf.path|-| The dynamic client conf of this cluster and can be stored in HADOOP FS or local |
|rss.coordinator.dynamicClientConf.updateIntervalSec|120| The dynamic client conf update interval in seconds |
|rss.coordinator.remote.storage.cluster.conf|-| Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';' |
|rss.coordinator.remote.storage.conf.{cluster}|-| Remote Storage related conf by cluster with format rss.coordinator.remote.storage.conf.ns1 $key1=$value1,$value11;$key2=$value2 |
|rss.rpc.server.port|-| RPC port for coordinator |
|rss.jetty.http.port|-| Http port for coordinator |
|rss.coordinator.remote.storage.select.strategy|APP_BALANCE| Strategy for selecting the remote path |
Expand Down

0 comments on commit a2d1589

Please sign in to comment.