Skip to content

Commit

Permalink
Adding Option to Skip Link Check. This is helpful when going to/from cloud storage resources and not having to configure hms-mirror with credentials and libs for those storage environments.
Browse files Browse the repository at this point in the history
  • Loading branch information
dstreev committed Apr 12, 2023
1 parent 46df0f1 commit 8b261fc
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 33 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

<groupId>com.cloudera.utils.hadoop</groupId>
<artifactId>hms-mirror</artifactId>
<version>1.5.4.3.2-SNAPSHOT</version>
<version>1.5.4.4-SNAPSHOT</version>
<name>hms-mirror</name>

<url>https://github.com/dstreev/hms_mirror</url>
Expand Down Expand Up @@ -62,7 +62,7 @@
<commons-text.version>1.10.0</commons-text.version>
<junit.version>4.13.1</junit.version>

<hadoop-cli.version>2.4.1.0</hadoop-cli.version>
<hadoop-cli.version>2.4.3.0</hadoop-cli.version>

</properties>

Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/cloudera/utils/hadoop/hms/Mirror.java
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,10 @@ public long init(String[] args) {
config.setDatabaseOnly(Boolean.TRUE);
}

if (cmd.hasOption("slc")) {
config.setSkipLinkCheck(Boolean.TRUE);
}

