Skip to content

Commit

Permalink
[apache#1749] feat(remote merge): Introduce new reader for reading so…
Browse files Browse the repository at this point in the history
…rted data. (apache#2034)

### What changes were proposed in this pull request?

Introduce new reader for reading sorted data. Since apache#1748 already provides methods for merging blocks, we need to provide a method for reading merged block. The record obtained from the getSortedShuffleData method is sorted using the comparatorClassName which is passed by registerShuffle.

### Why are the changes needed?

Fix: apache#1749

### Does this PR introduce _any_ user-facing change?

Yes, add doc in a separated issue

### How was this patch tested?

unit test, integration test, test real job in cluster.
  • Loading branch information
zhengchenyu authored Sep 11, 2024
1 parent 9bd93f2 commit d170004
Show file tree
Hide file tree
Showing 48 changed files with 5,133 additions and 337 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,12 @@ public void registerShuffle(
RemoteStorageInfo remoteStorage,
ShuffleDataDistributionType distributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber) {}
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader) {}

@Override
public boolean sendCommit(
Expand Down Expand Up @@ -613,6 +618,14 @@ public void unregisterShuffle(String appId, int shuffleId) {}

@Override
public void unregisterShuffle(String appId) {}

@Override
public void startSortMerge(
Set<ShuffleServerInfo> serverInfos,
String appId,
int shuffleId,
int partitionId,
Roaring64NavigableMap expectedTaskIds) {}
}

static class Reduce extends MapReduceBase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,12 @@ public void registerShuffle(
RemoteStorageInfo storageType,
ShuffleDataDistributionType distributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber) {}
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader) {}

@Override
public boolean sendCommit(
Expand Down Expand Up @@ -582,6 +587,14 @@ public void unregisterShuffle(String appId, int shuffleId) {}

@Override
public void unregisterShuffle(String appId) {}

@Override
public void startSortMerge(
Set<ShuffleServerInfo> serverInfos,
String appId,
int shuffleId,
int partitionId,
Roaring64NavigableMap expectedTaskIds) {}
}

static class MockedShuffleReadClient implements ShuffleReadClient {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,12 @@ protected void registerShuffleServers(
remoteStorage,
ShuffleDataDistributionType.NORMAL,
maxConcurrencyPerPartitionToWrite,
stageAttemptNumber);
stageAttemptNumber,
null,
null,
null,
-1,
null);
});
LOG.info(
"Finish register shuffleId {} with {} ms", shuffleId, (System.currentTimeMillis() - start));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,12 @@ public void registerShuffle(
RemoteStorageInfo remoteStorage,
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber) {}
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader) {}

@Override
public boolean sendCommit(
Expand Down Expand Up @@ -684,5 +689,13 @@ public void unregisterShuffle(String appId, int shuffleId) {}

@Override
public void unregisterShuffle(String appId) {}

@Override
public void startSortMerge(
Set<ShuffleServerInfo> serverInfos,
String appId,
int shuffleId,
int partitionId,
Roaring64NavigableMap expectedTaskIds) {}
}
}
6 changes: 6 additions & 0 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@ default void registerShuffle(
remoteStorage,
dataDistributionType,
maxConcurrencyPerPartitionToWrite,
0);
0,
null,
null,
null,
-1,
null);
}

void registerShuffle(
Expand All @@ -82,7 +87,12 @@ void registerShuffle(
RemoteStorageInfo remoteStorage,
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber);
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader);

boolean sendCommit(
Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps);
Expand Down Expand Up @@ -184,4 +194,11 @@ Roaring64NavigableMap getShuffleResultForMultiPart(
void unregisterShuffle(String appId, int shuffleId);

void unregisterShuffle(String appId);

void startSortMerge(
Set<ShuffleServerInfo> serverInfos,
String appId,
int shuffleId,
int partitionId,
Roaring64NavigableMap expectedTaskIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.request.RssStartSortMergeRequest;
import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest;
import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
import org.apache.uniffle.client.response.ClientResponse;
Expand All @@ -75,6 +76,7 @@
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssSendCommitResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.client.response.RssStartSortMergeResponse;
import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse;
import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
import org.apache.uniffle.client.response.SendShuffleDataResult;
Expand Down Expand Up @@ -561,7 +563,12 @@ public void registerShuffle(
RemoteStorageInfo remoteStorage,
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber) {
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader) {
String user = null;
try {
user = UserGroupInformation.getCurrentUser().getShortUserName();
Expand All @@ -578,7 +585,12 @@ public void registerShuffle(
user,
dataDistributionType,
maxConcurrencyPerPartitionToWrite,
stageAttemptNumber);
stageAttemptNumber,
keyClassName,
valueClassName,
comparatorClassName,
mergedBlockSize,
mergeClassLoader);
RssRegisterShuffleResponse response =
getShuffleServerClient(shuffleServerInfo).registerShuffle(request);

Expand Down Expand Up @@ -1069,6 +1081,56 @@ public void unregisterShuffle(String appId) {
}
}

@Override
public void startSortMerge(
Set<ShuffleServerInfo> serverInfos,
String appId,
int shuffleId,
int partitionId,
Roaring64NavigableMap expectedBlockIds) {
RssStartSortMergeRequest request =
new RssStartSortMergeRequest(appId, shuffleId, partitionId, expectedBlockIds);
boolean atLeastOneSucceeful = false;
for (ShuffleServerInfo ssi : serverInfos) {
RssStartSortMergeResponse response = getShuffleServerClient(ssi).startSortMerge(request);
if (response.getStatusCode() == StatusCode.SUCCESS) {
atLeastOneSucceeful = true;
LOG.info(
"Report unique blocks to "
+ ssi
+ " for appId["
+ appId
+ "], shuffleId["
+ shuffleId
+ "], partitionIds["
+ partitionId
+ "] successfully");
} else {
LOG.warn(
"Report unique blocks to "
+ ssi
+ " for appId["
+ appId
+ "], shuffleId["
+ shuffleId
+ "], partitionIds["
+ partitionId
+ "] failed with "
+ response.getStatusCode());
}
}
if (!atLeastOneSucceeful) {
throw new RssFetchFailedException(
"Report Unique Blocks failed for appId["
+ appId
+ "], shuffleId["
+ shuffleId
+ "], partitionIds["
+ partitionId
+ "]");
}
}

private void throwExceptionIfNecessary(ClientResponse response, String errorMsg) {
if (response != null && response.getStatusCode() != StatusCode.SUCCESS) {
LOG.error(errorMsg);
Expand Down
69 changes: 69 additions & 0 deletions client/src/main/java/org/apache/uniffle/client/record/Record.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.client.record;

import com.google.common.base.Objects;

public class Record<K, V> {

private K key;
private V value;

private Record(K key, V value) {
this.key = key;
this.value = value;
}

public static <K, V> Record create(K key, V value) {
return new Record<K, V>(key, value);
}

public K getKey() {
return key;
}

public V getValue() {
return value;
}

public void setValue(V value) {
this.value = value;
}

@Override
public String toString() {
return "Record{" + "key=" + key + ", value=" + value + '}';
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Record<?, ?> record = (Record<?, ?>) o;
return Objects.equal(key, record.key) && Objects.equal(value, record.value);
}

@Override
public int hashCode() {
return Objects.hashCode(key, value);
}
}
Loading

0 comments on commit d170004

Please sign in to comment.