diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java b/server/src/main/java/org/apache/uniffle/server/buffer/DefaultShuffleBuffer.java similarity index 98% rename from server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java rename to server/src/main/java/org/apache/uniffle/server/buffer/DefaultShuffleBuffer.java index d158490b9c..29c47519d7 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/DefaultShuffleBuffer.java @@ -37,14 +37,14 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent; import org.apache.uniffle.server.ShuffleFlushManager; -public class ShuffleBufferWithLinkedList extends AbstractShuffleBuffer { +public class DefaultShuffleBuffer extends AbstractShuffleBuffer { // blocks will be added to inFlushBlockMap as pair // it will be removed after flush to storage // the strategy ensure that shuffle is in memory or storage private Set blocks; private Map> inFlushBlockMap; - public ShuffleBufferWithLinkedList() { + public DefaultShuffleBuffer() { this.blocks = new LinkedHashSet<>(); this.inFlushBlockMap = JavaUtils.newConcurrentMap(); } diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java index 18dd94c198..6b42e71f55 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java @@ -185,7 +185,7 @@ public StatusCode registerBuffer( if (shuffleBufferType == ShuffleBufferType.SKIP_LIST) { shuffleBuffer = new ShuffleBufferWithSkipList(); } else { - shuffleBuffer = new ShuffleBufferWithLinkedList(); + shuffleBuffer = new DefaultShuffleBuffer(); } bufferRangeMap.put(Range.closed(startPartition, endPartition), shuffleBuffer); } else { diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/DefaultShuffleBufferTest.java similarity index 97% rename from server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java rename to server/src/test/java/org/apache/uniffle/server/buffer/DefaultShuffleBufferTest.java index 62617332f8..8a3cd8c655 100644 --- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java +++ b/server/src/test/java/org/apache/uniffle/server/buffer/DefaultShuffleBufferTest.java @@ -39,13 +39,13 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -public class ShuffleBufferWithLinkedListTest extends BufferTestBase { +public class DefaultShuffleBufferTest extends BufferTestBase { private static AtomicInteger atomSequenceNo = new AtomicInteger(0); @Test public void appendTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(); + ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer(); shuffleBuffer.append(createData(10)); // ShufflePartitionedBlock has constant 32 bytes overhead assertEquals(42, shuffleBuffer.getSize()); @@ -59,7 +59,7 @@ public void appendTest() { @Test public void appendMultiBlocksTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(); + ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer(); ShufflePartitionedData data1 = createData(10); ShufflePartitionedData data2 = createData(10); ShufflePartitionedBlock[] dataCombine = new ShufflePartitionedBlock[2]; @@ -71,7 +71,7 @@ public void appendMultiBlocksTest() { @Test public void toFlushEventTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(); + ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer(); ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null); assertNull(event); shuffleBuffer.append(createData(10)); @@ -85,7 +85,7 @@ public void toFlushEventTest() { @Test public void getShuffleDataWithExpectedTaskIdsFilterTest() { /** case1: all blocks in cached(or in flushed map) and size < readBufferSize */ - ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(); + ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer(); ShufflePartitionedData spd1 = createData(1, 1, 15); ShufflePartitionedData spd2 = createData(1, 0, 15); ShufflePartitionedData spd3 = createData(1, 2, 55); @@ -197,7 +197,7 @@ public void getShuffleDataWithExpectedTaskIdsFilterTest() { @Test public void getShuffleDataWithLocalOrderTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(); + ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer(); ShufflePartitionedData spd1 = createData(1, 1, 15); ShufflePartitionedData spd2 = createData(1, 0, 15); ShufflePartitionedData spd3 = createData(1, 2, 15); @@ -248,7 +248,7 @@ public void getShuffleDataWithLocalOrderTest() { @Test public void getShuffleDataTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(); + ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer(); // case1: cached data only, blockId = -1, readBufferSize > buffer size ShufflePartitionedData spd1 = createData(10); ShufflePartitionedData spd2 = createData(20); @@ -261,7 +261,7 @@ public void getShuffleDataTest() { assertArrayEquals(expectedData, sdr.getData()); // case2: cached data only, blockId = -1, readBufferSize = buffer size - shuffleBuffer = new ShuffleBufferWithLinkedList(); + shuffleBuffer = new DefaultShuffleBuffer(); spd1 = createData(20); spd2 = createData(20); shuffleBuffer.append(spd1); @@ -273,7 +273,7 @@ public void getShuffleDataTest() { assertArrayEquals(expectedData, sdr.getData()); // case3-1: cached data only, blockId = -1, readBufferSize < buffer size - shuffleBuffer = new ShuffleBufferWithLinkedList(); + shuffleBuffer = new DefaultShuffleBuffer(); spd1 = createData(20); spd2 = createData(21); shuffleBuffer.append(spd1); @@ -285,7 +285,7 @@ public void getShuffleDataTest() { assertArrayEquals(expectedData, sdr.getData()); // case3-2: cached data only, blockId = -1, readBufferSize < buffer size - shuffleBuffer = new ShuffleBufferWithLinkedList(); + shuffleBuffer = new DefaultShuffleBuffer(); spd1 = createData(15); spd2 = createData(15); ShufflePartitionedData spd3 = createData(15); @@ -307,7 +307,7 @@ public void getShuffleDataTest() { assertArrayEquals(expectedData, sdr.getData()); // case5: flush data only, blockId = -1, readBufferSize < buffer size - shuffleBuffer = new ShuffleBufferWithLinkedList(); + shuffleBuffer = new DefaultShuffleBuffer(); spd1 = createData(15); spd2 = createData(15); shuffleBuffer.append(spd1); @@ -328,13 +328,13 @@ public void getShuffleDataTest() { assertEquals(0, sdr.getBufferSegments().size()); // case6: no data in buffer & flush buffer - shuffleBuffer = new ShuffleBufferWithLinkedList(); + shuffleBuffer = new DefaultShuffleBuffer(); sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 10); assertEquals(0, sdr.getBufferSegments().size()); assertEquals(0, sdr.getDataLength()); // case7: get data with multiple flush buffer and cached buffer - shuffleBuffer = new ShuffleBufferWithLinkedList(); + shuffleBuffer = new DefaultShuffleBuffer(); spd1 = createData(15); spd2 = createData(15); spd3 = createData(15);