Skip to content

Commit

Permalink
Revert to calc data size twice to keep consistent with original logic
Browse files Browse the repository at this point in the history
  • Loading branch information
maobaolong committed Jul 23, 2024
1 parent fb03322 commit 95f5364
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,12 @@ public int getLength() {
return length;
}

// calculate the data size for this block in memory including metadata which are
// partitionId, blockId, crc, taskAttemptId, length, uncompressLength
/**
* Calculate the data size for this block in memory including metadata which are
* partitionId, blockId, crc, taskAttemptId, uncompressLength and data length.
* This should be consistent with {@link ShufflePartitionedBlock#getSize()}.
* @return the encoded size of this object in memory
*/
public int getSize() {
return length + 3 * 8 + 2 * 4;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ public ShufflePartitionedBlock(
this.data = data;
}

// calculate the data size for this block in memory including metadata which are
// blockId, crc, taskAttemptId, length, uncompressLength
/**
* Calculate the data size for this block in memory including metadata which are
* partitionId, blockId, crc, taskAttemptId, uncompressLength and data length.
* This should be consistent with {@link ShuffleBlockInfo#getSize()}.
* @return the encoded size of this object in memory
*/
public long getSize() {
return length + 3 * 8 + 2 * 4;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,14 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ
for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> stb :
shuffleIdToBlocks.entrySet()) {
int shuffleId = stb.getKey();
int size = 0;
int blockNum = 0;
List<Integer> partitionIds = new ArrayList<>();
for (Map.Entry<Integer, List<ShuffleBlockInfo>> ptb : stb.getValue().entrySet()) {
blockNum += ptb.getValue().size();
for (ShuffleBlockInfo sbi : ptb.getValue()) {
size += sbi.getSize();
blockNum++;
}
partitionIds.add(ptb.getKey());
}

Expand All @@ -163,11 +167,16 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ
0L,
stb.getValue(),
System.currentTimeMillis());
// allocateSize = messageEncodedLength + messageTypeEncodedLength + bodyLength +
// messageEncodedLength
// allocateSize = MESSAGE_HEADER_SIZE + requestMessage encodedLength + data size
// {@link org.apache.uniffle.common.netty.MessageEncoder#encode}
// We calculated the size again, even though sendShuffleDataRequest.encodedLength()
// already included the data size, because there is a brief moment when decoding
// sendShuffleDataRequest at the shuffle server, where there are two copies of data
// in direct memory.
int allocateSize =
MessageEncoder.MESSAGE_HEADER_SIZE + sendShuffleDataRequest.encodedLength();
MessageEncoder.MESSAGE_HEADER_SIZE
+ sendShuffleDataRequest.encodedLength()
+ size;
int finalBlockNum = blockNum;
try {
RetryUtils.retryWithCondition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.netty.channel.ChannelFutureListener;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.uniffle.common.netty.MessageEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -112,9 +113,10 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
// otherwise we need to release the required size.
PreAllocatedBufferInfo info =
shuffleTaskManager.getAndRemovePreAllocatedBuffer(requireBufferId);
int encodedLength = req.encodedLength() + MessageEncoder.MESSAGE_HEADER_SIZE;
int requireSize = info == null ? 0 : info.getRequireSize();
int requireBlocksSize =
requireSize - req.encodedLength() < 0 ? 0 : requireSize - req.encodedLength();
requireSize - encodedLength < 0 ? 0 : requireSize - encodedLength;

boolean isPreAllocated = info != null;

Expand Down Expand Up @@ -217,7 +219,7 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
return;
}
final long start = System.currentTimeMillis();
shuffleBufferManager.releaseMemory(req.encodedLength(), false, true);
shuffleBufferManager.releaseMemory(encodedLength, false, true);
List<ShufflePartitionedData> shufflePartitionedData = toPartitionedData(req);
long alreadyReleasedSize = 0;
boolean hasFailureOccurred = false;
Expand Down

0 comments on commit 95f5364

Please sign in to comment.