[apache#1750] feat(remote merge): Support MapReduce. (apache#2109)
### What changes were proposed in this pull request?

Add remote merge support for MapReduce.

### Why are the changes needed?

Fix: apache#1750 

### Does this PR introduce _any_ user-facing change?

Yes. I will refine the documentation in a follow-up PR.

### How was this patch tested?

Unit tests, integration tests, and a real job in a cluster.
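
For reference, a job can opt in through the new flag (a minimal sketch using the RSS_REMOTE_MERGE_ENABLE key added in RssMRConfig below; the rest of the job setup is assumed):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.RssMRConfig;

// Minimal sketch: enable remote merge for an MR job via the new key
// (defaults to false, see RSS_REMOTE_MERGE_ENABLE_DEFAULT below).
JobConf jobConf = new JobConf();
jobConf.setBoolean(RssMRConfig.RSS_REMOTE_MERGE_ENABLE, true);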
zhengchenyu authored Sep 29, 2024
1 parent 6b0dc76 commit 3b981ed
Showing 19 changed files with 1,734 additions and 54 deletions.
12 changes: 12 additions & 0 deletions client-mr/core/pom.xml
@@ -123,6 +123,18 @@
<artifactId>hadoop${hadoop.short.version}-shim</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-client</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -28,7 +28,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RssMRConfig;
import org.apache.hadoop.mapreduce.RssMRUtils;
@@ -131,7 +130,6 @@ public void init(Context context) throws IOException, ClassNotFoundException {

Map<Integer, List<ShuffleServerInfo>> partitionToServers = createAssignmentMap(rssJobConf);

SerializationFactory serializationFactory = new SerializationFactory(mrJobConf);
long maxSegmentSize =
RssMRUtils.getLong(
rssJobConf,
@@ -148,13 +146,19 @@
RssMRConfig.RSS_WRITER_BUFFER_SIZE,
RssMRConfig.RSS_WRITER_BUFFER_SIZE_DEFAULT_VALUE);
shuffleClient = RssMRUtils.createShuffleClient(mrJobConf);
boolean isRemoteMergeEnable =
RssMRUtils.getBoolean(
rssJobConf,
RssMRConfig.RSS_REMOTE_MERGE_ENABLE,
RssMRConfig.RSS_REMOTE_MERGE_ENABLE_DEFAULT);
bufferManager =
new SortWriteBufferManager(
(long) (ByteUnit.MiB.toBytes(sortmb) * sortThreshold),
taskAttemptId,
batch,
serializationFactory.getSerializer(keyClass),
serializationFactory.getSerializer(valClass),
keyClass,
valClass,
mrJobConf,
comparator,
memoryThreshold,
appId,
Expand All @@ -174,7 +178,8 @@ public void init(Context context) throws IOException, ClassNotFoundException {
sendThreshold,
maxBufferSize,
RssMRConfig.toRssConf(rssJobConf),
combinerRunner);
combinerRunner,
isRemoteMergeEnable);
}

private Map<Integer, List<ShuffleServerInfo>> createAssignmentMap(Configuration jobConf) {
SortWriteBuffer.java
@@ -17,6 +17,7 @@

package org.apache.hadoop.mapred;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
@@ -31,6 +32,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.serializer.SerializerInstance;

public class SortWriteBuffer<K, V> extends OutputStream {

private static final Logger LOG = LoggerFactory.getLogger(SortWriteBuffer.class);
@@ -47,35 +50,56 @@ public class SortWriteBuffer<K, V> extends OutputStream {
private int currentOffset = 0;
private int currentIndex = 0;

private final boolean useUniffleSerializer;
private final SerializerInstance serializerInstance;
private DataOutputStream dataOutputStream;

public SortWriteBuffer(
int partitionId,
RawComparator<K> comparator,
long maxSegmentSize,
boolean useUniffleSerializer,
Serializer<K> keySerializer,
Serializer<V> valueSerializer) {
Serializer<V> valueSerializer,
SerializerInstance serializerInstance) {
this.partitionId = partitionId;
this.comparator = comparator;
this.maxSegmentSize = maxSegmentSize;
this.useUniffleSerializer = useUniffleSerializer;
this.keySerializer = keySerializer;
this.valSerializer = valueSerializer;
this.serializerInstance = serializerInstance;
if (useUniffleSerializer) {
this.dataOutputStream = new DataOutputStream(this);
}
}

public int addRecord(K key, V value) throws IOException {
keySerializer.open(this);
valSerializer.open(this);
if (!useUniffleSerializer) {
keySerializer.open(this);
valSerializer.open(this);
}
int lastOffSet = currentOffset;
int lastIndex = currentIndex;
int lastDataLength = dataLength;
int keyIndex = lastIndex;
keySerializer.serialize(key);
if (useUniffleSerializer) {
serializerInstance.serialize(key, this.dataOutputStream);
} else {
keySerializer.serialize(key);
}
int keyLength = dataLength - lastDataLength;
int keyOffset = lastOffSet;
if (compact(lastIndex, lastOffSet, keyLength)) {
keyOffset = lastOffSet;
keyIndex = lastIndex;
}
lastDataLength = dataLength;
valSerializer.serialize(value);
if (useUniffleSerializer) {
serializerInstance.serialize(value, this.dataOutputStream);
} else {
valSerializer.serialize(value);
}
int valueLength = dataLength - lastDataLength;
records.add(new Record<K>(keyIndex, keyOffset, keyLength, valueLength));
return keyLength + valueLength;
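
The buffer itself is the OutputStream: when the Uniffle serializer is enabled, SortWriteBuffer wraps itself in a DataOutputStream and SerializerInstance writes key and value bytes straight into the segment buffers. A self-contained sketch of that pattern (illustrative names, not from this diff):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Stand-in for SortWriteBuffer's Uniffle path: the buffer is the sink,
// and serialized bytes are funneled through a DataOutputStream wrapper.
class RecordSink extends ByteArrayOutputStream {
  private final DataOutputStream dataOut = new DataOutputStream(this);

  int addRecord(String key, String value) throws IOException {
    int before = size();
    dataOut.writeUTF(key);   // key bytes land in this buffer via write()
    dataOut.writeUTF(value); // value bytes follow immediately
    return size() - before;  // record length, as addRecord reports it
  }
}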
SortWriteBufferManager.java
@@ -38,6 +38,7 @@
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapreduce.RssMRUtils;
import org.slf4j.Logger;
@@ -50,6 +51,8 @@
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.serializer.SerializerFactory;
import org.apache.uniffle.common.serializer.SerializerInstance;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.ThreadUtils;
@@ -74,8 +77,8 @@ public class SortWriteBufferManager<K, V> {
private final double sendThreshold;
private final ReentrantLock memoryLock = new ReentrantLock();
private final Condition full = memoryLock.newCondition();
private final Serializer<K> keySerializer;
private final Serializer<V> valSerializer;
private Serializer<K> keySerializer;
private Serializer<V> valSerializer;
private final RawComparator<K> comparator;
private final Set<Long> successBlockIds;
private final Set<Long> failedBlockIds;
@@ -99,13 +102,16 @@ public class SortWriteBufferManager<K, V> {
private final RssConf rssConf;
private final Optional<Codec> codec;
private final Task.CombinerRunner<K, V> combinerRunner;
private final boolean useUniffleSerializer;
private SerializerInstance serializerInstance;

public SortWriteBufferManager(
long maxMemSize,
long taskAttemptId,
int batch,
Serializer<K> keySerializer,
Serializer<V> valSerializer,
Class<K> keyClass,
Class<V> valClass,
JobConf mrJobConf,
RawComparator<K> comparator,
double memoryThreshold,
String appId,
@@ -125,12 +131,11 @@ public SortWriteBufferManager(
double sendThreshold,
long maxBufferSize,
RssConf rssConf,
Task.CombinerRunner<K, V> combinerRunner) {
Task.CombinerRunner<K, V> combinerRunner,
boolean useUniffleSerializer) {
this.maxMemSize = maxMemSize;
this.taskAttemptId = taskAttemptId;
this.batch = batch;
this.keySerializer = keySerializer;
this.valSerializer = valSerializer;
this.comparator = comparator;
this.memoryThreshold = memoryThreshold;
this.appId = appId;
@@ -152,6 +157,17 @@
this.rssConf = rssConf;
this.codec = Codec.newInstance(rssConf);
this.combinerRunner = combinerRunner;
this.useUniffleSerializer = useUniffleSerializer;
if (useUniffleSerializer) {
SerializerFactory factory = new SerializerFactory(rssConf);
org.apache.uniffle.common.serializer.Serializer serializer = factory.getSerializer(keyClass);
this.serializerInstance = serializer.newInstance();
assert factory.getSerializer(valClass).getClass().equals(serializer.getClass());
} else {
SerializationFactory serializationFactory = new SerializationFactory(mrJobConf);
this.keySerializer = serializationFactory.getSerializer(keyClass);
this.valSerializer = serializationFactory.getSerializer(valClass);
}
}

// todo: Single Buffer should also have its size limit
@@ -178,7 +194,13 @@ public void addRecord(int partitionId, K key, V value) throws IOException, InterruptedException {
k -> {
SortWriteBuffer<K, V> sortWriterBuffer =
new SortWriteBuffer(
partitionId, comparator, maxSegmentSize, keySerializer, valSerializer);
partitionId,
comparator,
maxSegmentSize,
useUniffleSerializer,
keySerializer,
valSerializer,
serializerInstance);
waitSendBuffers.add(sortWriterBuffer);
return sortWriterBuffer;
});
@@ -274,7 +296,13 @@ public SortWriteBuffer<K, V> combineBuffer(SortWriteBuffer<K, V> buffer)

SortWriteBuffer<K, V> newBuffer =
new SortWriteBuffer<>(
buffer.getPartitionId(), comparator, maxSegmentSize, keySerializer, valSerializer);
buffer.getPartitionId(),
comparator,
maxSegmentSize,
useUniffleSerializer,
keySerializer,
valSerializer,
serializerInstance);

combineCollector.setWriter(newBuffer);
combinerRunner.combine(kvIterator, combineCollector);
@@ -384,7 +412,8 @@ ShuffleBlockInfo createShuffleBlock(SortWriteBuffer wb) {
int partitionId = wb.getPartitionId();
final int uncompressLength = data.length;
long start = System.currentTimeMillis();
final byte[] compressed = codec.map(c -> c.compress(data)).orElse(data);
final byte[] compressed =
useUniffleSerializer ? data : codec.map(c -> c.compress(data)).orElse(data);
final long crc32 = ChecksumUtils.getCrc32(compressed);
compressTime += System.currentTimeMillis() - start;
final long blockId =
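Pulling the two halves together, the write path with remote merge looks roughly like this (a hedged sketch; it assumes the factory supports the Writable key type used here, and mirrors the constructor and SortWriteBuffer diffs above):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.io.Text;

import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.serializer.SerializerFactory;
import org.apache.uniffle.common.serializer.SerializerInstance;

// Hedged sketch of the Uniffle serializer handshake: obtain a
// SerializerInstance for the key class, then serialize through a
// DataOutputStream, as SortWriteBuffer.addRecord does.
RssConf rssConf = new RssConf();
SerializerFactory factory = new SerializerFactory(rssConf);
SerializerInstance instance = factory.getSerializer(Text.class).newInstance();
DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
instance.serialize(new Text("key"), out);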
RssMRConfig.java
@@ -196,6 +196,16 @@ public class RssMRConfig {
public static final String RSS_TEST_MODE_ENABLE =
MR_CONFIG_PREFIX + RssClientConfig.RSS_TEST_MODE_ENABLE;

public static final String RSS_REMOTE_MERGE_ENABLE =
MR_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_MERGE_ENABLE;
public static final boolean RSS_REMOTE_MERGE_ENABLE_DEFAULT = false;
public static final String RSS_MERGED_BLOCK_SZIE =
MR_CONFIG_PREFIX + RssClientConfig.RSS_MERGED_BLOCK_SZIE;
public static final int RSS_MERGED_BLOCK_SZIE_DEFAULT =
RssClientConfig.RSS_MERGED_BLOCK_SZIE_DEFAULT;
public static final String RSS_REMOTE_MERGE_CLASS_LOADER =
MR_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_MERGE_CLASS_LOADER;

public static RssConf toRssConf(Configuration jobConf) {
RssConf rssConf = new RssConf();
for (Map.Entry<String, String> entry : jobConf) {
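For illustration, the merged block size can be tuned through the new key as well (the value below just restates the default; RSS_MERGED_BLOCK_SZIE is the constant name as defined above):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.RssMRConfig;

// Illustrative: tune the merged block size for remote merge.
JobConf conf = new JobConf();
conf.setInt(RssMRConfig.RSS_MERGED_BLOCK_SZIE, RssMRConfig.RSS_MERGED_BLOCK_SZIE_DEFAULT);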
MRMetricsReporter.java
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.mapreduce.task.reduce;

import org.apache.hadoop.mapred.Reporter;

import org.apache.uniffle.client.record.metrics.MetricsReporter;

public class MRMetricsReporter implements MetricsReporter {

Reporter reporter;

public MRMetricsReporter(Reporter reporter) {
this.reporter = reporter;
}

@Override
public void incRecordsRead(long v) {
this.reporter.incrCounter(RMRssShuffle.Counter.INPUT_RECORDS_PROCESSED, v);
}
}
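
A minimal usage sketch (the Reporter comes from the running reduce task; the wiring into the shuffle and record reader is assumed, and RMRssShuffle is referenced above but not shown in this excerpt):

import org.apache.hadoop.mapred.Reporter;

import org.apache.uniffle.client.record.metrics.MetricsReporter;

// Hypothetical wiring: wrap the reduce task's Reporter so Uniffle's
// record reader can count consumed records via INPUT_RECORDS_PROCESSED.
MetricsReporter metrics = new MRMetricsReporter(reporter);
metrics.incRecordsRead(1L);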