Skip to content

Commit

Permalink
[#5220] improvment(hadoop-catalog): Optimize the name properties keys…
Browse files Browse the repository at this point in the history
… for Hadoop catalog. (#5372)

### What changes were proposed in this pull request?

Replace the properties keys with `gravitino.bypass` prefix with a more
elegant one.

### Why are the changes needed?

For better user experience.

Fix: #5220 

### Does this PR introduce _any_ user-facing change?

N/A

### How was this patch tested?

Existing UTs and ITs.

Co-authored-by: Qi Yu <[email protected]>
  • Loading branch information
github-actions[bot] and yuqi1129 authored Oct 31, 2024
1 parent b913e98 commit 1d7e9d6
Show file tree
Hide file tree
Showing 26 changed files with 590 additions and 91 deletions.
3 changes: 3 additions & 0 deletions bundles/aliyun-bundle/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ dependencies {
// org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.initialize(AliyunOSSFileSystem.java:323)
// org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3611)
implementation(libs.commons.lang)
implementation(project(":catalogs:catalog-common")) {
exclude("*")
}
}

tasks.withType(ShadowJar::class.java) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,49 @@
*/
package org.apache.gravitino.oss.fs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.storage.OSSProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
import org.apache.hadoop.fs.aliyun.oss.Constants;

public class OSSFileSystemProvider implements FileSystemProvider {

private static final String OSS_FILESYSTEM_IMPL = "fs.oss.impl";

// This map maintains the mapping relationship between the OSS properties in Gravitino and
// the Hadoop properties. Through this map, users can customize the OSS properties in Gravitino
// and map them to the corresponding Hadoop properties.
// For example, User can use oss-endpoint to set the endpoint of OSS 'fs.oss.endpoint' in
// Gravitino.
// GCS and S3 also have similar mapping relationship.

@VisibleForTesting
public static final Map<String, String> GRAVITINO_KEY_TO_OSS_HADOOP_KEY =
ImmutableMap.of(
OSSProperties.GRAVITINO_OSS_ENDPOINT, Constants.ENDPOINT_KEY,
OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, Constants.ACCESS_KEY_ID,
OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, Constants.ACCESS_KEY_SECRET);

@Override
public FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
Configuration configuration = new Configuration();
config.forEach(
(k, v) -> {
configuration.set(k.replace("gravitino.bypass.", ""), v);
});

Map<String, String> hadoopConfMap =
FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_OSS_HADOOP_KEY);
// OSS do not use service loader to load the file system, so we need to set the impl class
if (!hadoopConfMap.containsKey(OSS_FILESYSTEM_IMPL)) {
hadoopConfMap.put(OSS_FILESYSTEM_IMPL, AliyunOSSFileSystem.class.getCanonicalName());
}

hadoopConfMap.forEach(configuration::set);
return AliyunOSSFileSystem.newInstance(path.toUri(), configuration);
}

Expand Down
3 changes: 3 additions & 0 deletions bundles/aws-bundle/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ dependencies {
implementation(libs.aws.policy)
implementation(libs.aws.sts)
implementation(libs.hadoop3.aws)
implementation(project(":catalogs:catalog-common")) {
exclude("*")
}
}

tasks.withType(ShadowJar::class.java) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,39 @@

package org.apache.gravitino.s3.fs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.storage.S3Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

public class S3FileSystemProvider implements FileSystemProvider {

@VisibleForTesting
public static final Map<String, String> GRAVITINO_KEY_TO_S3_HADOOP_KEY =
ImmutableMap.of(
S3Properties.GRAVITINO_S3_ENDPOINT, Constants.ENDPOINT,
S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, Constants.ACCESS_KEY,
S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, Constants.SECRET_KEY);

@Override
public FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
Configuration configuration = new Configuration();
config.forEach(
(k, v) -> {
configuration.set(k.replace("gravitino.bypass.", ""), v);
});
Map<String, String> hadoopConfMap =
FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_S3_HADOOP_KEY);

if (!hadoopConfMap.containsKey(Constants.AWS_CREDENTIALS_PROVIDER)) {
configuration.set(
Constants.AWS_CREDENTIALS_PROVIDER, Constants.ASSUMED_ROLE_CREDENTIALS_DEFAULT);
}
hadoopConfMap.forEach(configuration::set);
return S3AFileSystem.newInstance(path.toUri(), configuration);
}

Expand Down
3 changes: 3 additions & 0 deletions bundles/gcp-bundle/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ dependencies {
// runtime used
implementation(libs.commons.logging)
implementation(libs.hadoop3.gcs)
implementation(project(":catalogs:catalog-common")) {
exclude("*")
}
implementation(libs.google.auth.http)
implementation(libs.google.auth.credentials)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
package org.apache.gravitino.gcs.fs;

import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.storage.GCSProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -30,15 +34,18 @@

public class GCSFileSystemProvider implements FileSystemProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(GCSFileSystemProvider.class);
private static final String GCS_SERVICE_ACCOUNT_JSON_FILE =
"fs.gs.auth.service.account.json.keyfile";

@VisibleForTesting
public static final Map<String, String> GRAVITINO_KEY_TO_GCS_HADOOP_KEY =
ImmutableMap.of(GCSProperties.GCS_SERVICE_ACCOUNT_JSON_PATH, GCS_SERVICE_ACCOUNT_JSON_FILE);

@Override
public FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
Configuration configuration = new Configuration();
config.forEach(
(k, v) -> {
configuration.set(k.replace("gravitino.bypass.", ""), v);
});

FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_GCS_HADOOP_KEY)
.forEach(configuration::set);
LOGGER.info("Creating GCS file system with config: {}", config);
return GoogleHadoopFileSystem.newInstance(path.toUri(), configuration);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.storage;

