Skip to content

Commit

Permalink
[FSTORE-920] support Jdbc test-connection (#1525)
Browse files Browse the repository at this point in the history
  • Loading branch information
dhananjay-mk authored May 7, 2024
1 parent f2f44b9 commit 413f0b6
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@

package io.hops.hopsworks.common.featurestore.storageconnectors.connectionChecker;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.featurestore.OptionDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.jdbc.FeaturestoreJdbcConnectorController;
import io.hops.hopsworks.common.featurestore.storageconnectors.jdbc.FeaturestoreJdbcConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.snowflake.FeaturestoreSnowflakeConnectorController;
import io.hops.hopsworks.common.featurestore.storageconnectors.snowflake.FeaturestoreSnowflakeConnectorDTO;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.proxies.client.HttpClient;
import io.hops.hopsworks.common.util.OSProcessExecutor;
import io.hops.hopsworks.common.util.ProcessDescriptor;
Expand All @@ -31,8 +39,8 @@
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.file.PathUtils;
import org.opensearch.common.Strings;

import javax.annotation.PostConstruct;
Expand All @@ -45,6 +53,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -61,71 +71,131 @@ public class ConnectionChecker {
private FeaturestoreSnowflakeConnectorController snowflakeConnectorController;
@EJB
private ProjectUtils projectUtils;
private Path STAGING_PATH;
@EJB
private HttpClient httpClient;
@EJB
private DistributedFsService dfs;
@EJB
private HdfsUsersController hdfsUsersController;
@EJB
private FeaturestoreJdbcConnectorController featurestoreJdbcConnectorController;
private Path STAGING_PATH;
private Path REQUEST_SCRATCH_DIR;

@PostConstruct
public void init() {
STAGING_PATH = Paths.get(settings.getStagingDir(), "connectors");
}

public ConnectionCheckerDTO checkConnection(Users user, Project project, Featurestore featurestore,
FeaturestoreStorageConnectorDTO storageConnectorDto) throws FeaturestoreException {
FeaturestoreStorageConnectorDTO storageConnectorDto)
throws FeaturestoreException {

File jsonFile = null;
// create request scratch directory under STAGING_PATH
REQUEST_SCRATCH_DIR = Paths.get(STAGING_PATH.toString(),
String.format("user_%s_project_%d_fs_%d_connector_%s", user.getUsername(), project.getId(),
featurestore.getId(), storageConnectorDto.getName()));
try {
Files.createDirectories(REQUEST_SCRATCH_DIR);
} catch (IOException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.CONNECTION_CHECKER_ERROR, Level.SEVERE,
"Failed to create staging directory", e.getMessage(), e);
}

switch (storageConnectorDto.getStorageConnectorType()) {
case SNOWFLAKE:
// verify dto
snowflakeConnectorController.verifyConnectorDTO((FeaturestoreSnowflakeConnectorDTO) storageConnectorDto);
break;
case JDBC:
FeaturestoreJdbcConnectorDTO dto = (FeaturestoreJdbcConnectorDTO) storageConnectorDto;
featurestoreJdbcConnectorController.validationDTO(dto);
List<OptionDTO> optionsList = dto.getArguments();
if (!optionsList.isEmpty()) {
// append arguments as query parameters to connection string
dto.setConnectionString(getQueryParamsUrl(optionsList, dto.getConnectionString()));
}
if (!Strings.isNullOrEmpty(dto.getDriverPath())) {
copyKeyFile(project, user, dto.getDriverPath());
}
break;
default:
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_TYPE, Level.FINE,
"Storage connector type '" + storageConnectorDto.getStorageConnectorType() + "' is not yet supported");
}

try {
Files.createDirectories(STAGING_PATH);
} catch (IOException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.CONNECTION_CHECKER_ERROR, Level.SEVERE,
"Failed to create staging directory", e.getMessage(), e);
}

