Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2148] improvement(server): Rename ShuffleBufferWithLinkedList to DefaultShuffleBuffer #2149

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleFlushManager;

public class ShuffleBufferWithLinkedList extends AbstractShuffleBuffer {
public class DefaultShuffleBuffer extends AbstractShuffleBuffer {
// blocks will be added to inFlushBlockMap as <eventId, blocks> pair
// it will be removed after flush to storage
// the strategy ensure that shuffle is in memory or storage
private Set<ShufflePartitionedBlock> blocks;
private Map<Long, Set<ShufflePartitionedBlock>> inFlushBlockMap;

public ShuffleBufferWithLinkedList() {
public DefaultShuffleBuffer() {
this.blocks = new LinkedHashSet<>();
this.inFlushBlockMap = JavaUtils.newConcurrentMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public StatusCode registerBuffer(
if (shuffleBufferType == ShuffleBufferType.SKIP_LIST) {
shuffleBuffer = new ShuffleBufferWithSkipList();
} else {
shuffleBuffer = new ShuffleBufferWithLinkedList();
shuffleBuffer = new DefaultShuffleBuffer();
}
bufferRangeMap.put(Range.closed(startPartition, endPartition), shuffleBuffer);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ShuffleBufferWithLinkedListTest extends BufferTestBase {
public class DefaultShuffleBufferTest extends BufferTestBase {

private static AtomicInteger atomSequenceNo = new AtomicInteger(0);

@Test
public void appendTest() {
ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer();
shuffleBuffer.append(createData(10));
// ShufflePartitionedBlock has constant 32 bytes overhead
assertEquals(42, shuffleBuffer.getSize());
Expand All @@ -59,7 +59,7 @@ public void appendTest() {

@Test
public void appendMultiBlocksTest() {
ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer();
ShufflePartitionedData data1 = createData(10);
ShufflePartitionedData data2 = createData(10);
ShufflePartitionedBlock[] dataCombine = new ShufflePartitionedBlock[2];
Expand All @@ -71,7 +71,7 @@ public void appendMultiBlocksTest() {

@Test
public void toFlushEventTest() {
ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer();
ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null);
assertNull(event);
shuffleBuffer.append(createData(10));
Expand All @@ -85,7 +85,7 @@ public void toFlushEventTest() {
@Test
public void getShuffleDataWithExpectedTaskIdsFilterTest() {
/** case1: all blocks in cached(or in flushed map) and size < readBufferSize */
ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer();
ShufflePartitionedData spd1 = createData(1, 1, 15);
ShufflePartitionedData spd2 = createData(1, 0, 15);
ShufflePartitionedData spd3 = createData(1, 2, 55);
Expand Down Expand Up @@ -197,7 +197,7 @@ public void getShuffleDataWithExpectedTaskIdsFilterTest() {

@Test
public void getShuffleDataWithLocalOrderTest() {
ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer();
ShufflePartitionedData spd1 = createData(1, 1, 15);
ShufflePartitionedData spd2 = createData(1, 0, 15);
ShufflePartitionedData spd3 = createData(1, 2, 15);
Expand Down Expand Up @@ -248,7 +248,7 @@ public void getShuffleDataWithLocalOrderTest() {

@Test
public void getShuffleDataTest() {
ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer();
// case1: cached data only, blockId = -1, readBufferSize > buffer size
ShufflePartitionedData spd1 = createData(10);
ShufflePartitionedData spd2 = createData(20);
Expand All @@ -261,7 +261,7 @@ public void getShuffleDataTest() {
assertArrayEquals(expectedData, sdr.getData());

// case2: cached data only, blockId = -1, readBufferSize = buffer size
shuffleBuffer = new ShuffleBufferWithLinkedList();
shuffleBuffer = new DefaultShuffleBuffer();
spd1 = createData(20);
spd2 = createData(20);
shuffleBuffer.append(spd1);
Expand All @@ -273,7 +273,7 @@ public void getShuffleDataTest() {
assertArrayEquals(expectedData, sdr.getData());

// case3-1: cached data only, blockId = -1, readBufferSize < buffer size
shuffleBuffer = new ShuffleBufferWithLinkedList();
shuffleBuffer = new DefaultShuffleBuffer();
spd1 = createData(20);
spd2 = createData(21);
shuffleBuffer.append(spd1);
Expand All @@ -285,7 +285,7 @@ public void getShuffleDataTest() {
assertArrayEquals(expectedData, sdr.getData());

// case3-2: cached data only, blockId = -1, readBufferSize < buffer size
shuffleBuffer = new ShuffleBufferWithLinkedList();
shuffleBuffer = new DefaultShuffleBuffer();
spd1 = createData(15);
spd2 = createData(15);
ShufflePartitionedData spd3 = createData(15);
Expand All @@ -307,7 +307,7 @@ public void getShuffleDataTest() {
assertArrayEquals(expectedData, sdr.getData());

// case5: flush data only, blockId = -1, readBufferSize < buffer size
shuffleBuffer = new ShuffleBufferWithLinkedList();
shuffleBuffer = new DefaultShuffleBuffer();
spd1 = createData(15);
spd2 = createData(15);
shuffleBuffer.append(spd1);
Expand All @@ -328,13 +328,13 @@ public void getShuffleDataTest() {
assertEquals(0, sdr.getBufferSegments().size());

// case6: no data in buffer & flush buffer
shuffleBuffer = new ShuffleBufferWithLinkedList();
shuffleBuffer = new DefaultShuffleBuffer();
sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 10);
assertEquals(0, sdr.getBufferSegments().size());
assertEquals(0, sdr.getDataLength());

// case7: get data with multiple flush buffer and cached buffer
shuffleBuffer = new ShuffleBufferWithLinkedList();
shuffleBuffer = new DefaultShuffleBuffer();
spd1 = createData(15);
spd2 = createData(15);
spd3 = createData(15);
Expand Down
Loading