[apache#2015] improvement(netty): Support Netty for MR integration test. (apache#2016)

### What changes were proposed in this pull request?

Support Netty for MR integration test.

### Why are the changes needed?

Fix: apache#2015

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Integration tests themselves.
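
The diff below applies one pattern to all seven files: each `@Test` that previously hard-coded `ClientType.GRPC` becomes a JUnit 5 `@ParameterizedTest` backed by a shared `clientTypeProvider`, so every MR integration test now runs once over gRPC and once over gRPC with Netty. A minimal sketch of that pattern (the class and test names here are illustrative, and `run(...)` stands in for the shared driver in `MRIntegrationTestBase`):

```java
import java.util.stream.Stream;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.uniffle.common.ClientType;

public class ClientTypeParameterizedExampleTest {

  // Supplies both transports; JUnit runs the annotated test once per entry.
  static Stream<Arguments> clientTypeProvider() {
    return Stream.of(Arguments.of(ClientType.GRPC), Arguments.of(ClientType.GRPC_NETTY));
  }

  @ParameterizedTest
  @MethodSource("clientTypeProvider")
  public void exampleTest(ClientType clientType) throws Exception {
    run(clientType);
  }

  // Illustrative stand-in: the real tests delegate to MRIntegrationTestBase.run(ClientType),
  // which submits the MR job with RSS_CLIENT_TYPE set to clientType.name().
  private void run(ClientType clientType) throws Exception {
    // no-op in this sketch
  }
}
```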
qijiale76 authored Oct 21, 2024
1 parent 28fd03a commit 43323bb
Showing 7 changed files with 76 additions and 49 deletions.

@@ -25,7 +25,8 @@
import org.apache.hadoop.mapreduce.RssMRConfig;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.storage.util.StorageType;
@@ -41,18 +42,18 @@ protected static Map<String, String> getDynamicConf() {
Map<String, String> dynamicConf = new HashMap<>();
dynamicConf.put(RssMRConfig.RSS_REMOTE_STORAGE_PATH, HDFS_URI + "rss/test");
dynamicConf.put(RssMRConfig.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
dynamicConf.put(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
return dynamicConf;
}

@Test
public void dynamicConfTest() throws Exception {
run();
@ParameterizedTest
@MethodSource("clientTypeProvider")
public void dynamicConfTest(ClientType clientType) throws Exception {
run(clientType);
}

@Override
protected void updateRssConfiguration(Configuration jobConf) {
jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
protected void updateRssConfiguration(Configuration jobConf, ClientType clientType) {
jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, clientType.name());
jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
jobConf.setInt(LargeSorter.MBS_PER_MAP, 256);
}

@@ -25,7 +25,8 @@
import org.apache.hadoop.mapreduce.RssMRConfig;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.storage.util.StorageType;
@@ -41,14 +42,15 @@ protected static Map<String, String> getDynamicConf() {
return new HashMap<>();
}

@Test
public void hadoopConfTest() throws Exception {
run();
@ParameterizedTest
@MethodSource("clientTypeProvider")
public void hadoopConfTest(ClientType clientType) throws Exception {
run(clientType);
}

@Override
protected void updateRssConfiguration(Configuration jobConf) {
jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
protected void updateRssConfiguration(Configuration jobConf, ClientType clientType) {
jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, clientType.name());
jobConf.set(RssMRConfig.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
jobConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, HDFS_URI + "rss/test");
jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);

@@ -23,7 +23,8 @@
import org.apache.hadoop.mapreduce.RssMRConfig;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.uniffle.common.ClientType;

@@ -34,14 +35,15 @@ public static void setupServers() throws Exception {
MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf());
}

@Test
public void largeSorterTest() throws Exception {
run();
@ParameterizedTest
@MethodSource("clientTypeProvider")
public void largeSorterTest(ClientType clientType) throws Exception {
run(clientType);
}

@Override
protected void updateRssConfiguration(Configuration jobConf) {
jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
protected void updateRssConfiguration(Configuration jobConf, ClientType clientType) {
jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, clientType.name());
jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
jobConf.setInt(LargeSorter.MBS_PER_MAP, 256);
jobConf.set(

@@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
@@ -45,6 +46,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.provider.Arguments;

import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.rpc.ServerType;
@@ -75,6 +77,10 @@ public class MRIntegrationTestBase extends IntegrationTestBase {
private static final String OUTPUT_ROOT_DIR = "/tmp/" + TestMRJobs.class.getSimpleName();
private static final Path TEST_RESOURCES_DIR = new Path(TEST_ROOT_DIR, "localizedResources");

static Stream<Arguments> clientTypeProvider() {
return Stream.of(Arguments.of(ClientType.GRPC), Arguments.of(ClientType.GRPC_NETTY));
}

@BeforeAll
public static void setUpMRYarn() throws IOException {
mrYarnCluster = new MiniMRYarnCluster("test");
@@ -99,29 +105,29 @@ public static void tearDown() throws IOException {
}
}

public void run() throws Exception {
public void run(ClientType clientType) throws Exception {
JobConf appConf = new JobConf(mrYarnCluster.getConfig());
updateCommonConfiguration(appConf);
runOriginApp(appConf);
final String originPath = appConf.get("mapreduce.output.fileoutputformat.outputdir");
appConf = new JobConf(mrYarnCluster.getConfig());
updateCommonConfiguration(appConf);
runRssApp(appConf);
runRssApp(appConf, clientType);
String rssPath = appConf.get("mapreduce.output.fileoutputformat.outputdir");
verifyResults(originPath, rssPath);

appConf = new JobConf(mrYarnCluster.getConfig());
appConf.set("mapreduce.rss.reduce.remote.spill.enable", "true");
runRssApp(appConf);
runRssApp(appConf, clientType);
String rssRemoteSpillPath = appConf.get("mapreduce.output.fileoutputformat.outputdir");
verifyResults(originPath, rssRemoteSpillPath);
}

public void runWithRemoteMerge() throws Exception {
public void runWithRemoteMerge(ClientType clientType) throws Exception {
// 1 run application when remote merge is enable
JobConf appConf = new JobConf(mrYarnCluster.getConfig());
updateCommonConfiguration(appConf);
runRssApp(appConf, true);
runRssApp(appConf, true, clientType);
final String rssPath1 = appConf.get("mapreduce.output.fileoutputformat.outputdir");

// 2 run original application
@@ -142,11 +148,12 @@ private void runOriginApp(Configuration jobConf) throws Exception {
runMRApp(jobConf, getTestTool(), getTestArgs());
}

private void runRssApp(Configuration jobConf) throws Exception {
runRssApp(jobConf, false);
private void runRssApp(Configuration jobConf, ClientType clientType) throws Exception {
runRssApp(jobConf, false, clientType);
}

private void runRssApp(Configuration jobConf, boolean remoteMerge) throws Exception {
private void runRssApp(Configuration jobConf, boolean remoteMerge, ClientType clientType)
throws Exception {
URL url = MRIntegrationTestBase.class.getResource("/");
final String parentPath =
new Path(url.getPath()).getParent().getParent().getParent().getParent().toString();
@@ -185,19 +192,19 @@ private void runRssApp(Configuration jobConf, boolean remoteMerge) throws Except
}
assertNotNull(localFile);
String props = System.getProperty("java.class.path");
String newProps = "";
StringBuilder newProps = new StringBuilder();
String[] splittedProps = props.split(":");
for (String prop : splittedProps) {
if (!prop.contains("classes")
&& !prop.contains("grpc")
&& !prop.contains("rss-")
&& !prop.contains("shuffle-storage")) {
newProps = newProps + ":" + prop;
newProps.append(":").append(prop);
} else if (prop.contains("mr") && prop.contains("integration-test")) {
newProps = newProps + ":" + prop;
newProps.append(":").append(prop);
}
}
System.setProperty("java.class.path", newProps);
System.setProperty("java.class.path", newProps.toString());
Path newPath = new Path(HDFS_URI + "/rss.jar");
FileUtil.copy(file, fs, newPath, false, jobConf);
DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()), jobConf, fs);
@@ -208,8 +215,9 @@ private void runRssApp(Configuration jobConf, boolean remoteMerge) throws Except
+ ","
+ MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH);
jobConf.set(RssMRConfig.RSS_COORDINATOR_QUORUM, COORDINATOR_QUORUM);
updateRssConfiguration(jobConf);
updateRssConfiguration(jobConf, clientType);
runMRApp(jobConf, getTestTool(), getTestArgs());
fs.delete(newPath, true);
}

protected String[] getTestArgs() {
@@ -225,11 +233,14 @@ protected static void setupServers(Map<String, String> dynamicConf, ShuffleServe
CoordinatorConf coordinatorConf = getCoordinatorConf();
addDynamicConf(coordinatorConf, dynamicConf);
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC);
ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(ServerType.GRPC);
ShuffleServerConf nettyShuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY);
if (serverConf != null) {
shuffleServerConf.addAll(serverConf);
grpcShuffleServerConf.addAll(serverConf);
nettyShuffleServerConf.addAll(serverConf);
}
createShuffleServer(shuffleServerConf);
createShuffleServer(grpcShuffleServerConf);
createShuffleServer(nettyShuffleServerConf);
startServers();
}

@@ -240,8 +251,8 @@ protected static Map<String, String> getDynamicConf() {
return dynamicConf;
}

protected void updateRssConfiguration(Configuration jobConf) {
jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
protected void updateRssConfiguration(Configuration jobConf, ClientType clientType) {
jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, clientType.name());
}

private void runMRApp(Configuration conf, Tool tool, String[] args) throws Exception {
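
Because the hunks above interleave removed and added lines, here is a condensed, illustrative view of how the base-class pieces fit together after this change. It only restates what the diff shows, with bodies trimmed; coordinator setup and the classpath rewrite are elided, and the second setupServers parameter (truncated in the hunk header above) is assumed to be ShuffleServerConf serverConf based on the null check in the body.

public class MRIntegrationTestBase extends IntegrationTestBase {

  // Every subclass @ParameterizedTest draws its ClientType from this provider.
  static Stream<Arguments> clientTypeProvider() {
    return Stream.of(Arguments.of(ClientType.GRPC), Arguments.of(ClientType.GRPC_NETTY));
  }

  // One shuffle server per transport, so whichever client type a test selects
  // has a matching server to talk to.
  protected static void setupServers(Map<String, String> dynamicConf, ShuffleServerConf serverConf)
      throws Exception {
    ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(ServerType.GRPC);
    ShuffleServerConf nettyShuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY);
    if (serverConf != null) {
      grpcShuffleServerConf.addAll(serverConf);
      nettyShuffleServerConf.addAll(serverConf);
    }
    createShuffleServer(grpcShuffleServerConf);
    createShuffleServer(nettyShuffleServerConf);
    startServers();
  }

  // The chosen transport is threaded from run(...) into the job configuration.
  protected void updateRssConfiguration(Configuration jobConf, ClientType clientType) {
    jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, clientType.name());
  }
}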

@@ -29,8 +29,10 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.buffer.ShuffleBufferType;

@@ -51,10 +53,11 @@ public static void setupServers() throws Exception {
MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf(), serverConf);
}

@Test
public void wordCountTest() throws Exception {
@ParameterizedTest
@MethodSource("clientTypeProvider")
public void wordCountTest(ClientType clientType) throws Exception {
generateInputFile();
runWithRemoteMerge();
runWithRemoteMerge(clientType);
}

@Override

@@ -32,7 +32,10 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.uniffle.common.ClientType;

public class SecondarySortTest extends MRIntegrationTestBase {

@@ -43,10 +46,11 @@ public static void setupServers() throws Exception {
MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf());
}

@Test
public void secondarySortTest() throws Exception {
@ParameterizedTest
@MethodSource("clientTypeProvider")
public void secondarySortTest(ClientType clientType) throws Exception {
generateInputFile();
run();
run(clientType);
}

private void generateInputFile() throws Exception {

@@ -32,7 +32,10 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.uniffle.common.ClientType;

public class WordCountTest extends MRIntegrationTestBase {

@@ -46,10 +49,11 @@ public static void setupServers() throws Exception {
MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf());
}

@Test
public void wordCountTest() throws Exception {
@ParameterizedTest
@MethodSource("clientTypeProvider")
public void wordCountTest(ClientType clientType) throws Exception {
generateInputFile();
run();
run(clientType);
}

@Override