try {
jsonFile = new File(buildInputFilePath(user, project, featurestore).toUri());
LOGGER.log(Level.FINE, String.format("Creating input JSON at path %S", jsonFile.getAbsolutePath()));
LOGGER.log(Level.FINE, "Creating input JSON at path {}", jsonFile.getAbsolutePath());
// create input json file from dto
httpClient.getObjectMapper().writeValue(jsonFile, storageConnectorDto);
ObjectMapper objMapper = httpClient.getObjectMapper();
// necessary for java.time.Instant which is not supported by default and used by RedshiftDTO expiration field
objMapper.registerModule(new JavaTimeModule());
objMapper.writeValue(jsonFile, storageConnectorDto);
// execute transaction
ProcessResult result = execute(jsonFile.toPath());
LOGGER.log(
Level.FINE,
String.format("Output response for connection test: %s%s", result.getStdout(), result.getStderr())
Level.FINE, () ->
String.format("Output response for connection test: %s%s", result.getStdout(), result.getStderr())
);
// set result
ConnectionCheckerDTO outputDto = new ConnectionCheckerDTO();
outputDto.setConnectionOutput(String.format("%s%s", result.getStdout(), result.getStderr()));
outputDto.setStatusCode(result.getExitCode());
outputDto.setStorageConnectorDTO(storageConnectorDto);

return outputDto;
} catch (IOException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.CONNECTION_CHECKER_ERROR, Level.SEVERE,
"Failed to create JSON file from input request", e.getMessage(), e);
} finally {
deleteStagedFile(jsonFile);
deleteStagedDir();
}
}

