Skip to content

Commit

Permalink
Introduce a timeout mechanism to get Hadoop File System.
Browse files Browse the repository at this point in the history
  • Loading branch information
yuqi1129 committed Oct 31, 2024
1 parent a2f7093 commit 58efcf6
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 1 deletion.
1 change: 1 addition & 0 deletions LICENSE.bin
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@
Apache Arrow
Rome
Jettison
Awaitility

This product bundles various third-party components also under the
Apache Software Foundation License 1.1
Expand Down
1 change: 1 addition & 0 deletions catalogs/catalog-hadoop/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ dependencies {
}

implementation(libs.slf4j.api)
implementation(libs.awaitility)

testImplementation(project(":clients:client-java"))
testImplementation(project(":integration-test-common", "testArtifacts"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Entity;
Expand Down Expand Up @@ -68,6 +70,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -774,6 +778,27 @@ FileSystem getFileSystem(Path path, Map<String, String> config) throws IOExcepti
scheme, path, fileSystemProvidersMap.keySet(), fileSystemProvidersMap.values()));
}

return provider.getFileSystem(path, config);
int timeoutSeconds =
(int)
propertiesMetadata
.catalogPropertiesMetadata()
.getOrDefault(config, HadoopCatalogPropertiesMetadata.REQUEST_TIMEOUT_SECONDS);
try {
AtomicReference<FileSystem> fileSystem = new AtomicReference<>();
Awaitility.await()
.atMost(timeoutSeconds, TimeUnit.SECONDS)
.until(
() -> {
fileSystem.set(provider.getFileSystem(path, config));
return true;
});
return fileSystem.get();
} catch (ConditionTimeoutException e) {
throw new IOException(
String.format(
"Failed to get FileSystem for path: %s, scheme: %s, provider: %s, config: %s within %s seconds",
path, scheme, provider, config, timeoutSeconds),
e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada
*/
public static final String DEFAULT_FS_PROVIDER = "default-filesystem-provider";

static final String REQUEST_TIMEOUT_SECONDS = "request-timeout-seconds";
static final int DEFAULT_REQUEST_TIMEOUT_SECONDS = 10;

public static final String BUILTIN_LOCAL_FS_PROVIDER = "builtin-local";
public static final String BUILTIN_HDFS_FS_PROVIDER = "builtin-hdfs";

Expand Down Expand Up @@ -81,6 +84,14 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada
false /* immutable */,
BUILTIN_LOCAL_FS_PROVIDER, // please see LocalFileSystemProvider#name()
false /* hidden */))
.put(
REQUEST_TIMEOUT_SECONDS,
PropertyEntry.integerOptionalPropertyEntry(
REQUEST_TIMEOUT_SECONDS,
"Timeout to wait for to create the Hadoop file system",
false /* immutable */,
DEFAULT_REQUEST_TIMEOUT_SECONDS,
false /* hidden */))
// The following two are about authentication.
.putAll(KERBEROS_PROPERTY_ENTRIES)
.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
Expand Down

0 comments on commit 58efcf6

Please sign in to comment.