[apache#1416] feat(spark): support custom hadoop config in client side
zuston committed Jan 4, 2024
1 parent 602611c commit d4b60d9
Showing 5 changed files with 73 additions and 11 deletions.
RssShuffleManagerBase.java
@@ -41,6 +41,7 @@
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;

+import static org.apache.uniffle.common.config.RssClientConf.HADOOP_CONFIG_KEY_PREFIX;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED;

public abstract class RssShuffleManagerBase implements RssShuffleManagerInterface, ShuffleManager {
@@ -161,21 +162,31 @@ private static MapOutputTrackerMaster getMapOutputTrackerMaster() {
    return tracker instanceof MapOutputTrackerMaster ? (MapOutputTrackerMaster) tracker : null;
  }

-  private Map<String, String> parseRemoteStorageConf(Configuration conf) {
+  private static Map<String, String> parseRemoteStorageConf(Configuration conf) {
    Map<String, String> confItems = Maps.newHashMap();
    for (Map.Entry<String, String> entry : conf) {
      confItems.put(entry.getKey(), entry.getValue());
    }
    return confItems;
  }

-  protected RemoteStorageInfo getRemoteStorageInfo(SparkConf sparkConf) {
+  protected static RemoteStorageInfo getDefaultRemoteStorageInfo(SparkConf sparkConf) {
    Map<String, String> confItems = Maps.newHashMap();
    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
    if (rssConf.getBoolean(RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED)) {
      confItems = parseRemoteStorageConf(new Configuration(true));
    }

+    for (String key : rssConf.getKeySet()) {
+      if (key.startsWith(HADOOP_CONFIG_KEY_PREFIX)) {
+        String val = rssConf.getString(key, null);
+        if (val != null) {
+          String extractedKey = key.replaceFirst(HADOOP_CONFIG_KEY_PREFIX, "");
+          confItems.put(extractedKey, val);
+        }
+      }
+    }
+
    return new RemoteStorageInfo(
        sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""), confItems);
  }
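For intuition, here is a minimal, self-contained sketch of the precedence implemented above (the class name and map contents are illustrative, not part of this commit): local Hadoop settings are loaded first when `useLocalConfAsDefault` is enabled, and any `rss.hadoop.*` key then overrides them.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative demo class, not part of this commit.
public class HadoopConfPrefixDemo {
  // Mirrors RssClientConf.HADOOP_CONFIG_KEY_PREFIX introduced below.
  private static final String HADOOP_CONFIG_KEY_PREFIX = "rss.hadoop.";

  public static void main(String[] args) {
    // Stand-in for conf items parsed from the local classpath Hadoop config.
    Map<String, String> confItems = new HashMap<>();
    confItems.put("fs.defaultFS", "hdfs://local-cluster");

    // Stand-in for the client-side RssConf (Spark's "spark." prefix already stripped).
    Map<String, String> rssConf = new HashMap<>();
    rssConf.put("rss.hadoop.fs.defaultFS", "hdfs://rbf-x1");
    rssConf.put("rss.client.io.compression.codec", "LZ4");

    // Same extraction as getDefaultRemoteStorageInfo: strip the prefix and
    // let the extracted key override the locally loaded value.
    for (Map.Entry<String, String> e : rssConf.entrySet()) {
      if (e.getKey().startsWith(HADOOP_CONFIG_KEY_PREFIX)) {
        confItems.put(e.getKey().substring(HADOOP_CONFIG_KEY_PREFIX.length()), e.getValue());
      }
    }

    System.out.println(confItems); // {fs.defaultFS=hdfs://rbf-x1}
  }
}
```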
RssShuffleManagerBaseTest.java (new file)
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.shuffle.manager;

import org.apache.spark.SparkConf;
import org.junit.jupiter.api.Test;

import org.apache.uniffle.common.RemoteStorageInfo;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class RssShuffleManagerBaseTest {

  @Test
  public void testGetDefaultRemoteStorageInfo() {
    SparkConf sparkConf = new SparkConf();
    RemoteStorageInfo remoteStorageInfo =
        RssShuffleManagerBase.getDefaultRemoteStorageInfo(sparkConf);
    assertTrue(remoteStorageInfo.getConfItems().isEmpty());

    sparkConf.set("spark.rss.hadoop.fs.defaultFs", "hdfs://rbf-xxx/foo");
    remoteStorageInfo = RssShuffleManagerBase.getDefaultRemoteStorageInfo(sparkConf);
    assertEquals(1, remoteStorageInfo.getConfItems().size());
    assertEquals("hdfs://rbf-xxx/foo", remoteStorageInfo.getConfItems().get("fs.defaultFs"));
  }
}
@@ -410,7 +410,7 @@ public <K, V, C> ShuffleHandle registerShuffle(
    }

    String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
-    RemoteStorageInfo defaultRemoteStorage = getRemoteStorageInfo(sparkConf);
+    RemoteStorageInfo defaultRemoteStorage = getDefaultRemoteStorageInfo(sparkConf);
    RemoteStorageInfo remoteStorage =
        ClientUtils.fetchRemoteStorage(
            id.get(), defaultRemoteStorage, dynamicConfEnabled, storageType, shuffleWriteClient);
RssClientConf.java
@@ -25,6 +25,15 @@
import static org.apache.uniffle.common.compression.Codec.Type.LZ4;

public class RssClientConf {
+  /**
+   * The prefix for Hadoop configuration keys supplied on the client side. For Spark, a key like
+   *
+   * <p>spark.rss.hadoop.fs.defaultFS = hdfs://rbf-x1
+   *
+   * <p>is extracted to the Hadoop key "fs.defaultFS" and injected into the Hadoop storage
+   * configuration.
+   */
+  public static final String HADOOP_CONFIG_KEY_PREFIX = "rss.hadoop.";

  public static final ConfigOption<Codec.Type> COMPRESSION_TYPE =
      ConfigOptions.key("rss.client.io.compression.codec")
17 changes: 9 additions & 8 deletions docs/client_guide/spark_client_guide.md
@@ -78,14 +78,15 @@ Local shuffle reader as its name indicates is suitable and optimized for spark's

The important configuration is listed as follows.

| Property Name                                          | Default | Description                                                                                                                                                                             |
|--------------------------------------------------------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| spark.rss.writer.buffer.spill.size                     | 128m    | Buffer size for total partition data                                                                                                                                                     |
| spark.rss.client.send.size.limit                       | 16m     | The max data size sent to shuffle server                                                                                                                                                  |
| spark.rss.client.unregister.thread.pool.size           | 10      | The max size of the thread pool for unregistering                                                                                                                                        |
| spark.rss.client.unregister.request.timeout.sec        | 10      | The max timeout in seconds when unregistering from remote shuffle servers                                                                                                                 |
| spark.rss.client.off.heap.memory.enable                | false   | Whether the client uses off-heap memory to process data                                                                                                                                  |
| spark.rss.client.remote.storage.useLocalConfAsDefault  | false   | This option is only valid when the remote storage path is specified. If true, the remote storage conf will use the client-side Hadoop configuration loaded from the classpath            |
| spark.rss.hadoop.*                                     | -       | The prefix for client-side Hadoop configuration. For example, `spark.rss.hadoop.fs.defaultFS=hdfs://rbf-x1` is passed to the Hadoop storage configuration as `fs.defaultFS=hdfs://rbf-x1` |
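As a usage sketch (the class name and values are illustrative, not from this commit), the same settings can be applied programmatically on the driver; any `spark.rss.hadoop.<key>` entry is forwarded to the remote Hadoop storage configuration as `<key>`:

```java
import org.apache.spark.SparkConf;

public class RssHadoopConfExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("rss-hadoop-conf-example") // illustrative app name
        // Forwarded to the Hadoop storage configuration as fs.defaultFS.
        .set("spark.rss.hadoop.fs.defaultFS", "hdfs://rbf-x1")
        // Forwarded as dfs.replication.
        .set("spark.rss.hadoop.dfs.replication", "2");
    // Build the SparkContext / SparkSession from this conf as usual.
  }
}
```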

### Adaptive Remote Shuffle Enabling
Currently, this feature only supports Spark.
