From 95f536456dd58309438415ef74a2d795642cbba2 Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Tue, 23 Jul 2024 09:33:20 +0800 Subject: [PATCH] Revert to calc data size twice to keep consistent with original logic --- .../apache/uniffle/common/ShuffleBlockInfo.java | 8 ++++++-- .../uniffle/common/ShufflePartitionedBlock.java | 8 ++++++-- .../impl/grpc/ShuffleServerGrpcNettyClient.java | 17 +++++++++++++---- .../server/netty/ShuffleServerNettyHandler.java | 6 ++++-- 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java b/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java index 36dec5e257..c84fcbe40e 100644 --- a/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java +++ b/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java @@ -95,8 +95,12 @@ public int getLength() { return length; } - // calculate the data size for this block in memory including metadata which are - // partitionId, blockId, crc, taskAttemptId, length, uncompressLength + /** + * Calculate the data size for this block in memory including metadata which are + * partitionId, blockId, crc, taskAttemptId, uncompressLength and data length. + * This should be consistent with {@link ShufflePartitionedBlock#getSize()}. + * @return the encoded size of this object in memory + */ public int getSize() { return length + 3 * 8 + 2 * 4; } diff --git a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java index 1ce68b6b6b..326f6dd0f5 100644 --- a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java +++ b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java @@ -51,8 +51,12 @@ public ShufflePartitionedBlock( this.data = data; } - // calculate the data size for this block in memory including metadata which are - // blockId, crc, taskAttemptId, length, uncompressLength + /** + * Calculate the data size for this block in memory including metadata which are + * partitionId, blockId, crc, taskAttemptId, uncompressLength and data length. + * This should be consistent with {@link ShuffleBlockInfo#getSize()}. + * @return the encoded size of this object in memory + */ public long getSize() { return length + 3 * 8 + 2 * 4; } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java index 3cb28f5ddc..12d703fd71 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java @@ -147,10 +147,14 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ for (Map.Entry>> stb : shuffleIdToBlocks.entrySet()) { int shuffleId = stb.getKey(); + int size = 0; int blockNum = 0; List partitionIds = new ArrayList<>(); for (Map.Entry> ptb : stb.getValue().entrySet()) { - blockNum += ptb.getValue().size(); + for (ShuffleBlockInfo sbi : ptb.getValue()) { + size += sbi.getSize(); + blockNum++; + } partitionIds.add(ptb.getKey()); } @@ -163,11 +167,16 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ 0L, stb.getValue(), System.currentTimeMillis()); - // allocateSize = messageEncodedLength + messageTypeEncodedLength + bodyLength + - // messageEncodedLength + // allocateSize = MESSAGE_HEADER_SIZE + requestMessage encodedLength + data size // {@link org.apache.uniffle.common.netty.MessageEncoder#encode} + // We calculated the size again, even though sendShuffleDataRequest.encodedLength() + // already included the data size, because there is a brief moment when decoding + // sendShuffleDataRequest at the shuffle server, where there are two copies of data + // in direct memory. int allocateSize = - MessageEncoder.MESSAGE_HEADER_SIZE + sendShuffleDataRequest.encodedLength(); + MessageEncoder.MESSAGE_HEADER_SIZE + + sendShuffleDataRequest.encodedLength() + + size; int finalBlockNum = blockNum; try { RetryUtils.retryWithCondition( diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java index 27a3f1dc48..5fb5193e18 100644 --- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java +++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java @@ -28,6 +28,7 @@ import io.netty.channel.ChannelFutureListener; import org.apache.commons.collections.MapUtils; import org.apache.commons.collections4.CollectionUtils; +import org.apache.uniffle.common.netty.MessageEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,9 +113,10 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData // otherwise we need to release the required size. PreAllocatedBufferInfo info = shuffleTaskManager.getAndRemovePreAllocatedBuffer(requireBufferId); + int encodedLength = req.encodedLength() + MessageEncoder.MESSAGE_HEADER_SIZE; int requireSize = info == null ? 0 : info.getRequireSize(); int requireBlocksSize = - requireSize - req.encodedLength() < 0 ? 0 : requireSize - req.encodedLength(); + requireSize - encodedLength < 0 ? 0 : requireSize - encodedLength; boolean isPreAllocated = info != null; @@ -217,7 +219,7 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData return; } final long start = System.currentTimeMillis(); - shuffleBufferManager.releaseMemory(req.encodedLength(), false, true); + shuffleBufferManager.releaseMemory(encodedLength, false, true); List shufflePartitionedData = toPartitionedData(req); long alreadyReleasedSize = 0; boolean hasFailureOccurred = false;