Skip to content

Commit

Permalink
[HOPSWORKS-1847] Online feature store storage connectors should not b…
Browse files Browse the repository at this point in the history
…e stored in the database (#691)
  • Loading branch information
SirOibaf authored Oct 7, 2020
1 parent 411bed8 commit f19a9cc
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupDTO;
import io.hops.hopsworks.common.featurestore.importjob.FeaturegroupImportJobDTO;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController;
import io.hops.hopsworks.common.featurestore.settings.FeaturestoreClientSettingsDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorController;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorDTO;
Expand Down Expand Up @@ -108,8 +107,6 @@ public class FeaturestoreService {
private JWTHelper jWTHelper;
@EJB
private Settings settings;
@EJB
private OnlineFeaturestoreController onlineFeaturestoreController;
@Inject
private FeaturegroupService featuregroupService;
@Inject
Expand Down Expand Up @@ -278,20 +275,6 @@ public Response getFeaturestoreId(@Context SecurityContext sc, @PathParam("featu
FeaturestoreJdbcConnectorDTO onlineFeaturestoreConnector = null;
FeaturestoreClientSettingsDTO featurestoreClientSettingsDTO = new FeaturestoreClientSettingsDTO();
featurestoreClientSettingsDTO.setOnlineFeaturestoreEnabled(settings.isOnlineFeaturestore());
if(settings.isOnlineFeaturestore()
&& onlineFeaturestoreController.checkIfDatabaseExists(
onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject()))) {
Users user = jWTHelper.getUserPrincipal(sc);
String dbUsername = onlineFeaturestoreController.onlineDbUsername(project, user);
String dbName = onlineFeaturestoreController.getOnlineFeaturestoreDbName(project);
onlineFeaturestoreConnector =
featurestoreStorageConnectorController.getOnlineFeaturestoreConnector(user, project,
dbUsername, featurestore, dbName);
featurestoreDTO.setMysqlServerEndpoint(settings.getFeaturestoreJdbcUrl());
featurestoreDTO.setOnlineFeaturestoreSize(onlineFeaturestoreController.getDbSize(featurestore));
featurestoreDTO.setOnlineFeaturestoreName(featurestore.getProject().getName());
featurestoreDTO.setOnlineEnabled(true);
}
FeaturestoreMetadataDTO featurestoreMetadataDTO =
new FeaturestoreMetadataDTO(featurestoreDTO, featuregroups, trainingDatasets,
featurestoreClientSettingsDTO, storageConnectors, onlineFeaturestoreConnector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void setFeaturestoreId(Integer featurestoreId) throws FeaturestoreExcepti
@ApiKeyRequired( acceptedScopes = {ApiScope.FEATURESTORE}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER"})
@ApiOperation(value = "Get all storage connectors of a feature store",
response = FeaturestoreStorageConnectorDTO.class, responseContainer = "List")
public Response getStorageConnectors(@Context SecurityContext sc) {
public Response getStorageConnectors(@Context SecurityContext sc) throws FeaturestoreException {
List<FeaturestoreStorageConnectorDTO> featurestoreStorageConnectorDTOS =
featurestoreStorageConnectorController.getAllStorageConnectorsForFeaturestore(featurestore);
GenericEntity<List<FeaturestoreStorageConnectorDTO>> featurestoreStorageConnectorsGeneric =
Expand All @@ -157,7 +157,8 @@ public Response getStorageConnectors(@Context SecurityContext sc) {
response = FeaturestoreStorageConnectorDTO.class, responseContainer = "List")
public Response getStorageConnectorsOfType(
@ApiParam(value = "storage connector type", example = "JDBC")
@PathParam("connectorType") FeaturestoreStorageConnectorType connectorType, @Context SecurityContext sc) {
@PathParam("connectorType") FeaturestoreStorageConnectorType connectorType, @Context SecurityContext sc)
throws FeaturestoreException {
verifyStorageConnectorType(connectorType);
List<FeaturestoreStorageConnectorDTO> featurestoreStorageConnectorDTOS =
featurestoreStorageConnectorController.getAllStorageConnectorsForFeaturestoreWithType(featurestore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public Featurestore createProjectFeatureStore(Project project, Users user, Strin
* @param featurestore the featurestore entity
* @return a DTO representation of the featurestore
*/
public FeaturestoreDTO convertFeaturestoreToDTO(Featurestore featurestore) {
private FeaturestoreDTO convertFeaturestoreToDTO(Featurestore featurestore) {
String hiveDbDescription = featurestoreFacade.getHiveDatabaseDescription(featurestore.getHiveDbId());
FeaturestoreDTO featurestoreDTO = new FeaturestoreDTO(featurestore);
featurestoreDTO.setFeaturestoreDescription(hiveDbDescription);
Expand All @@ -270,7 +270,7 @@ public FeaturestoreDTO convertFeaturestoreToDTO(Featurestore featurestore) {
if (settings.isOnlineFeaturestore() &&
onlineFeaturestoreController.checkIfDatabaseExists(
onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject()))) {
featurestoreDTO.setMysqlServerEndpoint(settings.getFeaturestoreJdbcUrl());
featurestoreDTO.setMysqlServerEndpoint(onlineFeaturestoreController.getJdbcURL());
featurestoreDTO.setOnlineFeaturestoreSize(onlineFeaturestoreController.getDbSize(featurestore));
featurestoreDTO.setOnlineFeaturestoreName(featurestore.getProject().getName());
featurestoreDTO.setOnlineEnabled(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.hops.hopsworks.common.featurestore.online;

import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.dao.user.UserFacade;
import io.hops.hopsworks.common.dao.user.security.secrets.SecretsFacade;
import io.hops.hopsworks.common.featurestore.FeaturestoreConstants;
Expand All @@ -23,6 +24,7 @@
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.jdbc.FeaturestoreJdbcConnectorController;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.security.secrets.SecretsController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.FeaturestoreException;
Expand Down Expand Up @@ -78,6 +80,8 @@ public class OnlineFeaturestoreController {
private HdfsUsersController hdfsUsersController;
@EJB
private UserFacade userFacade;
@EJB
private ServiceDiscoveryController serviceDiscoveryController;

@PostConstruct
public void init() {
Expand Down Expand Up @@ -108,20 +112,16 @@ private Connection initConnection(String databaseName, Project project, Users us
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_SECRETS_ERROR,
Level.SEVERE, "Problem getting secrets for the JDBC connection to the online FS");
}
jdbcString = settings.getFeaturestoreJdbcUrl() + databaseName;
try {
return DriverManager.getConnection(jdbcString, dbUsername, password);
} catch (SQLException e) {
LOGGER.log(Level.SEVERE,
"Error initiating MySQL JDBC connection to online feature store for user: " + user.getEmail() + " error:"
+ e);
return DriverManager.getConnection(getJdbcURL(databaseName), dbUsername, password);
} catch (SQLException | ServiceDiscoveryException e) {
throw new FeaturestoreException(
RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE, Level.SEVERE,
"project: " + project.getName() + ", database: " + databaseName + ", db user:" + dbUsername +
", jdbcString: " + jdbcString, e.getMessage(), e);
}
}

/**
* Runs a update/create SQL query against an online featurestore database, impersonating the user making the request
*
Expand Down Expand Up @@ -425,23 +425,25 @@ public void removeOnlineFeaturestoreUser(Featurestore featurestore, Users user)
}

String dbUser = onlineDbUsername(featurestore.getProject().getName(), user.getUsername());
String connectorName = dbUser + FeaturestoreConstants.ONLINE_FEATURE_STORE_CONNECTOR_SUFFIX;

SecretId id = new SecretId(user.getUid(), dbUser);
secretsFacade.deleteSecret(id);
try {
onlineFeaturestoreFacade.removeOnlineFeaturestoreUser(dbUser);

List<FeaturestoreStorageConnectorDTO> jdbcConnectors =
featurestoreJdbcConnectorController.getJdbcConnectorsForFeaturestore(featurestore);
for (FeaturestoreStorageConnectorDTO storageConnector: jdbcConnectors) {
if (storageConnector.getName().equalsIgnoreCase(connectorName)) {
featurestoreJdbcConnectorController.removeFeaturestoreJdbcConnector(storageConnector.getId());
}
}
} catch (Exception e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_DELETING_ONLINE_FEATURESTORE_USER,
Level.SEVERE, "An error occurred when trying to delete the MySQL database user for an online feature store",
e.getMessage(), e);
}
String connectorName = dbUser + FeaturestoreConstants.ONLINE_FEATURE_STORE_CONNECTOR_SUFFIX;
List<FeaturestoreStorageConnectorDTO> jdbcConnectors =
featurestoreJdbcConnectorController.getJdbcConnectorsForFeaturestore(featurestore);
for (FeaturestoreStorageConnectorDTO storageConnector: jdbcConnectors) {
if (storageConnector.getName().equalsIgnoreCase(connectorName)) {
featurestoreJdbcConnectorController.removeFeaturestoreJdbcConnector(storageConnector.getId());
}
}
}


Expand Down Expand Up @@ -474,4 +476,15 @@ private String generateRandomUserPw() {
public Boolean checkIfDatabaseExists(String dbName) {
return onlineFeaturestoreFacade.checkIfDatabaseExists(dbName);
}

public String getJdbcURL() throws ServiceDiscoveryException {
return getJdbcURL("");
}

private String getJdbcURL(String dbName) throws ServiceDiscoveryException {
return "jdbc:mysql://" + serviceDiscoveryController
.getAnyAddressOfServiceWithDNS(ServiceDiscoveryController.HopsworksService.MYSQL).getAddress()
+ ":3306"
+ "/" + dbName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public class FeaturestoreStorageConnectorController {
* @param featurestore the featurestore to query
* @return List of JSON/XML DTOs of the storage connectors
*/
public List<FeaturestoreStorageConnectorDTO> getAllStorageConnectorsForFeaturestore(Featurestore featurestore) {
public List<FeaturestoreStorageConnectorDTO> getAllStorageConnectorsForFeaturestore(Featurestore featurestore)
throws FeaturestoreException{
List<FeaturestoreStorageConnectorDTO> featurestoreStorageConnectorDTOS = new ArrayList<>();
featurestoreStorageConnectorDTOS.addAll(
featurestoreJdbcConnectorController.getJdbcConnectorsForFeaturestore(featurestore));
Expand All @@ -79,7 +80,8 @@ public List<FeaturestoreStorageConnectorDTO> getAllStorageConnectorsForFeaturest
* @return List of JSON/XML DTOs of the storage connectors
*/
public List<FeaturestoreStorageConnectorDTO> getAllStorageConnectorsForFeaturestoreWithType(
Featurestore featurestore, FeaturestoreStorageConnectorType featurestoreStorageConnectorType) {
Featurestore featurestore, FeaturestoreStorageConnectorType featurestoreStorageConnectorType)
throws FeaturestoreException {
switch(featurestoreStorageConnectorType) {
case S3:
return featurestoreS3ConnectorController.getS3ConnectorsForFeaturestore(featurestore);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorType;
import io.hops.hopsworks.common.hive.HiveController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.jdbc.FeaturestoreJdbcConnector;
Expand All @@ -43,7 +43,7 @@ public class FeaturestoreJdbcConnectorController {
@EJB
private FeaturestoreJdbcConnectorFacade featurestoreJdbcConnectorFacade;
@EJB
private Settings settings;
private ServiceDiscoveryController serviceDiscoveryController;
@EJB
private HiveController hiveController;

Expand Down Expand Up @@ -289,9 +289,17 @@ private void verifyUserInput(Featurestore featurestore, FeaturestoreJdbcConnecto
* @param featurestore featurestore to query for jdbc connectors
* @return list of XML/JSON DTOs of the jdbc connectors
*/
public List<FeaturestoreStorageConnectorDTO> getJdbcConnectorsForFeaturestore(Featurestore featurestore) {
List<FeaturestoreJdbcConnector> jdbcConnectors = featurestoreJdbcConnectorFacade.findByFeaturestore(featurestore);
return jdbcConnectors.stream().map(FeaturestoreJdbcConnectorDTO::new).collect(Collectors.toList());
public List<FeaturestoreStorageConnectorDTO> getJdbcConnectorsForFeaturestore(Featurestore featurestore)
throws FeaturestoreException {
try {
return featurestoreJdbcConnectorFacade.findByFeaturestore(featurestore).stream()
.map(FeaturestoreJdbcConnectorDTO::new)
.map(this::replaceOnlineFsConnectorUrl)
.collect(Collectors.toList());
} catch (RuntimeException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.STORAGE_CONNECTOR_GET_ERROR, Level.SEVERE,
"Error resolving MySQL DNS name", e.getMessage(), e);
}
}

/**
Expand All @@ -304,7 +312,12 @@ public List<FeaturestoreStorageConnectorDTO> getJdbcConnectorsForFeaturestore(Fe
public FeaturestoreJdbcConnectorDTO getJdbcConnectorWithIdAndFeaturestore(Featurestore featurestore, Integer id)
throws FeaturestoreException {
FeaturestoreJdbcConnector featurestoreJdbcConnector = verifyJdbcConnectorId(id, featurestore);
return new FeaturestoreJdbcConnectorDTO(featurestoreJdbcConnector);
try {
return replaceOnlineFsConnectorUrl(new FeaturestoreJdbcConnectorDTO(featurestoreJdbcConnector));
} catch (RuntimeException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.STORAGE_CONNECTOR_GET_ERROR, Level.SEVERE,
"Error resolving MySQL DNS name", e.getMessage(), e);
}
}

/**
Expand All @@ -319,14 +332,13 @@ public FeaturestoreJdbcConnectorDTO getJdbcConnectorWithIdAndFeaturestore(Featur
public FeaturestoreJdbcConnectorDTO createJdbcConnectorForOnlineFeaturestore(String onlineDbUsername,
Featurestore featurestore, String dbName) throws FeaturestoreException {
String connectorName = onlineDbUsername + FeaturestoreConstants.ONLINE_FEATURE_STORE_CONNECTOR_SUFFIX;
List<FeaturestoreStorageConnectorDTO> featurestoreConnectors = getJdbcConnectorsForFeaturestore(featurestore);
for (FeaturestoreStorageConnectorDTO storageConnector: featurestoreConnectors) {
if(connectorName.equalsIgnoreCase(storageConnector.getName())){
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_NAME, Level.FINE,

if (featurestoreJdbcConnectorFacade.findByName(connectorName).isPresent()) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_NAME, Level.FINE,
"a storage connector with that name already exists");
}
}
String connectionString = settings.getFeaturestoreJdbcUrl() + dbName;

String connectionString = "jdbc:mysql://mysql.service.consul:3306/" + dbName;
String arguments = FeaturestoreConstants.ONLINE_FEATURE_STORE_JDBC_PASSWORD_ARG + "=" +
FeaturestoreConstants.ONLINE_FEATURE_STORE_CONNECTOR_PASSWORD_TEMPLATE + "," +
FeaturestoreConstants.ONLINE_FEATURE_STORE_JDBC_USER_ARG + "=" + onlineDbUsername;
Expand All @@ -341,4 +353,18 @@ public FeaturestoreJdbcConnectorDTO createJdbcConnectorForOnlineFeaturestore(Str
return createFeaturestoreJdbcConnector(featurestore, featurestoreJdbcConnectorDTO);
}

private FeaturestoreJdbcConnectorDTO replaceOnlineFsConnectorUrl(FeaturestoreJdbcConnectorDTO jdbcConnectorDTO) {
String connectionString = "";
try {
connectionString = jdbcConnectorDTO.getConnectionString().replace("mysql.service.consul",
serviceDiscoveryController.getAnyAddressOfServiceWithDNS(ServiceDiscoveryController.HopsworksService.MYSQL)
.getAddress());
} catch (ServiceDiscoveryException e) {
throw new RuntimeException(e);
}
jdbcConnectorDTO.setConnectionString(connectionString);

return jdbcConnectorDTO;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import javax.persistence.TypedQuery;
import javax.validation.ConstraintViolationException;
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -92,6 +93,16 @@ public FeaturestoreJdbcConnector findByIdAndFeaturestore(Integer id, Featurestor
}
}

public Optional<FeaturestoreJdbcConnector> findByName(String name) {
try {
return Optional.of(em.createNamedQuery("FeaturestoreJdbcConnector.findByName", FeaturestoreJdbcConnector.class)
.setParameter("name", name)
.getSingleResult());
} catch (NoResultException e) {
return Optional.empty();
}
}

/**
* Updates a jdbc connector
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public enum HopsworksService {
HOPSWORKS_APP("hopsworks.glassfish"),
JUPYTER_LOGSTASH("jupyter.logstash"),
REGISTRY("registry"),
CONSUL_SERVER("consul");
CONSUL_SERVER("consul"),
MYSQL("mysql");

private String name;
HopsworksService(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ public class Settings implements Serializable {
/* -------------------- Featurestore --------------- */
private static final String VARIABLE_FEATURESTORE_DEFAULT_QUOTA = "featurestore_default_quota";
private static final String VARIABLE_FEATURESTORE_DEFAULT_STORAGE_FORMAT = "featurestore_default_storage_format";
private static final String VARIABLE_FEATURESTORE_JDBC_URL = "featurestore_jdbc_url";
private static final String VARIABLE_ONLINE_FEATURESTORE = "featurestore_online_enabled";
private static final String VARIABLE_FG_PREVIEW_LIMIT = "fg_preview_limit";
private static final String VARIABLE_ONLINE_FEATURESTORE_TS = "featurestore_online_tablespace";
Expand Down Expand Up @@ -701,7 +700,6 @@ private void populateCache() {
FEATURESTORE_DB_DEFAULT_QUOTA = setStrVar(VARIABLE_FEATURESTORE_DEFAULT_QUOTA, FEATURESTORE_DB_DEFAULT_QUOTA);
FEATURESTORE_DB_DEFAULT_STORAGE_FORMAT =
setStrVar(VARIABLE_FEATURESTORE_DEFAULT_STORAGE_FORMAT, FEATURESTORE_DB_DEFAULT_STORAGE_FORMAT);
FEATURESTORE_JDBC_URL = setStrVar(VARIABLE_FEATURESTORE_JDBC_URL, FEATURESTORE_JDBC_URL);
ONLINE_FEATURESTORE = setBoolVar(VARIABLE_ONLINE_FEATURESTORE, ONLINE_FEATURESTORE);
ONLINE_FEATURESTORE_TS = setStrVar(VARIABLE_ONLINE_FEATURESTORE_TS, ONLINE_FEATURESTORE_TS);

Expand Down Expand Up @@ -3552,13 +3550,6 @@ public Boolean isHopsUtilInsecure() {
return isCloud() || isLocalHost();
}

private String FEATURESTORE_JDBC_URL = "jdbc:mysql://" + HOPSWORKS_IP + ":3306/";

public synchronized String getFeaturestoreJdbcUrl() {
checkCache();
return FEATURESTORE_JDBC_URL;
}

private Boolean REQUESTS_VERIFY = false;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
"fsjdbc"),
@NamedQuery(name = "FeaturestoreJdbcConnector.findById",
query = "SELECT fsjdbc FROM FeaturestoreJdbcConnector fsjdbc WHERE fsjdbc.id = :id"),
@NamedQuery(name = "FeaturestoreJdbcConnector.findByName",
query = "SELECT fsjdbc FROM FeaturestoreJdbcConnector fsjdbc WHERE fsjdbc.name = :name"),
@NamedQuery(name = "FeaturestoreJdbcConnector.findByFeaturestore", query = "SELECT fsjdbc " +
"FROM FeaturestoreJdbcConnector fsjdbc WHERE fsjdbc.featurestore = :featurestore"),
@NamedQuery(name = "FeaturestoreJdbcConnector.findByFeaturestoreAndId", query = "SELECT fsjdbc " +
Expand Down
Loading

0 comments on commit f19a9cc

Please sign in to comment.