Merge branch 'master' into issue-1749
zhengchenyu authored Sep 2, 2024
2 parents 77c1d15 + 9532a46 commit b7cc5bf
Showing 78 changed files with 1,530 additions and 497 deletions.
5 changes: 3 additions & 2 deletions bin/start-coordinator.sh
@@ -30,6 +30,7 @@ COORDINATOR_CONF_FILE="${RSS_CONF_DIR}/coordinator.conf"
JAR_DIR="${RSS_HOME}/jars"
LOG_CONF_FILE="${RSS_CONF_DIR}/log4j2.xml"
LOG_PATH="${RSS_LOG_DIR}/coordinator.log"
+LOG_OUT_PATH="${RSS_LOG_DIR}/coordinator.out"
COORDINATOR_RPC_AUDIT_LOG_PATH="${RSS_LOG_DIR}/coordinator_rpc_audit.log"

MAIN_CLASS="org.apache.uniffle.coordinator.CoordinatorServer"
@@ -97,7 +98,7 @@ GC_LOG_ARGS_NEW=" -XX:+IgnoreUnrecognizedVMOptions \
JVM_LOG_ARGS=""

if [ -f ${LOG_CONF_FILE} ]; then
-JVM_LOG_ARGS=" -Dlog4j2.configurationFile=file:${LOG_CONF_FILE} -Dlog.path=${LOG_PATH} -Dcoordinator.rpc.audit.log.path=${COORDINATOR_RPC_AUDIT_LOG_PATH}"
+JVM_LOG_ARGS=" -Dlog4j2.configurationFile=file:${LOG_CONF_FILE} -Dlog.path=${LOG_PATH} -Drpc.audit.log.path=${COORDINATOR_RPC_AUDIT_LOG_PATH}"
else
echo "Exit with error: ${LOG_CONF_FILE} file doesn't exist."
exit 1
@@ -111,7 +112,7 @@ else
fi

COORDINATOR_JAVA_OPTS=${COORDINATOR_JAVA_OPTS:-""}
-$RUNNER ${COORDINATOR_BASE_JVM_ARGS} ${COORDINATOR_JVM_GC_ARGS} ${JVM_LOG_ARGS} ${COORDINATOR_JAVA_OPTS} -cp ${CLASSPATH} ${MAIN_CLASS} --conf "${COORDINATOR_CONF_FILE}" $@ &
+(nohup $RUNNER ${COORDINATOR_BASE_JVM_ARGS} ${COORDINATOR_JVM_GC_ARGS} ${JVM_LOG_ARGS} ${COORDINATOR_JAVA_OPTS} -cp ${CLASSPATH} ${MAIN_CLASS} --conf "${COORDINATOR_CONF_FILE}" $@ > ${LOG_OUT_PATH} 2>&1) &

get_pid_file_name coordinator
echo $! >${RSS_PID_DIR}/${pid_file}
4 changes: 2 additions & 2 deletions bin/start-dashboard.sh
@@ -30,6 +30,7 @@ DASHBOARD_CONF_FILE="${RSS_CONF_DIR}/dashboard.conf"
JAR_DIR="${RSS_HOME}/jars"
LOG_CONF_FILE="${RSS_CONF_DIR}/log4j2.xml"
LOG_PATH="${RSS_LOG_DIR}/dashboard.log"
+LOG_OUT_PATH="${RSS_LOG_DIR}/dashboard.out"

MAIN_CLASS="org.apache.uniffle.dashboard.web.Dashboard"

@@ -91,7 +92,6 @@ else
fi

DASHBOARD_JAVA_OPTS=${DASHBOARD_JAVA_OPTS:-""}
-$RUNNER ${DASHBOARD_BASE_JVM_ARGS} ${DASHBOARD_JVM_GC_ARGS} ${JVM_LOG_ARGS} ${DASHBOARD_JAVA_OPTS} -cp ${CLASSPATH} ${MAIN_CLASS} --conf "${DASHBOARD_CONF_FILE}" $@ &
-
+(nohup $RUNNER ${DASHBOARD_BASE_JVM_ARGS} ${DASHBOARD_JVM_GC_ARGS} ${JVM_LOG_ARGS} ${DASHBOARD_JAVA_OPTS} -cp ${CLASSPATH} ${MAIN_CLASS} --conf "${DASHBOARD_CONF_FILE}" $@ > ${LOG_OUT_PATH} 2>&1) &
get_pid_file_name dashboard
echo $! >${RSS_PID_DIR}/${pid_file}
5 changes: 3 additions & 2 deletions bin/start-shuffle-server.sh
@@ -30,6 +30,7 @@ SHUFFLE_SERVER_CONF_FILE="${RSS_CONF_DIR}/server.conf"
JAR_DIR="${RSS_HOME}/jars"
LOG_CONF_FILE="${RSS_CONF_DIR}/log4j2.xml"
LOG_PATH="${RSS_LOG_DIR}/shuffle_server.log"
+LOG_OUT_PATH="${RSS_LOG_DIR}/shuffle_server.out"
SHUFFLE_SERVER_STORAGE_AUDIT_LOG_PATH=${SHUFFLE_SERVER_STORAGE_AUDIT_LOG_PATH:-"${RSS_LOG_DIR}/shuffle_server_storage_audit.log"}
SHUFFLE_SERVER_RPC_AUDIT_LOG_PATH=${SHUFFLE_SERVER_RPC_AUDIT_LOG_PATH:-"${RSS_LOG_DIR}/shuffle_server_rpc_audit.log"}

@@ -135,7 +136,7 @@ GC_LOG_ARGS_NEW=" -XX:+IgnoreUnrecognizedVMOptions \
JVM_LOG_ARGS=""

if [ -f ${LOG_CONF_FILE} ]; then
-JVM_LOG_ARGS=" -Dlog4j2.configurationFile=file:${LOG_CONF_FILE} -Dlog.path=${LOG_PATH} -Dshuffle.server.storage.audit.log.path=${SHUFFLE_SERVER_STORAGE_AUDIT_LOG_PATH} -Dshuffle.server.rpc.audit.log.path=${SHUFFLE_SERVER_RPC_AUDIT_LOG_PATH}"
+JVM_LOG_ARGS=" -Dlog4j2.configurationFile=file:${LOG_CONF_FILE} -Dlog.path=${LOG_PATH} -Dstorage.audit.log.path=${SHUFFLE_SERVER_STORAGE_AUDIT_LOG_PATH} -Drpc.audit.log.path=${SHUFFLE_SERVER_RPC_AUDIT_LOG_PATH}"
else
echo "Exit with error: ${LOG_CONF_FILE} file doesn't exist."
exit 1
@@ -149,7 +150,7 @@ else
fi

SHUFFLE_SERVER_JAVA_OPTS=${SHUFFLE_SERVER_JAVA_OPTS:-""}
-$RUNNER ${SHUFFLE_SERVER_BASE_JVM_ARGS} ${SHUFFLE_SERVER_JVM_GC_ARGS} ${JVM_LOG_ARGS} ${JAVA_LIB_PATH} ${SHUFFLE_SERVER_JAVA_OPTS} -cp ${CLASSPATH} ${MAIN_CLASS} --conf "${SHUFFLE_SERVER_CONF_FILE}" $@ &
+(nohup $RUNNER ${SHUFFLE_SERVER_BASE_JVM_ARGS} ${SHUFFLE_SERVER_JVM_GC_ARGS} ${JVM_LOG_ARGS} ${JAVA_LIB_PATH} ${SHUFFLE_SERVER_JAVA_OPTS} -cp ${CLASSPATH} ${MAIN_CLASS} --conf "${SHUFFLE_SERVER_CONF_FILE}" $@ > ${LOG_OUT_PATH} 2>&1) &

get_pid_file_name shuffle-server
echo $! >${RSS_PID_DIR}/${pid_file}
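
All three launch scripts make the same pair of changes. First, the plain backgrounded "$RUNNER ... &" invocation becomes "(nohup $RUNNER ... > ${LOG_OUT_PATH} 2>&1) &", so the daemon survives the terminal's hangup signal and anything the JVM writes to stdout or stderr (early startup failures, stray stack traces) is captured in the new per-service .out file instead of disappearing with the session; $! still records the PID of the backgrounded subshell into the pid file. Second, the log4j2 lookup keys drop their service prefixes (-Dcoordinator.rpc.audit.log.path and -Dshuffle.server.rpc.audit.log.path both become -Drpc.audit.log.path, and -Dshuffle.server.storage.audit.log.path becomes -Dstorage.audit.log.path), presumably so a single log4j2.xml can address the audit appenders uniformly across services.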
SortWriteBufferManager.java
@@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -96,7 +97,7 @@ public class SortWriteBufferManager<K, V> {
private final long maxBufferSize;
private final ExecutorService sendExecutorService;
private final RssConf rssConf;
-private final Codec codec;
+private final Optional<Codec> codec;
private final Task.CombinerRunner<K, V> combinerRunner;

public SortWriteBufferManager(
@@ -383,7 +384,7 @@ ShuffleBlockInfo createShuffleBlock(SortWriteBuffer wb) {
int partitionId = wb.getPartitionId();
final int uncompressLength = data.length;
long start = System.currentTimeMillis();
-final byte[] compressed = codec.compress(data);
+final byte[] compressed = codec.map(c -> c.compress(data)).orElse(data);
final long crc32 = ChecksumUtils.getCrc32(compressed);
compressTime += System.currentTimeMillis() - start;
final long blockId =
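
The change that recurs across the write path: the codec field becomes Optional<Codec>, and compression degrades to a pass-through when no codec is configured. A minimal sketch of the pattern, assuming a stand-in Codec interface rather than the real org.apache.uniffle.common.compression.Codec:

import java.util.Optional;

public class CodecCompressSketch {
  // Stand-in for the codec interface; the real Uniffle Codec has more methods.
  interface Codec {
    byte[] compress(byte[] data);
  }

  // Mirrors the new write-side call: compress when a codec exists, else ship raw bytes.
  static byte[] maybeCompress(Optional<Codec> codec, byte[] data) {
    return codec.map(c -> c.compress(data)).orElse(data);
  }

  public static void main(String[] args) {
    byte[] payload = {1, 2, 3};
    // An empty Optional models "compression disabled": the payload passes through unchanged.
    System.out.println(maybeCompress(Optional.empty(), payload).length); // prints 3
  }
}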
RssFetcher.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.DecimalFormat;
+import java.util.Optional;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.mapred.Counters;
@@ -38,6 +39,7 @@
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ByteBufferUtils;
import org.apache.uniffle.common.util.ByteUnit;

public class RssFetcher<K, V> {
@@ -90,7 +92,7 @@ private enum ShuffleErrors {
private int waitCount = 0;
private byte[] uncompressedData = null;
private RssConf rssConf;
-private Codec codec;
+private Optional<Codec> codec;

RssFetcher(
JobConf job,
@@ -161,14 +163,19 @@ public void copyFromRssServer() throws IOException {

// uncompress the block
if (!hasPendingData && compressedData != null) {
-final long startDecompress = System.currentTimeMillis();
-int uncompressedLen = compressedBlock.getUncompressLength();
-ByteBuffer decompressedBuffer = ByteBuffer.allocate(uncompressedLen);
-codec.decompress(compressedData, uncompressedLen, decompressedBuffer, 0);
-uncompressedData = decompressedBuffer.array();
-unCompressionLength += compressedBlock.getUncompressLength();
-long decompressDuration = System.currentTimeMillis() - startDecompress;
-decompressTime += decompressDuration;
+if (codec.isPresent()) {
+  final long startDecompress = System.currentTimeMillis();
+  int uncompressedLen = compressedBlock.getUncompressLength();
+  ByteBuffer decompressedBuffer = ByteBuffer.allocate(uncompressedLen);
+  codec.get().decompress(compressedData, uncompressedLen, decompressedBuffer, 0);
+  uncompressedData = decompressedBuffer.array();
+  unCompressionLength += compressedBlock.getUncompressLength();
+  long decompressDuration = System.currentTimeMillis() - startDecompress;
+  decompressTime += decompressDuration;
+} else {
+  uncompressedData = ByteBufferUtils.bufferToArray(compressedData);
+  unCompressionLength += uncompressedData.length;
+}
}

if (uncompressedData != null) {
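
On the read side the same Optional gates decompression: with a codec present, the block is decompressed into a buffer sized by getUncompressLength(); without one, the fetched bytes are copied out as-is (the diff uses ByteBufferUtils.bufferToArray for that copy). A hedged sketch of that control flow, again with a stand-in Codec interface and a hand-rolled buffer-to-array copy in place of the Uniffle utility:

import java.nio.ByteBuffer;
import java.util.Optional;

public class CodecDecompressSketch {
  // Stand-in decompression interface; signature modeled on the calls in this diff.
  interface Codec {
    void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dest, int destOffset);
  }

  static byte[] readBlock(Optional<Codec> codec, ByteBuffer fetched, int uncompressedLen) {
    if (codec.isPresent()) {
      ByteBuffer dest = ByteBuffer.allocate(uncompressedLen);
      codec.get().decompress(fetched, uncompressedLen, dest, 0);
      return dest.array();
    }
    // No codec: the fetched buffer already holds uncompressed data; copy it out
    // without disturbing the source buffer's position.
    byte[] raw = new byte[fetched.remaining()];
    fetched.duplicate().get(raw);
    return raw;
  }
}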
RssShuffleDataIterator.java
@@ -19,6 +19,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Optional;

import scala.Product2;
import scala.Tuple2;
@@ -59,7 +60,7 @@ public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C>> {
private long totalRawBytesLength = 0;
private long unCompressedBytesLength = 0;
private ByteBuffer uncompressedData;
-private Codec codec;
+private Optional<Codec> codec;

public RssShuffleDataIterator(
Serializer serializer,
@@ -74,7 +75,7 @@ public RssShuffleDataIterator(
RssSparkConfig.SPARK_SHUFFLE_COMPRESS_KEY.substring(
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length()),
RssSparkConfig.SPARK_SHUFFLE_COMPRESS_DEFAULT);
-this.codec = compress ? Codec.newInstance(rssConf) : null;
+this.codec = compress ? Codec.newInstance(rssConf) : Optional.empty();
}

public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data) {
@@ -131,7 +132,7 @@ public boolean hasNext() {
shuffleReadClient.checkProcessedBlockIds();
shuffleReadClient.logStatics();
String decInfo =
-codec == null
+!codec.isPresent()
? "."
: (", "
+ decompressTime
@@ -160,7 +161,7 @@ private int uncompress(CompressedShuffleBlock rawBlock, ByteBuffer rawData) {
shuffleReadMetrics.incRemoteBytesRead(rawDataLength);

int uncompressedLen = rawBlock.getUncompressLength();
-if (codec != null) {
+if (codec.isPresent()) {
if (uncompressedData == null
|| uncompressedData.capacity() < uncompressedLen
|| !isSameMemoryType(uncompressedData, rawData)) {
@@ -185,7 +186,7 @@ private int uncompress(CompressedShuffleBlock rawBlock, ByteBuffer rawData) {
}
uncompressedData.clear();
long startDecompress = System.currentTimeMillis();
-codec.decompress(rawData, uncompressedLen, uncompressedData, 0);
+codec.get().decompress(rawData, uncompressedLen, uncompressedData, 0);
unCompressedBytesLength += uncompressedLen;
long decompressDuration = System.currentTimeMillis() - startDecompress;
decompressTime += decompressDuration;
@@ -210,7 +211,7 @@ public BoxedUnit cleanup() {
// Uncompressed data is released in this class, Compressed data is release in the class
// ShuffleReadClientImpl
// So if codec is null, we don't release the data when the stream is closed
-if (codec != null) {
+if (codec.isPresent()) {
RssUtils.releaseByteBuffer(uncompressedData);
}
if (shuffleReadClient != null) {
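
The constructor hunk above is where the Optional originates: Codec.newInstance(rssConf) now returns Optional<Codec> rather than a nullable Codec, so the disabled-compression branch assigns Optional.empty() and every downstream null check becomes an isPresent() check. The cleanup path keeps its original asymmetry: per the comment in the code, uncompressed data is released here while compressed data is released by ShuffleReadClientImpl, so nothing is released when no codec was used.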
WriteBufferManager.java
@@ -22,6 +22,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -91,7 +92,7 @@ public class WriteBufferManager extends MemoryConsumer {
private long uncompressedDataLen = 0;
private long requireMemoryInterval;
private int requireMemoryRetryMax;
-private Codec codec;
+private Optional<Codec> codec;
private Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>> spillFunc;
private long sendSizeLimit;
private boolean memorySpillEnabled;
@@ -159,7 +160,7 @@ public WriteBufferManager(
RssSparkConfig.SPARK_SHUFFLE_COMPRESS_KEY.substring(
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length()),
RssSparkConfig.SPARK_SHUFFLE_COMPRESS_DEFAULT);
-this.codec = compress ? Codec.newInstance(rssConf) : null;
+this.codec = compress ? Codec.newInstance(rssConf) : Optional.empty();
this.spillFunc = spillFunc;
this.sendSizeLimit = rssConf.get(RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMITATION);
this.memorySpillTimeoutSec = rssConf.get(RssSparkConfig.RSS_MEMORY_SPILL_TIMEOUT);
@@ -384,9 +385,9 @@ protected ShuffleBlockInfo createShuffleBlock(int partitionId, WriterBuffer wb)
byte[] data = wb.getData();
final int uncompressLength = data.length;
byte[] compressed = data;
-if (codec != null) {
+if (codec.isPresent()) {
long start = System.currentTimeMillis();
-compressed = codec.compress(data);
+compressed = codec.get().compress(data);
compressTime += System.currentTimeMillis() - start;
}
final long crc32 = ChecksumUtils.getCrc32(compressed);
(file name not captured)
@@ -37,7 +37,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.MapOutputTracker;
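
A one-line migration from the legacy commons-collections artifact to commons-collections4. The commonly used static CollectionUtils helpers (isEmpty, isNotEmpty, and similar) exist in both artifacts, so only the import changes. For illustration, a trivial hedged sketch of calls that behave the same under either import (assuming commons-collections4 is on the classpath):

import java.util.Collections;
import org.apache.commons.collections4.CollectionUtils;

public class CollectionsSketch {
  public static void main(String[] args) {
    // Same null-safe helpers the legacy commons-collections CollectionUtils offered.
    System.out.println(CollectionUtils.isEmpty(Collections.emptyList()));           // true
    System.out.println(CollectionUtils.isNotEmpty(Collections.singletonList("x"))); // true
  }
}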
(file name not captured)
@@ -19,6 +19,7 @@

import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

@@ -171,8 +172,9 @@ protected ShufflePartitionedBlock createShuffleBlock(byte[] data, long blockId)
protected ShufflePartitionedBlock createShuffleBlock(
byte[] data, long blockId, boolean compress) {
byte[] compressData = data;
-if (compress) {
-  compressData = Codec.newInstance(new RssConf()).compress(data);
+Optional<Codec> codec = Codec.newInstance(new RssConf());
+if (compress && codec.isPresent()) {
+  compressData = codec.get().compress(data);
}
long crc = ChecksumUtils.getCrc32(compressData);
return new ShufflePartitionedBlock(
(file name not captured)
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Stream;

import com.google.common.collect.Lists;
@@ -46,6 +47,7 @@
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.ChecksumUtils;
@@ -321,11 +323,12 @@ private void readTestCompressOrNot(String path, boolean compress) throws Exception {
RssShuffleDataIterator rssShuffleDataIterator =
getDataIterator(
basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1, ssi2), compress);
-Object codec = FieldUtils.readField(rssShuffleDataIterator, "codec", true);
+Optional<Codec> codec =
+    (Optional<Codec>) FieldUtils.readField(rssShuffleDataIterator, "codec", true);
if (compress) {
-Assertions.assertNotNull(codec);
+Assertions.assertTrue(codec.isPresent());
} else {
-Assertions.assertNull(codec);
+Assertions.assertFalse(codec.isPresent());
}

validateResult(rssShuffleDataIterator, expectedData, 20);
(file name not captured)
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -46,6 +47,7 @@
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.BlockIdLayout;
@@ -122,11 +124,11 @@ private void addRecord(boolean compress, BlockIdLayout layout) throws IllegalAccessException {
conf.set(RssSparkConfig.SPARK_SHUFFLE_COMPRESS_KEY, String.valueOf(false));
}
WriteBufferManager wbm = createManager(conf);
-Object codec = FieldUtils.readField(wbm, "codec", true);
+Optional<Codec> codec = (Optional<Codec>) FieldUtils.readField(wbm, "codec", true);
if (compress) {
-Assertions.assertNotNull(codec);
+Assertions.assertTrue(codec.isPresent());
} else {
-Assertions.assertNull(codec);
+Assertions.assertFalse(codec.isPresent());
}
wbm.setShuffleWriteMetrics(new ShuffleWriteMetrics());
String testKey = "Key";
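
Both test updates verify the new field type the same way: FieldUtils.readField pulls the private codec field reflectively, the result is cast to Optional<Codec> (an unchecked cast, but the field's declared type makes it safe), and the old assertNotNull/assertNull pair becomes assertTrue/assertFalse on isPresent().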
3 changes: 3 additions & 0 deletions client-spark/spark2-shaded/pom.xml
@@ -312,6 +312,9 @@
<echo message="repackaging netty jar"></echo>
<jar destfile="${project.build.directory}/${project.artifactId}-${project.version}.jar"
basedir="${project.build.directory}/unpacked"/>
+<exec executable="bash" failonerror="true" dir="${project.build.directory}/">
+  <arg line="${project.basedir}/../../dev/scripts/checkshade.sh ${project.build.directory}/${project.artifactId}-${project.version}.jar"/>
+</exec>
</target>
</configuration>
</execution>
3 changes: 3 additions & 0 deletions client-spark/spark3-shaded/pom.xml
@@ -309,6 +309,9 @@
<echo message="repackaging netty jar"></echo>
<jar destfile="${project.build.directory}/${project.artifactId}-${project.version}.jar"
basedir="${project.build.directory}/unpacked"/>
+<exec executable="bash" failonerror="true" dir="${project.build.directory}/">
+  <arg line="${project.basedir}/../../dev/scripts/checkshade.sh ${project.build.directory}/${project.artifactId}-${project.version}.jar"/>
+</exec>
</target>
</configuration>
</execution>
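
Both shaded-client poms add the same guard to the ant target that repackages the netty jar: they exec dev/scripts/checkshade.sh against the final shaded artifact, and failonerror="true" turns a non-zero exit from the script into a Maven build failure rather than a silently mis-shaded jar.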