Skip to content

Commit

Permalink
Merge branch 'disconnected'
Browse files Browse the repository at this point in the history
  • Loading branch information
dstreev committed Dec 9, 2022
2 parents 1855db7 + d86f417 commit c700355
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 30 deletions.
Binary file modified .gitignore
Binary file not shown.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,21 @@ You'll run `hms-mirror` from a **LEFT** cluster edgenode. This node will requir

There are cases where 'HDFS' isn't the primary data source, and the only thing the clusters share is storage in these 'common' storage units. You want to transfer the schema, but the data doesn't need to move (at least for 'EXTERNAL' (non-transactional) tables). In this case, try the `-d|--data-strategy` COMMON option. The schemas will go through all the needed conversions while the data remains in the same location.

### Disconnected Mode

Use the `-rid|--right-is-disconnected` mode when you need to build (and/or transfer) schemas/datasets from one cluster to another, but you can't connect to both at the same time. See [issue #17](../../issues/17) for details regarding these cases.

Use cases:
- Schema Only Transfers
- SQL, EXPORT_IMPORT, and HYBRID only when -is or -cs is used. This might be the case when the clusters are secure (kerberized), but don't share a common kerberos domain/user auth. So an intermediate or common storage location will be used to migrate the data.
- Both clusters (and HS2 endpoints) are Kerberized, but the clusters are NOT the same major hadoop version. In this case, hms-mirror doesn't support connecting to both of these endpoints at the same time. Running in the disconnected mode will help push through with the conversion.

hms-mirror will run as normal, with the exception of examining and running scripts against the right cluster. It will be assumed that the RIGHT cluster elements do NOT exist.

The RIGHT_ 'execution' scripts and distcp commands will need to be run MANUALLY via Beeline on the RIGHT cluster.

Note: This is known as the "right-is-disconnected" option, which means the process should be run from a node that has access to the "left" cluster. This is counter to our general recommendation that the process should be run from the 'right' cluster.

## Setup

### Binary Package
Expand Down Expand Up @@ -888,6 +903,8 @@ Hive Metastore Migration Utility
definitions. This will allow the system defaults
to take over and define the location of the new
datasets.
-rid,--right-is-disconnected Don't attempt to connect to the 'right' cluster
and run in this mode
-ro,--read-only For SCHEMA_ONLY, COMMON, and LINKED data
strategies set RIGHT table to NOT purge on DROP
-rr,--reset-right Use this for testing to remove the database on
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

<groupId>com.cloudera.utils.hadoop</groupId>
<artifactId>hms-mirror</artifactId>
<version>1.5.3.6-SNAPSHOT</version>
<version>1.5.4.0-SNAPSHOT</version>
<name>hms-mirror</name>

<url>https://github.com/dstreev/hms_mirror</url>
Expand Down
42 changes: 38 additions & 4 deletions src/main/java/com/cloudera/utils/hadoop/hms/Mirror.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.concurrent.TimeUnit;

import static com.cloudera.utils.hadoop.hms.mirror.MessageCode.ENVIRONMENT_CONNECTION_ISSUE;
import static com.cloudera.utils.hadoop.hms.mirror.MessageCode.ENVIRONMENT_DISCONNECTED;

