Skip to content

Commit

Permalink
[apache#1750] feat(remote merge): Support Tez. (apache#2160)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Add remote merge support for Tez.

### Why are the changes needed?

Fix: apache#1750 

### Does this PR introduce _any_ user-facing change?

Yes. The documentation will be refined in a separate PR.

### How was this patch tested?

Unit tests, integration tests, and a real job on a cluster.
  • Loading branch information
zhengchenyu authored Oct 10, 2024
1 parent 267353e commit be23f61
Show file tree
Hide file tree
Showing 25 changed files with 2,208 additions and 46 deletions.
34 changes: 23 additions & 11 deletions client-tez/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,29 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-common</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-client</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,40 @@
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.tez.dag.records.TezTaskAttemptID;

public class GetShuffleServerRequest implements Writable {
// Task attempt carried by the request. readFields() only instantiates it conditionally, so it
// is presumably optional (nullable) — confirm against the full write()/readFields() bodies.
private TezTaskAttemptID currentTaskAttemptID;
private int startIndex;
private int partitionNum;
private int shuffleId;
// Class names carried with the request. The four-argument constructor sets all three to the
// empty string; write()/readFields() transfer them with WritableUtils string helpers.
private String keyClassName;
private String valueClassName;
private String comparatorClassName;

/**
 * No-argument constructor. Leaves every field at its Java default value; {@code readFields}
 * assigns fields from the input when a request is deserialized.
 */
public GetShuffleServerRequest() {}

/**
 * Creates a request that carries no class-name information.
 *
 * <p>Delegates to the seven-argument constructor and passes the empty string for the key class
 * name, the value class name and the comparator class name.
 *
 * @param currentTaskAttemptID task attempt stored in the request
 * @param startIndex start index stored in the request
 * @param partitionNum partition number stored in the request
 * @param shuffleId shuffle id stored in the request
 */
public GetShuffleServerRequest(
TezTaskAttemptID currentTaskAttemptID, int startIndex, int partitionNum, int shuffleId) {
this(currentTaskAttemptID, startIndex, partitionNum, shuffleId, "", "", "");
}

/**
 * Creates a fully specified request.
 *
 * <p>Every argument is stored as-is; no validation or copying is performed.
 *
 * @param currentTaskAttemptID task attempt stored in the request
 * @param startIndex start index stored in the request
 * @param partitionNum partition number stored in the request
 * @param shuffleId shuffle id stored in the request
 * @param keyClassName key class name stored in the request
 * @param valueClassName value class name stored in the request
 * @param comparatorClassName comparator class name stored in the request
 */
public GetShuffleServerRequest(
    TezTaskAttemptID currentTaskAttemptID,
    int startIndex,
    int partitionNum,
    int shuffleId,
    String keyClassName,
    String valueClassName,
    String comparatorClassName) {
  // Class-name metadata.
  this.comparatorClassName = comparatorClassName;
  this.valueClassName = valueClassName;
  this.keyClassName = keyClassName;

  // Identifiers and numeric parameters.
  this.shuffleId = shuffleId;
  this.partitionNum = partitionNum;
  this.startIndex = startIndex;
  this.currentTaskAttemptID = currentTaskAttemptID;
}

@Override
Expand All @@ -51,6 +69,9 @@ public void write(DataOutput output) throws IOException {
} else {
output.writeBoolean(false);
}
WritableUtils.writeString(output, keyClassName);
WritableUtils.writeString(output, valueClassName);
WritableUtils.writeString(output, comparatorClassName);
}

@Override
Expand All @@ -63,6 +84,9 @@ public void readFields(DataInput dataInput) throws IOException {
currentTaskAttemptID = new TezTaskAttemptID();
currentTaskAttemptID.readFields(dataInput);
}
keyClassName = WritableUtils.readString(dataInput);
valueClassName = WritableUtils.readString(dataInput);
comparatorClassName = WritableUtils.readString(dataInput);
}

