Skip to content

Commit

Permalink
merge master (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengchenyu authored Oct 21, 2024
1 parent 7826b16 commit 8b6c234
Show file tree
Hide file tree
Showing 12 changed files with 194 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ type CoordinatorConfig struct {
// HTTPNodePort defines http port of node port service used for coordinators' external access.
// +optional
HTTPNodePort []int32 `json:"httpNodePort,omitempty"`

// NodePortServiceAnnotations is a list of annotations for the NodePort service.
// +optional
NodePortServiceAnnotations []map[string]string `json:"nodePortServiceAnnotations,omitempty"`

// HeadlessServiceAnnotations is a list of annotations for the headless service.
// +optional
HeadlessServiceAnnotations []map[string]string `json:"headlessServiceAnnotations,omitempty"`
}

// ShuffleServerConfig records configuration used to generate workload of shuffle servers.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -1786,6 +1786,14 @@ spec:
description: ExcludeNodesFilePath indicates exclude nodes file
path in coordinators' containers.
type: string
headlessServiceAnnotations:
description: HeadlessServiceAnnotations is a list of annotations
for the headless service.
items:
additionalProperties:
type: string
type: object
type: array
hostNetwork:
default: true
description: HostNetwork indicates whether we need to enable host
Expand Down Expand Up @@ -1827,6 +1835,14 @@ spec:
description: LogHostPath represents host path used to save logs
of shuffle servers.
type: string
nodePortServiceAnnotations:
description: NodePortServiceAnnotations is a list of annotations
for the NodePort service.
items:
additionalProperties:
type: string
type: object
type: array
nodeSelector:
additionalProperties:
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,19 @@ func GenerateHeadlessSvc(rss *unifflev1alpha1.RemoteShuffleService, index int) *
name := GenerateNameByIndex(rss, index)
serviceName := appendHeadless(name)

annotations := map[string]string{}

if len(rss.Spec.Coordinator.HeadlessServiceAnnotations) > index {
for key, value := range rss.Spec.Coordinator.HeadlessServiceAnnotations[index] {
annotations[key] = value
}
}

svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: rss.Namespace,
Name: serviceName,
Namespace: rss.Namespace,
Annotations: annotations,
},
Spec: corev1.ServiceSpec{
ClusterIP: corev1.ClusterIPNone,
Expand Down Expand Up @@ -140,10 +149,20 @@ func GenerateHeadlessSvc(rss *unifflev1alpha1.RemoteShuffleService, index int) *
// this function is skipped.
func GenerateSvc(rss *unifflev1alpha1.RemoteShuffleService, index int) *corev1.Service {
name := GenerateNameByIndex(rss, index)

annotations := map[string]string{}

if len(rss.Spec.Coordinator.NodePortServiceAnnotations) > index {
for key, value := range rss.Spec.Coordinator.NodePortServiceAnnotations[index] {
annotations[key] = value
}
}

svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: rss.Namespace,
Name: name,
Namespace: rss.Namespace,
Annotations: annotations,
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,24 @@ var (
"key1": "value1",
"key2": "value2",
}

testSvcAnnotationsList = []map[string]string{
{
"annotation1": "value1",
},
{
"annotation2": "value2",
},
}
)

func buildRssWithSvcAnnotations() *uniffleapi.RemoteShuffleService {
rss := utils.BuildRSSWithDefaultValue()
rss.Spec.Coordinator.NodePortServiceAnnotations = testSvcAnnotationsList
rss.Spec.Coordinator.HeadlessServiceAnnotations = testSvcAnnotationsList
return rss
}

func buildRssWithLabels() *uniffleapi.RemoteShuffleService {
rss := utils.BuildRSSWithDefaultValue()
rss.Spec.Coordinator.Labels = testLabels
Expand Down Expand Up @@ -546,6 +562,35 @@ func TestGenerateSvcForCoordinator(t *testing.T) {
}
}

func TestGenerateSvcWithAnnotationsForCoordinator(t *testing.T) {
for _, tt := range []struct {
name string
rss *uniffleapi.RemoteShuffleService
expectedAnnotations []map[string]string
}{
{
name: "nodeport and headless services with annotations",
rss: buildRssWithSvcAnnotations(),
expectedAnnotations: []map[string]string{
{"annotation1": "value1"},
{"annotation1": "value1"},
{"annotation2": "value2"},
{"annotation2": "value2"}},
},
} {
t.Run(tt.name, func(tc *testing.T) {
_, _, services, _ := GenerateCoordinators(tt.rss)

for i, svc := range services {
match := reflect.DeepEqual(tt.expectedAnnotations[i], svc.Annotations)
if !match {
tc.Errorf("unexpected annotations: %v, expected: %v", svc.Annotations, tt.expectedAnnotations[i])
}
}
})
}
}

func TestGenerateAddresses(t *testing.T) {
assertion := assert.New(t)
rss := buildRssWithLabels()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import org.apache.hadoop.mapreduce.RssMRConfig;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.storage.util.StorageType;
Expand All @@ -41,18 +42,18 @@ protected static Map<String, String> getDynamicConf() {
Map<String, String> dynamicConf = new HashMap<>();
dynamicConf.put(RssMRConfig.RSS_REMOTE_STORAGE_PATH, HDFS_URI + "rss/test");
dynamicConf.put(RssMRConfig.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
dynamicConf.put(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
return dynamicConf;
}

@Test
public void dynamicConfTest() throws Exception {
run();
@ParameterizedTest
@MethodSource("clientTypeProvider")
public void dynamicConfTest(ClientType clientType) throws Exception {
run(clientType);
}

@Override
protected void updateRssConfiguration(Configuration jobConf) {
jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
protected void updateRssConfiguration(Configuration jobConf, ClientType clientType) {
jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, clientType.name());
jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
jobConf.setInt(LargeSorter.MBS_PER_MAP, 256);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import org.apache.hadoop.mapreduce.RssMRConfig;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.storage.util.StorageType;
Expand All @@ -41,14 +42,15 @@ protected static Map<String, String> getDynamicConf() {
return new HashMap<>();
}

@Test
public void hadoopConfTest() throws Exception {
run();
@ParameterizedTest
@MethodSource("clientTypeProvider")
public void hadoopConfTest(ClientType clientType) throws Exception {
run(clientType);
}

@Override
protected void updateRssConfiguration(Configuration jobConf) {
jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
protected void updateRssConfiguration(Configuration jobConf, ClientType clientType) {
jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, clientType.name());
jobConf.set(RssMRConfig.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
jobConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, HDFS_URI + "rss/test");
jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import org.apache.hadoop.mapreduce.RssMRConfig;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.uniffle.common.ClientType;

Expand All @@ -34,14 +35,15 @@ public static void setupServers() throws Exception {
MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf());
}

@Test
public void largeSorterTest() throws Exception {
run();
@ParameterizedTest
@MethodSource("clientTypeProvider")
public void largeSorterTest(ClientType clientType) throws Exception {
run(clientType);
}

@Override
protected void updateRssConfiguration(Configuration jobConf) {
jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
protected void updateRssConfiguration(Configuration jobConf, ClientType clientType) {
jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, clientType.name());
jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
jobConf.setInt(LargeSorter.MBS_PER_MAP, 256);
jobConf.set(
Expand Down
Loading

0 comments on commit 8b6c234

Please sign in to comment.