public class Mirror {
private static final Logger LOG = LogManager.getLogger(Mirror.class);
Expand Down Expand Up @@ -405,6 +406,10 @@ public long init(String[] args) {
config.setTransferOwnership(Boolean.TRUE);
}

if (cmd.hasOption("rid")) {
config.getCluster(Environment.RIGHT).getHiveServer2().setDisconnected(Boolean.TRUE);
}

String dataStrategyStr = cmd.getOptionValue("d");
// default is SCHEMA_ONLY
if (dataStrategyStr != null) {
Expand Down Expand Up @@ -723,13 +728,27 @@ public long init(String[] args) {
try {
conn = connPools.getEnvironmentConnection(target);
if (conn == null) {
config.getErrors().set(ENVIRONMENT_CONNECTION_ISSUE.getCode(), new Object[]{target});
return config.getErrors().getReturnCode();
if (target == Environment.RIGHT && config.getCluster(target).getHiveServer2().getDisconnected()) {
// Skip error. Set Warning that we're disconnected.
config.getWarnings().set(ENVIRONMENT_DISCONNECTED.getCode(), new Object[]{target});
} else {
config.getErrors().set(ENVIRONMENT_CONNECTION_ISSUE.getCode(), new Object[]{target});
return config.getErrors().getReturnCode();
}
} else {
// Exercise the connection.
stmt = conn.createStatement();
stmt.execute("SELECT 1");
}
} catch (SQLException se) {
if (target == Environment.RIGHT && config.getCluster(target).getHiveServer2().getDisconnected()) {
// Set warning that RIGHT is disconnected.
config.getWarnings().set(ENVIRONMENT_DISCONNECTED.getCode(), new Object[]{target});
} else {
LOG.error(se);
config.getErrors().set(ENVIRONMENT_CONNECTION_ISSUE.getCode(), new Object[]{target});
return config.getErrors().getReturnCode();
}
} catch (Throwable t) {
LOG.error(t);
config.getErrors().set(ENVIRONMENT_CONNECTION_ISSUE.getCode(), new Object[]{target});
Expand All @@ -754,7 +773,10 @@ public long init(String[] args) {
// Don't load the datasource for the right with DUMP strategy.
break;
default:
config.getCluster(Environment.RIGHT).setPools(connPools);
// Don't set the Pools when Disconnected.
if (!config.getCluster(Environment.RIGHT).getHiveServer2().getDisconnected()) {
config.getCluster(Environment.RIGHT).setPools(connPools);
}
}

if (config.isConnectionKerberized()) {
Expand Down Expand Up @@ -1007,6 +1029,9 @@ public void doit() {
runbookFile.write("Execute was **ON**, so many of the scripts have been run already. Verify status " +
"in the above report. `distcp` actions (if requested/applicable) need to be run manually. " +
"Some cleanup scripts may have been run if no `distcp` actions were requested.\n\n");
if (config.getCluster(Environment.RIGHT).getHiveServer2().getDisconnected()) {
runbookFile.write("Process ran with RIGHT environment 'disconnected'. All RIGHT scripts will need to be run manually.\n\n");
}
} else {
runbookFile.write("Execute was **OFF**. All actions will need to be run manually. See below steps.\n\n");
}
Expand Down Expand Up @@ -1073,7 +1098,11 @@ public void doit() {
LOG.info("RIGHT Execution Script is here: " + dbRightExecuteFile);
runbookFile.write(step++ + ". **RIGHT** clusters SQL script. ");
if (config.isExecute()) {
runbookFile.write(" (Has been executed already, check report file details)");
if (!config.getCluster(Environment.RIGHT).getHiveServer2().getDisconnected()) {
runbookFile.write(" (Has been executed already, check report file details)");
} else {
runbookFile.write(" (Has NOT been executed because the environment is NOT connected. Review and run scripts manually.)");
}
} else {
runbookFile.write("(Has NOT been executed yet)");
}
Expand Down Expand Up @@ -1358,6 +1387,11 @@ private Options getOptions() {
daOption.setRequired(Boolean.FALSE);
options.addOption(daOption);

Option ridOption = new Option("rid", "right-is-disconnected", false,
"Don't attempt to connect to the 'right' cluster and run in this mode");
ridOption.setRequired(Boolean.FALSE);
options.addOption(ridOption);

Option ipOption = new Option("ip", "in-place", false,
"Downgrade ACID tables to EXTERNAL tables with purge.");
ipOption.setRequired(Boolean.FALSE);
Expand Down
14 changes: 11 additions & 3 deletions src/main/java/com/cloudera/utils/hadoop/hms/mirror/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,17 @@ public Boolean runTableSql(List<Pair> sqlList, TableMirror tblMirror, Environmen
// conn will be null if config.execute != true.
conn = getConnection();

if (conn == null && config.isExecute()) {
if (conn == null && config.isExecute() && !this.getHiveServer2().getDisconnected()) {
// this is a problem.
rtn = Boolean.FALSE;
tblMirror.addIssue(getEnvironment(), "Connection missing. This is a bug.");
}

if (conn == null && this.getHiveServer2().getDisconnected()) {
tblMirror.addIssue(getEnvironment(), "Running in 'disconnected' mode. NO RIGHT operations will be done. " +
"The scripts will need to be run 'manually'.");
}

if (conn != null) {
Statement stmt = null;
try {
Expand Down Expand Up @@ -541,15 +546,18 @@ public Boolean runDatabaseSql(DBMirror dbMirror, Pair dbSqlPair) {
try {
conn = getConnection();

if (conn == null && config.isExecute()) {
if (conn == null && config.isExecute() && !this.getHiveServer2().getDisconnected()) {
// this is a problem.
rtn = Boolean.FALSE;
dbMirror.addIssue(getEnvironment(), "Connection missing. This is a bug.");
}

if (conn == null && this.getHiveServer2().getDisconnected()) {
dbMirror.addIssue(getEnvironment(), "Running in 'disconnected' mode. NO RIGHT operations will be done. " +
"The scripts will need to be run 'manually'.");
}

if (conn != null) {

if (dbMirror != null)
LOG.debug(getEnvironment() + " - " + dbSqlPair.getDescription() + ": " + dbMirror.getName());
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,10 @@ && getMigrateACID().isDowngrade()
HiveServer2Config rightHS2 = this.getCluster(Environment.RIGHT).getHiveServer2();

if (rightHS2 != null) {
// TODO: Add validation for -rid (right-is-disconnected) option.
// - Only applies to SCHEMA_ONLY, SQL, EXPORT_IMPORT, and HYBRID data strategies.
// -
//
if (getDataStrategy() != DataStrategy.STORAGE_MIGRATION && !rightHS2.isValidUri()) {
if (!this.getDataStrategy().equals(DataStrategy.DUMP)) {
rtn = Boolean.FALSE;
Expand Down Expand Up @@ -1040,7 +1044,8 @@ public Boolean checkConnections() {
Set<Environment> envs = Sets.newHashSet(Environment.LEFT, Environment.RIGHT);
for (Environment env : envs) {
Cluster cluster = clusters.get(env);
if (cluster != null && cluster.getHiveServer2() != null && cluster.getHiveServer2().isValidUri()) {
if (cluster != null && cluster.getHiveServer2() != null && cluster.getHiveServer2().isValidUri() &&
!cluster.getHiveServer2().getDisconnected()) {
Connection conn = null;
try {
conn = cluster.getConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,31 +74,32 @@ protected void initPooledDataSources() {

for (Environment environment : environments) {
HiveServer2Config hs2Config = hiveServerConfigs.get(environment);
if (!hs2Config.getDisconnected()) {
ConnectionFactory connectionFactory =
new DriverManagerConnectionFactory(hs2Config.getUri(), hs2Config.getConnectionProperties());

ConnectionFactory connectionFactory =
new DriverManagerConnectionFactory(hs2Config.getUri(), hs2Config.getConnectionProperties());
PoolableConnectionFactory poolableConnectionFactory =
new PoolableConnectionFactory(connectionFactory, null);

PoolableConnectionFactory poolableConnectionFactory =
new PoolableConnectionFactory(connectionFactory, null);
ObjectPool<PoolableConnection> connectionPool =
new GenericObjectPool<>(poolableConnectionFactory);

ObjectPool<PoolableConnection> connectionPool =
new GenericObjectPool<>(poolableConnectionFactory);
poolableConnectionFactory.setPool(connectionPool);

poolableConnectionFactory.setPool(connectionPool);

PoolingDataSource poolingDatasource = new PoolingDataSource<>(connectionPool);
PoolingDataSource poolingDatasource = new PoolingDataSource<>(connectionPool);
// poolingDatasource.setLoginTimeout(10);

dataSources.put(environment, poolingDatasource);
Connection conn = null;
try {
conn = getEnvironmentConnection(environment);
} catch (Throwable t) {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
throw new RuntimeException(e);
dataSources.put(environment, poolingDatasource);
Connection conn = null;
try {
conn = getEnvironmentConnection(environment);
} catch (Throwable t) {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
}
Expand All @@ -111,7 +112,9 @@ public synchronized Connection getEnvironmentConnection(Environment environment)
if (lclDriver != null) {
DriverManager.registerDriver(lclDriver);
try {
conn = getEnvironmentDataSource(environment).getConnection();
DataSource ds = getEnvironmentDataSource(environment);
if (ds != null)
conn = ds.getConnection();
} catch (Throwable se) {
se.printStackTrace();
LOG.error(se);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

public class HiveServer2Config {
private String uri = null;
private Boolean disconnected = Boolean.FALSE;
private Properties connectionProperties;
private String jarFile = null;

Expand All @@ -33,6 +34,14 @@ public void setUri(String uri) {
this.uri = uri;
}

/**
 * Whether this HiveServer2 endpoint is running in 'disconnected' mode.
 * When TRUE, hms-mirror will not attempt to connect to this endpoint and
 * the generated scripts must be run manually against it.
 *
 * @return the disconnected flag; initialized to {@code Boolean.FALSE}.
 */
public Boolean getDisconnected() {
return disconnected;
}

/**
 * Enables or disables 'disconnected' mode for this HiveServer2 endpoint
 * (set via the `-rid|--right-is-disconnected` command-line option).
 *
 * @param disconnected TRUE to skip connecting to this endpoint; scripts
 *                     will then need to be run manually.
 */
public void setDisconnected(Boolean disconnected) {
this.disconnected = disconnected;
}

public Properties getConnectionProperties() {
if (connectionProperties == null) {
setConnectionProperties(new Properties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ public enum MessageCode {
DISTCP_RDL_WO_WAREHOUSE_DIR(55, "When using `-rdl|--reset-to-default-location` you must also specify " +
"warehouse locations `-wd|-ewd` to build the `distcp` workplans."),
ENCRYPT_PASSWORD(56, "Encrypted Password {0}"),
DECRYPT_PASSWORD(57, "Decrypted Password {0}")
DECRYPT_PASSWORD(57, "Decrypted Password {0}"),
ENVIRONMENT_DISCONNECTED(58, "Environment {0} is disconnected. Current db/table status could not be determined. " +
"All actions will assume they don't exist.\n\nStrategies/methods of sync that require the 'RIGHT' cluster or 'LEFT' cluster " +
"to be linked may not work without a `common-storage` or `intermediate-storage` option that will bridge the gap.")

;

Expand Down

0 comments on commit c700355

Please sign in to comment.