Skip to content

Commit

Permalink
feature[server] Support config buffer size for LocalFileWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
wenlongwlli committed Oct 21, 2024
1 parent f9b4c0e commit 2fcc749
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,18 @@ public class RssBaseConf extends RssConf {
+ " first combining the username and the password with a colon (uniffle:uniffle123)"
+ ", and then by encoding the resulting string in base64 (dW5pZmZsZTp1bmlmZmxlMTIz).");

public static final ConfigOption<String> RSS_STORAGE_WRITE_DATA_BUFFER_SIZE =
ConfigOptions.key("rss.storage.write.dataBufferSize")
.stringType()
.defaultValue("8k")
.withDescription("The buffer size to cache the write data content.");

public static final ConfigOption<String> RSS_STORAGE_WRITE_INDEX_BUFFER_SIZE =
ConfigOptions.key("rss.storage.write.indexBufferSize")
.stringType()
.defaultValue("8k")
.withDescription("The buffer size to cache the write index content.");

public boolean loadConfFromFile(String fileName, List<ConfigOption<Object>> configOptions) {
Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
if (properties == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public void processFlushEvent(ShuffleDataFlushEvent event) throws Exception {
int maxConcurrencyPerPartitionToWrite = getMaxConcurrencyPerPartitionWrite(event);
CreateShuffleWriteHandlerRequest request =
new CreateShuffleWriteHandlerRequest(
this.shuffleServerConf,
storageType,
event.getAppId(),
event.getShuffleId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public void updateReadMetrics(StorageReadMetrics metrics) {
@Override
ShuffleWriteHandler newWriteHandler(CreateShuffleWriteHandlerRequest request) {
return new LocalFileWriteHandler(
request.getRssBaseConf(),
request.getAppId(),
request.getShuffleId(),
request.getStartPartition(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
Expand All @@ -37,24 +38,57 @@ public class LocalFileWriteHandler implements ShuffleWriteHandler {

private static final Logger LOG = LoggerFactory.getLogger(LocalFileWriteHandler.class);

private final RssBaseConf rssBaseConf;
private String fileNamePrefix;
private String basePath;
private final int dataBufferSize;
private final int indexBufferSize;

public LocalFileWriteHandler(
RssBaseConf rssBaseConf,
String appId,
int shuffleId,
int startPartition,
int endPartition,
String storageBasePath,
String fileNamePrefix) {
this.rssBaseConf = rssBaseConf;
this.fileNamePrefix = fileNamePrefix;
this.basePath =
ShuffleStorageUtils.getFullShuffleDataFolder(
storageBasePath,
ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId, startPartition, endPartition));
this.dataBufferSize =
(int)
this.rssBaseConf.getSizeAsBytes(
RssBaseConf.RSS_STORAGE_WRITE_DATA_BUFFER_SIZE.key(),
RssBaseConf.RSS_STORAGE_WRITE_DATA_BUFFER_SIZE.defaultValue());
this.indexBufferSize =
(int)
this.rssBaseConf.getSizeAsBytes(
RssBaseConf.RSS_STORAGE_WRITE_INDEX_BUFFER_SIZE.key(),
RssBaseConf.RSS_STORAGE_WRITE_INDEX_BUFFER_SIZE.defaultValue());
createBasePath();
}

@VisibleForTesting
public LocalFileWriteHandler(
String appId,
int shuffleId,
int startPartition,
int endPartition,
String storageBasePath,
String fileNamePrefix) {
this(
new RssBaseConf(),
appId,
shuffleId,
startPartition,
endPartition,
storageBasePath,
fileNamePrefix);
}

private void createBasePath() {
File baseFolder = new File(basePath);
if (baseFolder.isDirectory()) {
Expand Down Expand Up @@ -96,8 +130,8 @@ public synchronized void write(Collection<ShufflePartitionedBlock> shuffleBlocks
String dataFileName = ShuffleStorageUtils.generateDataFileName(fileNamePrefix);
String indexFileName = ShuffleStorageUtils.generateIndexFileName(fileNamePrefix);

try (LocalFileWriter dataWriter = createWriter(dataFileName);
LocalFileWriter indexWriter = createWriter(indexFileName)) {
try (LocalFileWriter dataWriter = createWriter(dataFileName, dataBufferSize);
LocalFileWriter indexWriter = createWriter(indexFileName, indexBufferSize); ) {

long startTime = System.currentTimeMillis();
for (ShufflePartitionedBlock block : shuffleBlocks) {
Expand Down Expand Up @@ -131,9 +165,10 @@ public synchronized void write(Collection<ShufflePartitionedBlock> shuffleBlocks
}
}

private LocalFileWriter createWriter(String fileName) throws IOException, IllegalStateException {
private LocalFileWriter createWriter(String fileName, int bufferSize)
throws IOException, IllegalStateException {
File file = new File(basePath, fileName);
return new LocalFileWriter(file);
return new LocalFileWriter(file, bufferSize);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.io.FileOutputStream;
import java.io.IOException;

import com.google.common.annotations.VisibleForTesting;

import org.apache.uniffle.storage.api.FileWriter;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;

Expand All @@ -33,10 +35,15 @@ public class LocalFileWriter implements FileWriter, Closeable {
private FileOutputStream fileOutputStream;
private long nextOffset;

@VisibleForTesting
public LocalFileWriter(File file) throws IOException {
this(file, 8 * 1024);
}

public LocalFileWriter(File file, int bufferSize) throws IOException {
fileOutputStream = new FileOutputStream(file, true);
// init fsDataOutputStream
dataOutputStream = new DataOutputStream(new BufferedOutputStream(fileOutputStream));
dataOutputStream = new DataOutputStream(new BufferedOutputStream(fileOutputStream, bufferSize));
nextOffset = file.length();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.apache.uniffle.storage.request;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;

import org.apache.uniffle.common.config.RssBaseConf;

public class CreateShuffleWriteHandlerRequest {

private RssBaseConf rssBaseConf;
private String storageType;
private String appId;
private int shuffleId;
Expand All @@ -33,6 +37,7 @@ public class CreateShuffleWriteHandlerRequest {
private String user;
private int maxFileNumber;

@VisibleForTesting
public CreateShuffleWriteHandlerRequest(
String storageType,
String appId,
Expand All @@ -45,6 +50,7 @@ public CreateShuffleWriteHandlerRequest(
int storageDataReplica,
String user) {
this(
new RssBaseConf(),
storageType,
appId,
shuffleId,
Expand All @@ -59,6 +65,7 @@ public CreateShuffleWriteHandlerRequest(
}

public CreateShuffleWriteHandlerRequest(
RssBaseConf rssBaseConf,
String storageType,
String appId,
int shuffleId,
Expand All @@ -70,6 +77,7 @@ public CreateShuffleWriteHandlerRequest(
int storageDataReplica,
String user,
int maxFileNumber) {
this.rssBaseConf = rssBaseConf;
this.storageType = storageType;
this.appId = appId;
this.shuffleId = shuffleId;
Expand All @@ -83,6 +91,10 @@ public CreateShuffleWriteHandlerRequest(
this.maxFileNumber = maxFileNumber;
}

public RssBaseConf getRssBaseConf() {
return rssBaseConf;
}

public String getStorageType() {
return storageType;
}
Expand Down

0 comments on commit 2fcc749

Please sign in to comment.