private Path buildInputFilePath(Users user, Project project, Featurestore fs) {
return Paths.get(STAGING_PATH.toString(), String.format("%s_%s_%s_%s.json", user.getUsername(),
private String getQueryParamsUrl(List<OptionDTO> optionsList, String connectionString) {
StringJoiner sj;
if (connectionString.contains("?")) {
sj = new StringJoiner("&", "&", "");
} else {
sj = new StringJoiner("&", "?", "");
}
// Loop through the optionsList and add each name-value pair to the StringJoiner
for (OptionDTO args : optionsList) {
sj.add(args.getName() + "=" + args.getValue());
}
// Append the StringJoiner result to the connectionString
connectionString += sj.toString();
return connectionString;
}

private void copyKeyFile(Project project, Users user, String keyPath) throws FeaturestoreException {
DistributedFileSystemOps udfso = dfs.getDfsOps(hdfsUsersController.getHdfsUserName(project, user));
try {
udfso.copyToLocal(keyPath, REQUEST_SCRATCH_DIR.toString());
} catch (IOException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.CONNECTION_CHECKER_ERROR, Level.SEVERE,
"Failed to copy key file from HDFS", "", e);
} finally {
dfs.closeDfsClient(udfso);
}
}


private Path buildInputFilePath(Users user, Project project, Featurestore fs) throws FeaturestoreException {
try {
Files.createDirectories(REQUEST_SCRATCH_DIR);
} catch (IOException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.CONNECTION_CHECKER_ERROR, Level.SEVERE,
"Failed to create staging directory", e.getMessage(), e);
}
return Paths.get(REQUEST_SCRATCH_DIR.toString(), String.format("%s_%s_%s_%s.json", user.getUsername(),
project.getId(), fs.getId(), System.currentTimeMillis()));
}

private void getContainerLogs(Path logFile) throws FeaturestoreException {
if (Files.exists(logFile)) {
try {
LOGGER.log(Level.FINE, String.format("Reading container logs from path %s", logFile.toAbsolutePath()));
LOGGER.log(Level.FINE, "Reading container logs from path {}", logFile.toAbsolutePath());
String logLines = String.join("\n", Files.readAllLines(logFile));
if (!Strings.isNullOrEmpty(logLines)) {
LOGGER.log(Level.INFO, logLines);
Expand All @@ -134,20 +204,18 @@ private void getContainerLogs(Path logFile) throws FeaturestoreException {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FILE_READ_ERROR, Level.SEVERE,
String.format("Failure during reading container logs %s", logFile), "", e);
} finally {
deleteStagedFile(logFile.toFile());
deleteStagedDir();
}
}
}

private void deleteStagedFile(File file) throws FeaturestoreException {
if (file != null) {
try {
FileUtils.delete(file);
LOGGER.log(Level.FINE, "Deleted staged file: " + file.getAbsolutePath());
} catch (IOException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FILE_DELETION_ERROR, Level.SEVERE,
String.format("Failure during cleaning up staging file %s", file.getAbsolutePath()), "", e);
}
private void deleteStagedDir() throws FeaturestoreException {
try {
PathUtils.delete(REQUEST_SCRATCH_DIR);
LOGGER.log(Level.FINE, "Deleted staged directory: {}", REQUEST_SCRATCH_DIR);
} catch (IOException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FILE_DELETION_ERROR, Level.SEVERE,
String.format("Failure during cleaning up staged directory %s", REQUEST_SCRATCH_DIR), "", e);
}
}

Expand All @@ -165,8 +233,8 @@ protected ProcessResult execute(Path jsonFile) throws FeaturestoreException {
String containerLogFileName = FilenameUtils.getBaseName(jsonFile.toString()).concat(".log");
LOGGER.log(Level.INFO, "Executing process to start docker container for testing connection");
ProcessResult processResult = osProcessExecutor.execute(processDescriptor);
if (processResult.getExitCode()!=0) {
getContainerLogs(Paths.get(jsonFile.getParent().toString(), containerLogFileName));
if (processResult.getExitCode() != 0) {
getContainerLogs(Paths.get(jsonFile.getParent().toString(), containerLogFileName));
}
LOGGER.log(Level.INFO, "Ended process of docker container for testing connection");
return processResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public FeaturestoreJdbcConnectorDTO getJdbcConnectorDTO(Users user, Project proj
}
replaceOnlineFsConnectorUrl(featurestoreJdbcConnectorDTO);
replaceOfflineFsConnectorUrl(featurestoreJdbcConnectorDTO);
featurestoreJdbcConnectorDTO.setDriverPath(featurestoreConnector.getJdbcConnector().getDriverPath());

return featurestoreJdbcConnectorDTO;
}
Expand Down Expand Up @@ -244,6 +245,10 @@ private FeaturestoreJdbcConnector createOrUpdateJdbcConnector(Users user, Featur
// create and set secret
createOrUpdateSecret(user, featurestore, featurestoreJdbcConnectorDTO,
featurestoreJdbcConnector, password, featurestoreJdbcConnector.getPasswordSecret());

if (!Strings.isNullOrEmpty(featurestoreJdbcConnectorDTO.getDriverPath())) {
featurestoreJdbcConnector.setDriverPath(featurestoreJdbcConnectorDTO.getDriverPath());
}
return featurestoreJdbcConnector;
}

Expand All @@ -253,7 +258,7 @@ private FeaturestoreJdbcConnector createOrUpdateJdbcConnector(Users user, Featur
* @return List<OptionDTO>
* @throws FeaturestoreException
*/
private List<OptionDTO> validationDTO(FeaturestoreJdbcConnectorDTO featurestoreJdbcConnectorDTO)
public List<OptionDTO> validationDTO(FeaturestoreJdbcConnectorDTO featurestoreJdbcConnectorDTO)
throws FeaturestoreException {
verifyJdbcConnectorConnectionString(featurestoreJdbcConnectorDTO.getConnectionString());
List<OptionDTO> arguments = featurestoreJdbcConnectorDTO.getArguments();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class FeaturestoreJdbcConnectorDTO extends FeaturestoreStorageConnectorDT
private String connectionString;
private List<OptionDTO> arguments;

private String driverPath;

public FeaturestoreJdbcConnectorDTO() {
}

Expand All @@ -67,11 +69,19 @@ public void setArguments(List<OptionDTO> arguments) {
this.arguments = arguments;
}

@XmlElement
public String getDriverPath() {return driverPath; };

public void setDriverPath(String driverPath) {
this.driverPath = driverPath;
}

@Override
public String toString() {
return "FeaturestoreJdbcConnectorDTO{" +
"connectionString='" + connectionString + '\'' +
", arguments=" + arguments +
", driverPath='" + driverPath + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ public class FeaturestoreJdbcConnector implements Serializable {
@ManyToOne(cascade = CascadeType.ALL)
private Secret passwordSecret;


@Column(name = "driver_path")
private String driverPath;

public static long getSerialVersionUID() {
return serialVersionUID;
}
Expand Down Expand Up @@ -91,6 +95,15 @@ public void setPasswordSecret(Secret encryptionSecret) {
this.passwordSecret = encryptionSecret;
}


public String getDriverPath() {
return driverPath;
}

public void setDriverPath(String driverPath) {
this.driverPath = driverPath;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down

0 comments on commit 413f0b6

Please sign in to comment.