From ef16669b8028f872ffd199f2d3f6c76ce99890a0 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 10 May 2024 10:54:15 +0800 Subject: [PATCH] [#1678] fix(server): disk size leak on removing resources by AppPurgeEvent (#1679) ### What changes were proposed in this pull request? Decrease the disk size that is calculated by the local storage itself when removing resources with AppPurgeEvent ### Why are the changes needed? Fix: #1678 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit tests --- .../uniffle/server/ShuffleTaskInfo.java | 4 + .../uniffle/server/ShuffleTaskManager.java | 5 +- .../server/ShuffleTaskManagerTest.java | 85 ++++++++++++++----- 3 files changed, 74 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java index e657ec4db2..f45b1be944 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java @@ -194,6 +194,10 @@ public void markHugePartition(int shuffleId, int partitionId) { } } + public Set getShuffleIds() { + return partitionDataSizes.keySet(); + } + @Override public String toString() { return "ShuffleTaskInfo{" diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 11d6545728..50ea004ade 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -18,6 +18,7 @@ package org.apache.uniffle.server; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -771,7 +772,9 @@ public void removeResources(String appId, boolean checkAppExpired) { partitionsToBlockIds.remove(appId); 
shuffleBufferManager.removeBuffer(appId); shuffleFlushManager.removeResources(appId); - storageManager.removeResources(new AppPurgeEvent(appId, shuffleTaskInfo.getUser())); + storageManager.removeResources( + new AppPurgeEvent( + appId, shuffleTaskInfo.getUser(), new ArrayList<>(shuffleTaskInfo.getShuffleIds()))); if (shuffleTaskInfo.hasHugePartition()) { ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec(); ShuffleServerMetrics.gaugeHugePartitionNum.dec(); diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java index 67410b3c05..75c49cd4dc 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java @@ -104,6 +104,71 @@ public void afterEach() throws Exception { ShuffleServerMetrics.clear(); } + private ShuffleServerConf constructServerConfWithLocalfile() { + String confFile = ClassLoader.getSystemResource("server.conf").getFile(); + ShuffleServerConf conf = new ShuffleServerConf(confFile); + conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234); + conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527"); + conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345); + conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64); + conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 1000L); + conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0); + conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0); + conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L); + conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 100000L); + conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false); + + conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name()); + conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true); + conf.setString( + ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(), 
+ tempDir1.getAbsolutePath() + "," + tempDir2.getAbsolutePath()); + return conf; + } + + /** Test the shuffleMeta's diskSize when app is removed. */ + @Test + public void appPurgeWithLocalfileTest() throws Exception { + ShuffleServerConf conf = constructServerConfWithLocalfile(); + shuffleServer = new ShuffleServer(conf); + ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); + + String appId = "removeShuffleDataWithLocalfileTest"; + + int shuffleNum = 4; + for (int i = 0; i < shuffleNum; i++) { + shuffleTaskManager.registerShuffle( + appId, + i, + Lists.newArrayList(new PartitionRange(0, 1)), + RemoteStorageInfo.EMPTY_REMOTE_STORAGE, + StringUtils.EMPTY); + + ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35); + shuffleTaskManager.requireBuffer(35); + shuffleTaskManager.cacheShuffleData(appId, i, false, partitionedData0); + shuffleTaskManager.updateCachedBlockIds(appId, i, partitionedData0.getBlockList()); + } + + assertEquals(1, shuffleTaskManager.getAppIds().size()); + for (int i = 0; i < shuffleNum; i++) { + shuffleTaskManager.commitShuffle(appId, i); + } + + shuffleTaskManager.removeResources(appId, false); + for (String path : conf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH)) { + String appPath = path + "/" + appId; + assertFalse(new File(appPath).exists()); + } + + // once the app is removed. 
the disk size should be 0 + LocalStorageManager localStorageManager = + (LocalStorageManager) shuffleServer.getStorageManager(); + for (LocalStorage localStorage : localStorageManager.getStorages()) { + assertEquals(0, localStorage.getMetaData().getDiskSize().get()); + } + } + @Test public void hugePartitionMemoryUsageLimitTest() throws Exception { String confFile = ClassLoader.getSystemResource("server.conf").getFile(); @@ -479,25 +544,7 @@ public void removeShuffleDataWithHdfsTest() throws Exception { @Test public void removeShuffleDataWithLocalfileTest() throws Exception { - String confFile = ClassLoader.getSystemResource("server.conf").getFile(); - ShuffleServerConf conf = new ShuffleServerConf(confFile); - conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234); - conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527"); - conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345); - conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64); - conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 1000L); - conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0); - conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0); - conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L); - conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 100000L); - conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false); - - conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), "LOCALFILE"); - conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true); - conf.setString( - ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(), - tempDir1.getAbsolutePath() + "," + tempDir2.getAbsolutePath()); - + ShuffleServerConf conf = constructServerConfWithLocalfile(); shuffleServer = new ShuffleServer(conf); ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();