public class GCSProperties {

// The path of service account JSON file of Google Cloud Storage.
public static final String GCS_SERVICE_ACCOUNT_JSON_PATH = "gcs-service-account-file";

private GCSProperties() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class OSSProperties {
// The static access key ID used to access OSS data.
public static final String GRAVITINO_OSS_ACCESS_KEY_ID = "oss-access-key-id";
// The static access key secret used to access OSS data.
public static final String GRAVITINO_OSS_ACCESS_KEY_SECRET = "oss-access-key-secret";
public static final String GRAVITINO_OSS_ACCESS_KEY_SECRET = "oss-secret-access-key";

private OSSProperties() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,8 @@ public class S3Properties {
// S3 external id
public static final String GRAVITINO_S3_EXTERNAL_ID = "s3-external-id";

// The S3 credentials provider class name.
public static final String GRAVITINO_S3_CREDS_PROVIDER = "s3-creds-provider";

private S3Properties() {}
}
4 changes: 4 additions & 0 deletions catalogs/catalog-hadoop/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ dependencies {
exclude(group = "*")
}

implementation(project(":catalogs:catalog-common")) {
exclude(group = "*")
}

compileOnly(libs.guava)

implementation(libs.hadoop3.common) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@
*/
public interface FileSystemProvider {

/**
* The prefix of the configuration key that should be bypassed when setting the configuration to
* the FileSystem instance.
*
* <p>For example, if the configuration key passed to {@link
* FileSystemProvider#getFileSystem(Path, Map)} 'gravitino.bypass.fs.s3a.endpoint', the prefix
* 'gravitino.bypass.' should be removed when setting the configuration to the FileSystem
* instance.
*
* <p>User can use this prefix to pass the configuration item that has not been defined in
* Gravitino.
*/
String GRAVITINO_BYPASS = "gravitino.bypass.";

/**
* Get the FileSystem instance according to the configuration map and file path.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_HDFS_FS_PROVIDER;
import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_LOCAL_FS_PROVIDER;
import static org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider.GRAVITINO_BYPASS;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -101,4 +102,62 @@ public static FileSystemProvider getFileSystemProviderByName(
"File system provider with name '%s' not found in the file system provider list.",
fileSystemProviderName)));
}

/**
* Convert the Gravitino configuration to Hadoop configuration.
*
* <p>Predefined keys have the highest priority. If the key does not exist in the predefined keys,
* it will be set to the configuration. Keys with prefixes 'gravitino.bypass' has the lowest
* priority.
*
* <p>Consider the following example:
*
* <pre>
* config:
* k1=v1
* gravitino.bypass.k1=v2
* custom-k1=v3
* predefinedKeys:
* custom-k1=k1
* then the result will be:
* k1=v3
* </pre>
*
* @param config Gravitino configuration
* @return Hadoop configuration Map
*/
public static Map<String, String> toHadoopConfigMap(
Map<String, String> config, Map<String, String> predefinedKeys) {
Map<String, String> result = Maps.newHashMap();

// First, add those keys that start with 'gravitino.bypass' to the result map as it has the
// lowest priority.
config.forEach(
(k, v) -> {
if (k.startsWith(GRAVITINO_BYPASS)) {
String key = k.replace(GRAVITINO_BYPASS, "");
result.put(key, v);
}
});

// Then add those keys that are not in the predefined keys and not start with 'gravitino.bypass'
// to the result map.
config.forEach(
(k, v) -> {
if (!predefinedKeys.containsKey(k) && !k.startsWith(GRAVITINO_BYPASS)) {
result.put(k, v);
}
});

// Last, add those keys that are in the predefined keys to the result map.
config.forEach(
(k, v) -> {
if (predefinedKeys.containsKey(k)) {
String key = predefinedKeys.get(k);
result.put(key, v);
}
});

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Map;
Expand Down Expand Up @@ -796,6 +797,39 @@ public void testTestConnection() {
ImmutableMap.of()));
}

@Test
void testTrailSlash() throws IOException {
try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) {

String location = "hdfs://localhost:9000";
Map<String, String> catalogProperties = Maps.newHashMap();
catalogProperties.put(HadoopCatalogPropertiesMetadata.LOCATION, location);

ops.initialize(catalogProperties, randomCatalogInfo(), HADOOP_PROPERTIES_METADATA);

String schemaName = "schema1024";
NameIdentifier nameIdentifier = NameIdentifierUtil.ofSchema("m1", "c1", schemaName);

Map<String, String> schemaProperties = Maps.newHashMap();
schemaProperties.put(HadoopCatalogPropertiesMetadata.LOCATION, "hdfs://localhost:9000/user1");
StringIdentifier stringId = StringIdentifier.fromId(idGenerator.nextId());
schemaProperties =
Maps.newHashMap(StringIdentifier.newPropertiesWithId(stringId, schemaProperties));

Map<String, String> finalSchemaProperties = schemaProperties;

// If not fixed by #5296, this method will throw java.lang.IllegalArgumentException:
// java.net.URISyntaxException: Relative path in absolute URI: hdfs://localhost:9000schema1024
// After #5296, this method will throw java.lang.RuntimeException: Failed to create
// schema m1.c1.schema1024 location hdfs://localhost:9000/user1
Exception exception =
Assertions.assertThrows(
Exception.class,
() -> ops.createSchema(nameIdentifier, "comment", finalSchemaProperties));
Assertions.assertTrue(exception.getCause() instanceof ConnectException);
}
}

@Test
public void testGetFileLocation() throws IOException {
String schemaName = "schema1024";
Expand Down
Loading

0 comments on commit 1d7e9d6

Please sign in to comment.