
[#2173] feat(remote merge): support netty for remote merge. #2202

Open · wants to merge 1 commit into master
Conversation

zhengchenyu
Collaborator

What changes were proposed in this pull request?

Support Netty for remote merge. Use direct ByteBuf in place of byte[] when Netty is enabled, and optimize the code structure to avoid memory leaks.
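The core idea of the change is moving shuffle payloads into direct (off-heap) buffers. A minimal dependency-free sketch of that idea, using java.nio.ByteBuffer in place of Netty's ByteBuf (the PR itself uses ByteBuf; the class and method names here are illustrative, not Uniffle code):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch: routing shuffle bytes through a direct (off-heap) buffer instead
// of a heap byte[]. Illustrative only; the PR uses Netty's ByteBuf, which
// additionally requires an explicit release() to avoid off-heap leaks --
// hence the code restructuring around buffer lifecycle mentioned above.
public class DirectBufferSketch {

  // Copy a payload via a direct buffer, as a Netty transport would.
  public static byte[] copyThroughDirect(byte[] payload) {
    ByteBuffer direct = ByteBuffer.allocateDirect(payload.length);
    direct.put(payload);   // write into off-heap memory
    direct.flip();         // switch the buffer to read mode
    byte[] out = new byte[direct.remaining()];
    direct.get(out);       // read back for the consumer
    return out;
  }

  public static void main(String[] args) {
    byte[] in = "record".getBytes(StandardCharsets.UTF_8);
    byte[] out = copyThroughDirect(in);
    System.out.println(new String(out, StandardCharsets.UTF_8)); // prints "record"
  }
}
```

With a real ByteBuf the allocation would come from a pooled allocator and the buffer must be released on every code path, which is why leak-safe structure matters as much as the buffer type.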

Why are the changes needed?

Fix: #2173

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit tests, integration tests, and a real job in a cluster.

@zhengchenyu zhengchenyu marked this pull request as draft October 18, 2024 13:01

github-actions bot commented Oct 18, 2024

Test Results

2 926 files ±0 · 2 926 suites ±0 · 6h 13m 57s ⏱️ +3m 21s
1 088 tests +39 · 1 086 ✅ +39 · 2 💤 ±0 · 0 ❌ ±0
13 630 runs +585 · 13 600 ✅ +585 · 30 💤 ±0 · 0 ❌ ±0

Results for commit 3a68303. ± Comparison against base commit 43323bb.

This pull request removes 20 and adds 59 tests. Note that renamed tests count towards both.
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile1{String, File}[1]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile1{String, File}[2]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile2{String, File}[1]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile2{String, File}[2]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile3{String, File}[1]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile3{String, File}[2]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile4{String, File}[1]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile4{String, File}[2]
org.apache.uniffle.common.serializer.PartialInputStreamTest ‑ testReadFileInputStream
org.apache.uniffle.common.serializer.PartialInputStreamTest ‑ testReadMemroyInputStream
…
org.apache.uniffle.common.merger.MergerTest ‑ testMergeSegmentToFile{String, File}[2]
org.apache.uniffle.common.merger.MergerTest ‑ testMergeSegmentToFile{String, File}[3]
org.apache.uniffle.common.merger.MergerTest ‑ testMergeSegmentToFile{String, File}[4]
org.apache.uniffle.common.netty.protocol.NettyProtocolTest ‑ testGetSortedShuffleDataRequest
org.apache.uniffle.common.netty.protocol.NettyProtocolTest ‑ testGetSortedShuffleDataResponse
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFileUseDirect{String, File}[1]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFileUseDirect{String, File}[2]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile{String, File}[1]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile{String, File}[2]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile{String, File}[3]
…


@zhengchenyu zhengchenyu deleted the issue-2173 branch October 21, 2024 06:52
@zhengchenyu zhengchenyu reopened this Oct 21, 2024
@zhengchenyu zhengchenyu marked this pull request as ready for review October 21, 2024 07:07
@zhengchenyu
Collaborator Author

@jerqi Can you please review this PR?

if (raw) {
  return new RawWritableSerializationStream(this, output);
}
if (shared) {
Contributor

Why do we use a shared serialization stream?

Collaborator Author

On the client side, the parsed record will be used by reduce, so we need a deep-copy instance: every record has its own buffer. If we used a shared buffer, errors would occur.

But on the server side, we write the record to the merged block immediately after parsing it, so there is no need for each record to have a separate memory copy. For a segment/block we can use only two shared buffers, which saves a lot of memory.

BTW, although this PR is about Netty, a lot of the work is actually about saving memory.
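The deep-copy vs. shared-buffer trade-off described above can be sketched as follows (illustrative names only, not the actual Uniffle classes):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the two strategies: "deep" gives every record its own buffer
// (client side, records are retained by reduce), while "shared" reuses one
// buffer and is only safe when each record is consumed immediately after
// parsing (server-side merge). Illustrative code, not Uniffle's.
public class RecordBufferSketch {

  // Deep copy: records can be retained safely; one allocation per record.
  public static List<byte[]> readDeep(byte[][] segment) {
    List<byte[]> records = new ArrayList<>();
    for (byte[] r : segment) {
      records.add(Arrays.copyOf(r, r.length)); // private buffer per record
    }
    return records;
  }

  // Shared buffer: each record is written through (here: lengths summed)
  // right after parsing, so a single reusable buffer suffices.
  public static int mergeWithSharedBuffer(byte[][] segment) {
    byte[] shared = new byte[0];
    int mergedBytes = 0;
    for (byte[] r : segment) {
      if (shared.length < r.length) {
        shared = new byte[r.length];       // grow the one shared buffer
      }
      System.arraycopy(r, 0, shared, 0, r.length);
      mergedBytes += r.length;             // "write to merged block" at once
    }
    return mergedBytes;
  }
}
```

If a caller retained a reference into the shared buffer across iterations, the next record would overwrite it, which is exactly the client-side error the deep copy avoids.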

Contributor

The name confuses me: SharedSerializationStream sounds like it is operated by multiple threads, which would require a lock.

Collaborator Author

@zhengchenyu zhengchenyu Oct 22, 2024

A SharedSerializationStream corresponds to one block. It is not accessed by multiple threads; it only runs under the merge thread for one partition. Here, 'shared' means it uses a shared buffer to merge.

Contributor

Could we give a better name for it?

Collaborator Author

> Could we give a better name for it?

In fact, I used 'shallow' as the name before, but changed it to 'shared'.
Compared with RawWritableDeserializationStream, SharedRawWritableDeserializationStream stores records in a shared buffer, while RawWritableDeserializationStream allocates a new buffer for each record.
If a change is needed, we could rename RawWritableDeserializationStream to DeepRawWritableDeserializationStream and SharedRawWritableDeserializationStream to ShallowRawWritableDeserializationStream. How about that?

Contributor

BufferDeserializationStream and PartitionDeserializationStream may be better names.

Collaborator Author

We have three streams:

  • WritableSerializationStream
  • RawWritableDeserializationStream
  • SharedRawWritableDeserializationStream

WritableSerializationStream is used to parse bytes into Java objects, mainly on the reduce side of Tez and Spark.

RawWritableDeserializationStream copies bytes directly without doing any actual deserialization. It is mainly used on the reduce side of MR, because MR requires the raw interface.

SharedRawWritableDeserializationStream is similar to RawWritableDeserializationStream, but applies some memory optimizations. It is mainly used for server-side merge, where no deserialization is needed.

The Raw prefix means that bytes are copied directly without unnecessary serialization, so I think it should not be dropped.

Given that, I think the WritableSerializationStream and RawWritableDeserializationStream names can stay unchanged, and SharedRawWritableDeserializationStream can be renamed to BufferRawDeserializationStream. How about this?
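The division of labor among the three streams, and the raw/shared selection logic from the reviewed diff excerpt, can be sketched like this (class names follow the discussion; the bodies are placeholders, not the actual Uniffle implementations):

```java
// Sketch of how the three streams divide responsibilities. Names mirror the
// discussion above; role() strings and the select() helper are illustrative.
public class StreamSelectionSketch {

  public interface SerializationStream { String role(); }

  // Parses bytes into Java objects; reduce side of Spark/Tez.
  public static class WritableSerializationStream implements SerializationStream {
    public String role() { return "deserialize-to-objects"; }
  }

  // Raw byte copy, one buffer per record; reduce side of MR (raw interface).
  public static class RawWritableDeserializationStream implements SerializationStream {
    public String role() { return "raw-copy-per-record-buffer"; }
  }

  // Raw byte copy through a shared buffer; server-side merge.
  public static class SharedRawWritableDeserializationStream implements SerializationStream {
    public String role() { return "raw-copy-shared-buffer"; }
  }

  // Simplified mirror of the if (raw) ... if (shared) selection in the diff.
  public static SerializationStream select(boolean raw, boolean shared) {
    if (raw) {
      if (shared) {
        return new SharedRawWritableDeserializationStream();
      }
      return new RawWritableDeserializationStream();
    }
    return new WritableSerializationStream();
  }
}
```

Under the proposed rename, only the third class would become BufferRawDeserializationStream; the selection logic is unchanged.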

Successfully merging this pull request may close these issues.

[FEATURE] [remote merge] support netty for remote merge.