@Override
Expand Down Expand Up @@ -94,4 +118,16 @@ public int getPartitionNum() {
public int getShuffleId() {
return shuffleId;
}

/**
 * Returns the key class name held by this request.
 *
 * @return the value passed to the constructor or read by {@code readFields}
 */
public String getKeyClassName() {
  return this.keyClassName;
}

/**
 * Returns the value class name held by this request.
 *
 * @return the value passed to the constructor or read by {@code readFields}
 */
public String getValueClassName() {
  return this.valueClassName;
}

/**
 * Returns the comparator class name held by this request.
 *
 * @return the value passed to the constructor or read by {@code readFields}
 */
public String getComparatorClassName() {
  return this.comparatorClassName;
}
}
10 changes: 10 additions & 0 deletions client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,16 @@ public class RssTezConfig {
public static final String RSS_SHUFFLE_MODE = TEZ_RSS_CONFIG_PREFIX + "shuffle.mode";
public static final String DEFAULT_RSS_SHUFFLE_MODE = "remote";

// Tez-prefixed key of the boolean switch that turns remote merge on; off by default.
public static final String RSS_REMOTE_MERGE_ENABLE =
TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_MERGE_ENABLE;
public static final boolean RSS_REMOTE_MERGE_ENABLE_DEFAULT = false;
// Tez-prefixed key for the merged block size; the default is taken from RssClientConfig.
// NOTE(review): "SZIE" looks like a misspelling of "SIZE". The name mirrors the constant in
// RssClientConfig and is public, so it is kept as-is here.
public static final String RSS_MERGED_BLOCK_SZIE =
TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_MERGED_BLOCK_SZIE;
public static final int RSS_MERGED_BLOCK_SZIE_DEFAULT =
RssClientConfig.RSS_MERGED_BLOCK_SZIE_DEFAULT;
// Tez-prefixed key for the remote merge class loader setting; no default is declared here.
public static final String RSS_REMOTE_MERGE_CLASS_LOADER =
TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_MERGE_CLASS_LOADER;

public static RssConf toRssConf(Configuration jobConf) {
RssConf rssConf = new RssConf();
for (Map.Entry<String, String> entry : jobConf) {
Expand Down
17 changes: 10 additions & 7 deletions client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.tez.runtime.library.input.OrderedGroupedInputLegacy;
import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput;
import org.apache.tez.runtime.library.input.RMRssOrderedGroupedKVInput;
import org.apache.tez.runtime.library.input.RssConcatenatedMergedKeyValueInput;
import org.apache.tez.runtime.library.input.RssConcatenatedMergedKeyValuesInput;
import org.apache.tez.runtime.library.input.RssOrderedGroupedInputLegacy;
Expand Down Expand Up @@ -122,7 +123,8 @@ public static ShuffleWriteClient createShuffleClient(Configuration conf) {
.replicaRead(replicaRead)
.replicaSkipEnabled(replicaSkipEnabled)
.dataTransferPoolSize(dataTransferPoolSize)
.dataCommitPoolSize(dataCommitPoolSize));
.dataCommitPoolSize(dataCommitPoolSize)
.rssConf(RssTezConfig.toRssConf(conf)));
return client;
}

Expand Down Expand Up @@ -423,13 +425,14 @@ public static String replaceRssOutputClassName(String className) {
}
}

public static String replaceRssInputClassName(String className) {
public static String replaceRssInputClassName(String className, boolean isRemoteMergeEnable) {
if (className.equals(OrderedGroupedKVInput.class.getName())) {
LOG.info(
"Input class name will transient from {} to {}",
className,
RssOrderedGroupedKVInput.class.getName());
return RssOrderedGroupedKVInput.class.getName();
String orderedInputClasName =
isRemoteMergeEnable
? RMRssOrderedGroupedKVInput.class.getName()
: RssOrderedGroupedKVInput.class.getName();
LOG.info("Input class name will transient from {} to {}", className, orderedInputClasName);
return orderedInputClasName;
} else if (className.equals(OrderedGroupedMergedKVInput.class.getName())) {
LOG.info(
"Input class name will transient from {} to {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,14 @@ public void onStateChanged(DAGImpl dag, DAGState dagState) {
outputDescriptor.getClass().getSuperclass().getDeclaredField("className");
inputClassNameField.setAccessible(true);
String inputClassName = (String) outputClassNameField.get(inputDescriptor);
String rssInputClassName = RssTezUtils.replaceRssInputClassName(inputClassName);
String rssInputClassName =
RssTezUtils.replaceRssInputClassName(
inputClassName,
appMaster
.getConfig()
.getBoolean(
RssTezConfig.RSS_REMOTE_MERGE_ENABLE,
RssTezConfig.RSS_REMOTE_MERGE_ENABLE_DEFAULT));
outputClassNameField.set(inputDescriptor, rssInputClassName);
}
} catch (IOException | IllegalAccessException | NoSuchFieldException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,13 @@ public GetShuffleServerResponse getShuffleAssignments(GetShuffleServerRequest re
if (shuffleIdToShuffleAssignsInfo.containsKey(shuffleId)) {
shuffleAssignmentsInfo = shuffleIdToShuffleAssignsInfo.get(shuffleId);
} else {
shuffleAssignmentsInfo = getShuffleWorks(request.getPartitionNum(), shuffleId);
shuffleAssignmentsInfo =
getShuffleWorks(
request.getPartitionNum(),
shuffleId,
request.getKeyClassName(),
request.getValueClassName(),
request.getComparatorClassName());
}

if (shuffleAssignmentsInfo == null) {
Expand Down Expand Up @@ -221,7 +227,12 @@ void removeShuffleInfo(int shuffleId) {
}
}

private ShuffleAssignmentsInfo getShuffleWorks(int partitionNum, int shuffleId) {
private ShuffleAssignmentsInfo getShuffleWorks(
int partitionNum,
int shuffleId,
String keyClassName,
String valueClassName,
String comparatorClassName) {
ShuffleAssignmentsInfo shuffleAssignmentsInfo;
int requiredAssignmentShuffleServersNum =
RssTezUtils.getRequiredShuffleServerNumber(conf, 200, partitionNum);
Expand Down Expand Up @@ -292,7 +303,15 @@ public ShuffleAssignmentsInfo run() throws Exception {
remoteStorage,
ShuffleDataDistributionType.NORMAL,
RssTezConfig.toRssConf(conf)
.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE)));
.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE),
0,
keyClassName,
valueClassName,
comparatorClassName,
conf.getInt(
RssTezConfig.RSS_MERGED_BLOCK_SZIE,
RssTezConfig.RSS_MERGED_BLOCK_SZIE_DEFAULT),
conf.get(RssTezConfig.RSS_REMOTE_MERGE_CLASS_LOADER)));
LOG.info(
"Finish register shuffle with "
+ (System.currentTimeMillis() - start)
Expand Down
Loading

0 comments on commit be23f61

Please sign in to comment.