From e82b2303ece1f7947e76c317fd447f37cdb2fc2f Mon Sep 17 00:00:00 2001 From: wenlongwlli Date: Mon, 12 Aug 2024 14:53:51 +0800 Subject: [PATCH] fix(server): Fix memory leak when reach memory limit --- .../netty/protocol/SendShuffleDataRequest.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java index 9fefb98f6b..cde9a3ee26 100644 --- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java +++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java @@ -103,7 +103,21 @@ private static Map> decodePartitionData(ByteBuf int lengthOfShuffleBlocks = byteBuf.readInt(); List shuffleBlockInfoList = Lists.newArrayList(); for (int j = 0; j < lengthOfShuffleBlocks; j++) { - shuffleBlockInfoList.add(Decoders.decodeShuffleBlockInfo(byteBuf)); + try { + shuffleBlockInfoList.add(Decoders.decodeShuffleBlockInfo(byteBuf)); + } catch (Throwable t) { + // An OutOfDirectMemoryError will be thrown, when the direct memory reaches the limit. + // OutOfDirectMemoryError will not cause the JVM to exit, but may lead to direct memory + // leaks. + // Note: You can refer to docs/server_guide.md to set MAX_DIRECT_MEMORY_SIZE to a + // reasonable value. + shuffleBlockInfoList.forEach(sbi -> sbi.getData().release()); + partitionToBlocks.forEach( + (integer, shuffleBlockInfos) -> { + shuffleBlockInfos.forEach(sbi -> sbi.getData().release()); + }); + throw t; + } } partitionToBlocks.put(partitionId, shuffleBlockInfoList); }