if (cmd.hasOption("ma")) {
config.getMigrateACID().setOn(Boolean.TRUE);
String bucketLimit = cmd.getOptionValue("ma");
Expand Down Expand Up @@ -1451,6 +1455,13 @@ private Options getOptions() {
ipOption.setRequired(Boolean.FALSE);
options.addOption(ipOption);

Option skipLinkTestOption = new Option("slc", "skip-link-check", false,
"Skip Link Check. Use when going between or to Cloud Storage to avoid having to configure " +
"hms-mirror with storage credentials and libraries. This does NOT preclude your Hive Server 2 and " +
"compute environment from such requirements.");
skipLinkTestOption.setRequired(Boolean.FALSE);
options.addOption(skipLinkTestOption);

// Non Native Migrations
Option mnnOption = new Option("mnn", "migrate-non-native", false,
"Migrate Non-Native tables (if strategy allows). These include table definitions that rely on " +
Expand Down
74 changes: 43 additions & 31 deletions src/main/java/com/cloudera/utils/hadoop/hms/mirror/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class Config {
private boolean copyAvroSchemaUrls = Boolean.FALSE;
private DataStrategy dataStrategy = DataStrategy.SCHEMA_ONLY;
private Boolean databaseOnly = Boolean.FALSE;
private Boolean skipLinkCheck = Boolean.FALSE;
private String[] databases = null;
private LegacyTranslations legacyTranslations = new LegacyTranslations();
@JsonIgnore
Expand Down Expand Up @@ -428,6 +429,14 @@ public void setDatabaseOnly(Boolean databaseOnly) {
this.databaseOnly = databaseOnly;
}

/**
 * Whether the cluster 'hcfsNamespace' link test should be bypassed.
 *
 * @return Boolean.TRUE when the link check is skipped; Boolean.FALSE otherwise (default).
 */
public Boolean getSkipLinkCheck() {
    return this.skipLinkCheck;
}

/**
 * Enable or disable the cluster 'hcfsNamespace' link test.
 *
 * @param skipLinkCheck Boolean.TRUE to bypass the link check.
 */
public void setSkipLinkCheck(Boolean skipLinkCheck) {
    this.skipLinkCheck = skipLinkCheck;
}

/**
 * The configured data strategy (defaults to SCHEMA_ONLY).
 *
 * @return the current DataStrategy.
 */
public DataStrategy getDataStrategy() {
    return this.dataStrategy;
}
Expand Down Expand Up @@ -1016,39 +1025,42 @@ && getMigrateACID().isDowngrade()

protected Boolean linkTest() {
Boolean rtn = Boolean.FALSE;
HadoopSession session = null;
try {
session = getCliPool().borrow();
LOG.info("Performing Cluster Link Test to validate cluster 'hcfsNamespace' availability.");
// TODO: develop a test to copy data between clusters.
String leftHCFSNamespace = this.getCluster(Environment.LEFT).getHcfsNamespace();
String rightHCFSNamespace = this.getCluster(Environment.RIGHT).getHcfsNamespace();

// List User Directories on LEFT
String leftlsTestLine = "ls " + leftHCFSNamespace + "/user";
String rightlsTestLine = "ls " + rightHCFSNamespace + "/user";
LOG.info("LEFT ls testline: " + leftlsTestLine);
LOG.info("RIGHT ls testline: " + rightlsTestLine);

CommandReturn lcr = session.processInput(leftlsTestLine);
if (lcr.isError()) {
throw new RuntimeException("Link to RIGHT cluster FAILED.\n " + lcr.getError() +
"\nCheck configuration and hcfsNamespace value. " +
"Check the documentation about Linking clusters: https://github.com/dstreev/hms-mirror#linking-clusters-storage-layers");
}
CommandReturn rcr = session.processInput(rightlsTestLine);
if (rcr.isError()) {
throw new RuntimeException("Link to LEFT cluster FAILED.\n " + rcr.getError() +
"\nCheck configuration and hcfsNamespace value. " +
"Check the documentation about Linking clusters: https://github.com/dstreev/hms-mirror#linking-clusters-storage-layers");
}
if (this.skipLinkCheck) {
LOG.warn("Skipping Link Check.");
rtn = Boolean.TRUE;
} finally {
if (session != null)
getCliPool().returnSession(session);
} else {
HadoopSession session = null;
try {
session = getCliPool().borrow();
LOG.info("Performing Cluster Link Test to validate cluster 'hcfsNamespace' availability.");
// TODO: develop a test to copy data between clusters.
String leftHCFSNamespace = this.getCluster(Environment.LEFT).getHcfsNamespace();
String rightHCFSNamespace = this.getCluster(Environment.RIGHT).getHcfsNamespace();

// List User Directories on LEFT
String leftlsTestLine = "ls " + leftHCFSNamespace + "/user";
String rightlsTestLine = "ls " + rightHCFSNamespace + "/user";
LOG.info("LEFT ls testline: " + leftlsTestLine);
LOG.info("RIGHT ls testline: " + rightlsTestLine);

CommandReturn lcr = session.processInput(leftlsTestLine);
if (lcr.isError()) {
throw new RuntimeException("Link to RIGHT cluster FAILED.\n " + lcr.getError() +
"\nCheck configuration and hcfsNamespace value. " +
"Check the documentation about Linking clusters: https://github.com/dstreev/hms-mirror#linking-clusters-storage-layers");
}
CommandReturn rcr = session.processInput(rightlsTestLine);
if (rcr.isError()) {
throw new RuntimeException("Link to LEFT cluster FAILED.\n " + rcr.getError() +
"\nCheck configuration and hcfsNamespace value. " +
"Check the documentation about Linking clusters: https://github.com/dstreev/hms-mirror#linking-clusters-storage-layers");
}
rtn = Boolean.TRUE;
} finally {
if (session != null)
getCliPool().returnSession(session);
}
}


return rtn;
}

Expand Down
57 changes: 57 additions & 0 deletions src/main/java/com/cloudera/utils/hadoop/hms/util/StorageType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.cloudera.utils.hadoop.hms.util;

import java.util.ArrayList;
import java.util.List;

/**
 * Known Hive storage formats, identified by the SerDe and InputFormat/OutputFormat
 * class names that appear in a table definition.  Used to classify a table's
 * storage type when inspecting 'SHOW CREATE TABLE' output.
 */
public enum StorageType {
    ORC("org.apache.hadoop.hive.ql.io.orc.OrcSerde","org.apache.hadoop.hive.ql.io.orc.OrcInputFormat", "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
    TEXTFILE("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe","org.apache.hadoop.mapred.TextInputFormat", "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"),
    SEQUENCEFILE("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe","org.apache.hadoop.mapred.SequenceFileInputFormat", "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat"),
    PARQUET("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe","org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
    JSONFILE("org.apache.hadoop.hive.serde2.JsonSerDe","org.apache.hadoop.mapred.TextInputFormat","org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"),
    RCFILE("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe","org.apache.hadoop.hive.ql.io.RCFileInputFormat","org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
    AVRO("org.apache.hadoop.hive.serde2.avro.AvroSerDe","org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat", "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
    UNKNOWN("unknown", "unknown", "unknown");

    private final String rowFormatSerde;
    private final String inputFormat;
    private final String outputFormat;

    StorageType(String rowFormatSerde, String inputFormat, String outputFormat) {
        this.rowFormatSerde = rowFormatSerde;
        this.inputFormat = inputFormat;
        this.outputFormat = outputFormat;
    }

    /**
     * Resolve a StorageType from the SerDe and InputFormat class names found in a
     * table definition.  Values may be wrapped in single quotes (as emitted by
     * 'SHOW CREATE TABLE'); quotes are stripped before matching.
     *
     * @param rowFormatSerde the SerDe class name, possibly quoted; may be null.
     * @param inputFormat    the InputFormat class name, possibly quoted; may be null.
     * @return the matching StorageType, or UNKNOWN when either value is null or no match exists.
     */
    public static StorageType from(String rowFormatSerde, String inputFormat) {
        // BUGFIX: original dereferenced rowFormatSerde with startsWith() BEFORE its
        // null check, making the check dead code; guard both inputs up front.
        if (rowFormatSerde == null || inputFormat == null) {
            return UNKNOWN;
        }
        String serde = stripQuotes(rowFormatSerde);
        String input = stripQuotes(inputFormat);
        // Some SerDes (e.g. LazySimpleSerDe) are shared by several formats, so the
        // InputFormat is required to disambiguate; a single pass over the values
        // matching both is equivalent to the original collect-then-filter approach.
        for (StorageType storageType : values()) {
            if (storageType.rowFormatSerde.equals(serde) && storageType.inputFormat.equals(input)) {
                return storageType;
            }
        }
        return UNKNOWN;
    }

    // Remove surrounding single quotes, if present.  (BUGFIX: original chopped the
    // last character whenever the value merely STARTED with a quote.)
    private static String stripQuotes(String value) {
        if (value.length() >= 2 && value.startsWith("'") && value.endsWith("'")) {
            return value.substring(1, value.length() - 1);
        }
        return value;
    }
}
14 changes: 14 additions & 0 deletions src/main/java/com/cloudera/utils/hadoop/hms/util/TableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,17 @@ public static void upsertTblProperty(String key, String value, List<String> tabl
}
}

/**
 * Derive the StorageType for a table from its definition lines.
 *
 * In 'SHOW CREATE TABLE' style definitions, the SerDe class appears on the line
 * following 'ROW FORMAT SERDE' and the InputFormat class on the line following
 * 'STORED AS INPUTFORMAT'.
 *
 * @param tblDef the table definition, one clause per line.
 * @return the resolved StorageType, or StorageType.UNKNOWN when either marker
 *         (or its following line) is absent.
 */
public static StorageType getStorageType(List<String> tblDef) {
    // BUGFIX: the original called tblDef.get(tpIdx + 1) without checking that the
    // marker was found (indexOf == -1 would silently read element 0) or that a
    // following line exists.
    int serdeIdx = tblDef.indexOf("ROW FORMAT SERDE");
    int inputIdx = tblDef.indexOf("STORED AS INPUTFORMAT");
    if (serdeIdx < 0 || inputIdx < 0
            || serdeIdx + 1 >= tblDef.size() || inputIdx + 1 >= tblDef.size()) {
        return StorageType.UNKNOWN;
    }
    String rowFormatSerde = tblDef.get(serdeIdx + 1);
    String inputFormat = tblDef.get(inputIdx + 1);
    return StorageType.from(rowFormatSerde, inputFormat);
}

/**
 * Convenience overload: look up a table property within the environment table's
 * definition lines.
 *
 * @param key              the TBLPROPERTIES key to find.
 * @param environmentTable the table whose definition is searched.
 * @return the value resolved by getTblProperty(key, List) for this table's definition.
 */
public static String getTblProperty(String key, EnvironmentTable environmentTable) {
    List<String> definition = environmentTable.getDefinition();
    return getTblProperty(key, definition);
}
Expand All @@ -817,6 +828,9 @@ public static String getTblProperty(String key, List<String> tblDef) {
break;
}
}
// Remove Comma, if present.
if (rtn != null && rtn.endsWith(","))
rtn = rtn.substring(0, rtn.length()-1);
return rtn;
}

Expand Down

0 comments on commit 8b261fc

Please sign in to comment.