[apache#133] feat(netty): integration-test supports netty. (apache#1008)
### What changes were proposed in this pull request?

Add Netty transport support to the integration tests (for apache#133): the test shuffle servers now start a Netty server port, are tagged with GRPC_NETTY, and the Spark integration tests run an extra pass with the GRPC_NETTY client type.

### Why are the changes needed?
To make the integration tests cover the Netty-based transport in addition to gRPC.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

UTs and the updated integration tests.

Co-authored-by: leixianming <[email protected]>
leixm and leixianming authored Jul 25, 2023
1 parent 6fb2a9a commit 96eab1c
Showing 17 changed files with 66 additions and 20 deletions.
@@ -426,9 +426,17 @@ public List<AddBlockEvent> buildBlockEvents(List<ShuffleBlockInfo> shuffleBlockI
+ " bytes");
// Use final temporary variables for closures
final long _memoryUsed = memoryUsed;
final List<ShuffleBlockInfo> finalShuffleBlockInfosPerEvent = shuffleBlockInfoList;
events.add(
new AddBlockEvent(
taskId, shuffleBlockInfosPerEvent, () -> freeAllocatedMemory(_memoryUsed)));
taskId,
shuffleBlockInfosPerEvent,
() -> {
freeAllocatedMemory(_memoryUsed);
for (ShuffleBlockInfo shuffleBlockInfo : finalShuffleBlockInfosPerEvent) {
shuffleBlockInfo.getData().release();
}
}));
}
return events;
}
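The completion callback attached to the new AddBlockEvent now frees the tracked memory and then releases the Netty ByteBuf behind every block in the event, so the reference-counted buffers can be reclaimed once the event has been processed. A minimal sketch of that pattern, with Block and freeAllocatedMemory as illustrative stand-ins for the project's real classes:

```java
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

// Sketch of the "free tracked memory, then release every block's ByteBuf" callback
// used by the new AddBlockEvent; Block and freeAllocatedMemory are illustrative stand-ins.
public class ReleaseOnCompleteSketch {
  static class Block {
    private final ByteBuf data = Unpooled.buffer(16);

    ByteBuf getData() {
      return data;
    }
  }

  static void freeAllocatedMemory(long bytes) {
    System.out.println("freed " + bytes + " tracked bytes");
  }

  public static void main(String[] args) {
    // Final locals, because the callback (a lambda) runs after this method returns;
    // this mirrors the "final temporary variables for closures" comment in the diff.
    final long memoryUsed = 1024;
    final List<Block> blocks = List.of(new Block(), new Block());

    Runnable onProcessed =
        () -> {
          freeAllocatedMemory(memoryUsed);
          for (Block block : blocks) {
            block.getData().release(); // drop the ref count so pooled buffers can be reclaimed
          }
        };

    onProcessed.run();
  }
}
```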
@@ -22,6 +22,8 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import org.apache.uniffle.common.util.ByteBufUtils;

public class ShuffleBlockInfo {

private int partitionId;
@@ -150,4 +152,8 @@ public String toString() {

return sb.toString();
}

public synchronized void copyDataTo(ByteBuf to) {
ByteBufUtils.copyByteBuf(data, to);
}
}
@@ -26,6 +26,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;

import org.apache.uniffle.common.util.UnitConverter;
@@ -665,4 +666,9 @@ public String toString() {
public String getEnv(String key) {
return System.getenv(key);
}

@VisibleForTesting
public void remove(String key) {
this.settings.remove(key);
}
}
@@ -55,6 +55,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
} catch (Exception e) {
LOG.error("Unexpected exception during process encode!", e);
byteBuf.release();
throw e;
}
ctx.writeAndFlush(byteBuf);
// do transferTo send data after encode buffer send.
@@ -69,7 +69,7 @@ public ChannelFuture sendRpc(Message message, RpcResponseCallback callback) {
if (logger.isTraceEnabled()) {
logger.trace("Pushing data to {}", NettyUtils.getRemoteAddress(channel));
}
long requestId = requestId();
long requestId = message.getRequestId();
handler.addResponseCallback(requestId, callback);
RpcChannelListener listener = new RpcChannelListener(requestId, callback);
return channel.writeAndFlush(message).addListener(listener);
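sendRpc now registers the callback under the request id already carried by the message (every Message exposes getRequestId(), see the Message hunk below) instead of generating a fresh id, so the id written on the wire matches the one the response handler looks up. A stripped-down sketch of that register-then-write flow, with simplified types standing in for the real handler and channel:

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of "register the callback under the message's own request id, then write":
// the response path resolves the callback by the same id it reads off the wire.
// Callback, PENDING, and the method names are simplified stand-ins.
public class RequestIdSketch {
  interface Callback {
    void onSuccess(Object response);
  }

  static final ConcurrentHashMap<Long, Callback> PENDING = new ConcurrentHashMap<>();

  static void sendRpc(long requestIdFromMessage, Callback callback) {
    PENDING.put(requestIdFromMessage, callback); // register first, keyed by the message's id
    // channel.writeAndFlush(message) would follow here in the real client
  }

  static void onResponse(long requestId, Object response) {
    Callback cb = PENDING.remove(requestId);
    if (cb != null) {
      cb.onSuccess(response);
    }
  }
}
```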
@@ -43,8 +43,7 @@ public static void encodeShuffleBlockInfo(ShuffleBlockInfo shuffleBlockInfo, Byt
byteBuf.writeLong(shuffleBlockInfo.getCrc());
byteBuf.writeLong(shuffleBlockInfo.getTaskAttemptId());
// todo: avoid copy
ByteBufUtils.copyByteBuf(shuffleBlockInfo.getData(), byteBuf);
shuffleBlockInfo.getData().release();
shuffleBlockInfo.copyDataTo(byteBuf);
List<ShuffleServerInfo> shuffleServerInfoList = shuffleBlockInfo.getShuffleServerInfos();
byteBuf.writeInt(shuffleServerInfoList.size());
for (ShuffleServerInfo shuffleServerInfo : shuffleServerInfoList) {
@@ -64,7 +63,8 @@ public static int encodeLengthOfShuffleBlockIn
int encodeLength =
4 * Long.BYTES
+ 4 * Integer.BYTES
+ ByteBufUtils.encodedLength(shuffleBlockInfo.getData())
+ Integer.BYTES
+ shuffleBlockInfo.getLength()
+ Integer.BYTES;
for (ShuffleServerInfo shuffleServerInfo : shuffleBlockInfo.getShuffleServerInfos()) {
encodeLength += encodeLengthOfShuffleServerInfo(shuffleServerInfo);
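With the copy hidden behind ShuffleBlockInfo.copyDataTo, the encoder no longer releases the block's buffer itself, and the encoded length is computed from the block's recorded length plus an explicit length prefix rather than from the live ByteBuf. A hedged sketch of that arithmetic, with the per-server sizes folded into a single parameter:

```java
// Hedged sketch of the new length computation: fixed long and int header fields, an
// explicit length prefix plus the payload size taken from the block's recorded length
// (instead of inspecting the live ByteBuf), then the shuffle-server list. Parameter
// names are illustrative; the per-server encoded sizes are folded into one argument.
public class EncodeLengthSketch {
  static int encodedLengthOfShuffleBlockInfo(int dataLength, int serverListEncodedLength) {
    return 4 * Long.BYTES // fixed long fields
        + 4 * Integer.BYTES // fixed int fields
        + Integer.BYTES // length prefix written before the block data
        + dataLength // payload size, e.g. from ShuffleBlockInfo.getLength()
        + Integer.BYTES // size of the shuffle-server list
        + serverListEncodedLength;
  }
}
```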
@@ -131,4 +131,6 @@ public static Message decode(Type msgType, ByteBuf in) {
throw new IllegalArgumentException("Unexpected message type: " + msgType);
}
}

public abstract long getRequestId();
}
@@ -50,7 +50,7 @@ public void testSendShuffleDataRequest() {
1,
1,
1,
10,
data.length,
123,
Unpooled.wrappedBuffer(data).retain(),
shuffleServerInfoList,
@@ -61,7 +61,7 @@
1,
1,
1,
10,
data.length,
123,
Unpooled.wrappedBuffer(data).retain(),
shuffleServerInfoList,
@@ -74,7 +74,7 @@
1,
2,
1,
10,
data.length,
123,
Unpooled.wrappedBuffer(data).retain(),
shuffleServerInfoList,
@@ -85,7 +85,7 @@
1,
1,
2,
10,
data.length,
123,
Unpooled.wrappedBuffer(data).retain(),
shuffleServerInfoList,
@@ -73,6 +73,7 @@ public static void setupServers() throws Exception {
coordinatorConf.setLong("rss.coordinator.server.heartbeat.timeout", 3000);
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.remove(ShuffleServerConf.NETTY_SERVER_PORT.key());
createShuffleServer(shuffleServerConf);
shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 1);
shuffleServerConf.setInteger("rss.jetty.http.port", 18081);
@@ -155,6 +156,7 @@ public void getShuffleAssignmentsTest() throws Exception {
withEnvironmentVariables("RSS_ENV_KEY", storageTypeJsonSource)
.execute(
() -> {
shuffleServerConf.remove(ShuffleServerConf.NETTY_SERVER_PORT.key());
ShuffleServer ss = new ShuffleServer((ShuffleServerConf) shuffleServerConf);
ss.start();
shuffleServers.set(0, ss);
@@ -23,6 +23,7 @@
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.Lists;
import org.junit.jupiter.api.AfterAll;
@@ -60,6 +61,9 @@ public abstract class IntegrationTestBase extends HadoopTestBase {
protected static List<ShuffleServer> shuffleServers = Lists.newArrayList();
protected static List<CoordinatorServer> coordinators = Lists.newArrayList();

protected static final int NETTY_PORT = 21000;
protected static AtomicInteger nettyPortCounter = new AtomicInteger();

public static void startServers() throws Exception {
for (CoordinatorServer coordinator : coordinators) {
coordinator.start();
@@ -123,6 +127,9 @@ protected static ShuffleServerConf getShuffleServerConf() throws Exception {
serverConf.setBoolean("rss.server.health.check.enable", false);
serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
serverConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL, 500L);
serverConf.setInteger(
ShuffleServerConf.NETTY_SERVER_PORT, NETTY_PORT + nettyPortCounter.getAndIncrement());
serverConf.setString("rss.server.tags", "GRPC,GRPC_NETTY");
return serverConf;
}

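Each shuffle server started by the integration-test base now gets its own Netty port: a fixed base port plus an AtomicInteger counter, which avoids bind conflicts when several servers run in one JVM. A small standalone sketch of that allocation scheme:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the per-server Netty port allocation in the integration-test base:
// a fixed base port plus a monotonically increasing counter.
public class NettyPortAllocatorSketch {
  static final int NETTY_PORT = 21000; // base port, as in the diff
  static final AtomicInteger NETTY_PORT_COUNTER = new AtomicInteger();

  static int nextNettyPort() {
    return NETTY_PORT + NETTY_PORT_COUNTER.getAndIncrement(); // 21000, 21001, 21002, ...
  }

  public static void main(String[] args) {
    System.out.println(nextNettyPort());
    System.out.println(nextNettyPort());
  }
}
```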
@@ -99,9 +99,8 @@ public Map runTest(SparkSession spark, String fileName) throws Exception {
map = javaPairRDD.collectAsMap();
shufflePath = appPath + "/1";
assertTrue(fs.exists(new Path(shufflePath)));
} else {
runCounter++;
}
runCounter++;
return map;
}
}
@@ -105,9 +105,8 @@ public Map runTest(SparkSession spark, String fileName) throws Exception {
map = javaPairRDD.collectAsMap();
shufflePath = appPath + "/1";
assertTrue(new File(shufflePath).exists());
} else {
runCounter++;
}
runCounter++;
return map;
}
}
@@ -58,15 +58,23 @@ public void run() throws Exception {
updateSparkConfCustomer(sparkConf);
start = System.currentTimeMillis();
Map resultWithRss = runSparkApp(sparkConf, fileName);
long durationWithRss = System.currentTimeMillis() - start;
final long durationWithRss = System.currentTimeMillis() - start;

updateSparkConfWithRssNetty(sparkConf);
start = System.currentTimeMillis();
Map resultWithRssNetty = runSparkApp(sparkConf, fileName);
final long durationWithRssNetty = System.currentTimeMillis() - start;
verifyTestResult(resultWithoutRss, resultWithRss);
verifyTestResult(resultWithoutRss, resultWithRssNetty);

LOG.info(
"Test: durationWithoutRss["
+ durationWithoutRss
+ "], durationWithRss["
+ durationWithRss
+ "]"
+ "], durationWithRssNetty["
+ durationWithRssNetty
+ "]");
}

@@ -110,6 +118,10 @@ public void updateSparkConfWithRss(SparkConf sparkConf) {
sparkConf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE.key(), "true");
}

public void updateSparkConfWithRssNetty(SparkConf sparkConf) {
sparkConf.set(RssSparkConfig.RSS_CLIENT_TYPE, "GRPC_NETTY");
}

protected void verifyTestResult(Map expected, Map actual) {
assertEquals(expected.size(), actual.size());
for (Object expectedKey : expected.keySet()) {
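The Spark integration tests now run each workload a third time against the Netty transport by flipping the client type before resubmitting the app. A hedged sketch of that configuration step; the real test sets RssSparkConfig.RSS_CLIENT_TYPE, and the literal key below is only an assumed illustration:

```java
import org.apache.spark.SparkConf;

// Sketch: switch the Uniffle client type to GRPC_NETTY before the extra test run.
// The real test uses RssSparkConfig.RSS_CLIENT_TYPE; the literal key below is an assumption.
public class NettyRunConfSketch {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("netty-run").setMaster("local[2]");
    sparkConf.set("spark.rss.client.type", "GRPC_NETTY"); // assumed key, for illustration only
    System.out.println(sparkConf.get("spark.rss.client.type"));
  }
}
```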
@@ -79,6 +79,7 @@ private static void createShuffleServers() throws Exception {
ShuffleServerConf serverConf = new ShuffleServerConf();
dataFolder.deleteOnExit();
serverConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + i);
serverConf.setInteger("rss.server.netty.port", NETTY_PORT + i);
serverConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE_HDFS.name());
serverConf.setString("rss.storage.basePath", dataFolder.getAbsolutePath());
serverConf.setString("rss.server.buffer.capacity", String.valueOf(671088640 - i));
@@ -94,6 +95,7 @@ private static void createShuffleServers() throws Exception {
serverConf.setString("rss.server.hadoop.dfs.replication", "2");
serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L * 1024L);
serverConf.setBoolean("rss.server.health.check.enable", false);
serverConf.setString("rss.server.tags", "GRPC,GRPC_NETTY");
createMockedShuffleServer(serverConf);
}
enableRecordGetShuffleResult();
@@ -83,6 +83,7 @@ private static void createShuffleServers() throws Exception {
ShuffleServerConf serverConf = new ShuffleServerConf();
dataFolder.deleteOnExit();
serverConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + i);
serverConf.setInteger("rss.server.netty.port", NETTY_PORT + i);
serverConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE_HDFS.name());
serverConf.setString("rss.storage.basePath", dataFolder.getAbsolutePath());
serverConf.setString("rss.server.buffer.capacity", "671088640");
@@ -98,6 +99,7 @@ private static void createShuffleServers() throws Exception {
serverConf.setString("rss.server.hadoop.dfs.replication", "2");
serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L * 1024L);
serverConf.setBoolean("rss.server.health.check.enable", false);
serverConf.setString("rss.server.tags", "GRPC,GRPC_NETTY");
createMockedShuffleServer(serverConf);
}
enableRecordGetShuffleResult();
@@ -40,7 +40,7 @@ public CoordinatorClientFactory(ClientType clientType) {
}

public CoordinatorClient createCoordinatorClient(String host, int port) {
if (clientType.equals(ClientType.GRPC)) {
if (clientType.equals(ClientType.GRPC) || clientType.equals(ClientType.GRPC_NETTY)) {
return new CoordinatorGrpcClient(host, port);
} else {
throw new UnsupportedOperationException("Unsupported client type " + clientType);
@@ -133,7 +133,7 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
responseMessage = errorMsg;
rpcResponse =
new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, responseMessage);
client.sendRpcSync(rpcResponse, RPC_TIMEOUT);
client.getChannel().writeAndFlush(rpcResponse);
return;
}
final long start = System.currentTimeMillis();
@@ -209,7 +209,7 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, "No data in request");
}

client.sendRpcSync(rpcResponse, RPC_TIMEOUT);
client.getChannel().writeAndFlush(rpcResponse);
}

public void handleGetMemoryShuffleDataRequest(
@@ -292,7 +292,7 @@ public void handleGetMemoryShuffleDataRequest(
new GetMemoryShuffleDataResponse(
req.getRequestId(), status, msg, Lists.newArrayList(), Unpooled.EMPTY_BUFFER);
}
client.sendRpcSync(response, RPC_TIMEOUT);
client.getChannel().writeAndFlush(response);
}

public void handleGetLocalShuffleIndexRequest(
@@ -374,7 +374,7 @@ public void handleGetLocalShuffleIndexRequest(
new GetLocalShuffleIndexResponse(
req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
}
client.sendRpcSync(response, RPC_TIMEOUT);
client.getChannel().writeAndFlush(response);
}

public void handleGetLocalShuffleData(TransportClient client, GetLocalShuffleDataRequest req) {
@@ -471,7 +471,7 @@ public void handleGetLocalShuffleData(TransportClient client, GetLocalShuffleDat
new GetLocalShuffleDataResponse(
req.getRequestId(), status, msg, new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
}
client.sendRpcSync(response, RPC_TIMEOUT);
client.getChannel().writeAndFlush(response);
}

private List<ShufflePartitionedData> toPartitionedData(SendShuffleDataRequest req) {
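Every reply in the Netty handler now goes out with client.getChannel().writeAndFlush(response) instead of the blocking sendRpcSync call. A minimal sketch of answering a request with an asynchronous write and a failure-logging listener (the response object is a stand-in):

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;

// Sketch: flush the response on the caller's channel asynchronously and log failures
// through a listener, rather than blocking until the write (or a reply) completes.
public class AsyncReplySketch {
  static void reply(Channel channel, Object rpcResponse) {
    ChannelFuture future = channel.writeAndFlush(rpcResponse);
    future.addListener(
        (ChannelFutureListener)
            f -> {
              if (!f.isSuccess()) {
                System.err.println("failed to send response: " + f.cause());
              }
            });
  }
}
```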