A library to read/write DataFrames and Streaming DataFrames to/from Apache Hive™ using LLAP. With Apache Ranger™, this library provides row/column level fine-grained access controls.
Note that for open-source usage, the master branch requires Hive 3.1.0, which is a forthcoming release. For configuration against prior versions, please see the earlier documentation.
Branch | Spark | Hive | HDP |
---|---|---|---|
master (Summer 2018) | 2.3.1 | 3.1.0 | 3.0.0 (GA) |
branch-2.3 | 2.3.0 | 2.1.0 | 2.6.x (TP) |
branch-2.2 | 2.2.0 | 2.1.0 | 2.6.x (TP) |
branch-2.1 | 2.1.1 | 2.1.0 | 2.6.x (TP) |
branch-1.6 | 1.6.3 | 2.1.0 | 2.5.x (TP) |
Ensure the following Spark properties are set via `spark-defaults.conf`, using `--conf`, or through other Spark configuration:
Property | Description | Example |
---|---|---|
spark.sql.hive.hiveserver2.jdbc.url | Thrift JDBC URL for LLAP HiveServer2 | jdbc:hive2://localhost:10000 |
spark.datasource.hive.warehouse.load.staging.dir | Temp directory for batch writes to Hive | /tmp |
spark.hadoop.hive.llap.daemon.service.hosts | App name for LLAP service | @llap0 |
spark.hadoop.hive.zookeeper.quorum | Zookeeper hosts used by LLAP | host1:2181;host2:2181;host3:2181 |
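For example, a minimal `spark-defaults.conf` entry using the example values from the table above (hostnames and paths are placeholders for your cluster) might look like:

```
spark.sql.hive.hiveserver2.jdbc.url              jdbc:hive2://localhost:10000
spark.datasource.hive.warehouse.load.staging.dir /tmp
spark.hadoop.hive.llap.daemon.service.hosts      @llap0
spark.hadoop.hive.zookeeper.quorum               host1:2181;host2:2181;host3:2181
```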
For use in Spark client mode on a kerberized YARN cluster, set:
Property | Description | Example |
---|---|---|
spark.sql.hive.hiveserver2.jdbc.url.principal | Set equal to hive.server2.authentication.kerberos.principal | hive/[email protected] |
For use in Spark cluster mode on a kerberized YARN cluster, set:
Property | Description | Example |
---|---|---|
spark.security.credentials.hiveserver2.enabled | Use Spark ServiceCredentialProvider | true |
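For example, these can be passed at submission time with `--conf` (a sketch; the principal is the placeholder value from the table above, and only one of the two settings applies depending on deploy mode):

```
# client mode on a kerberized cluster
--conf spark.sql.hive.hiveserver2.jdbc.url.principal=hive/[email protected]
# cluster mode on a kerberized cluster
--conf spark.security.credentials.hiveserver2.enabled=true
```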
Spark Type | Hive Type |
---|---|
ByteType | TinyInt |
ShortType | SmallInt |
IntegerType | Integer |
LongType | BigInt |
FloatType | Float |
DoubleType | Double |
DecimalType | Decimal |
StringType* | String, Char, Varchar* |
BinaryType | Binary |
BooleanType | Boolean |
TimestampType* | Timestamp* |
DateType | Date |
ArrayType | Array |
StructType | Struct |
- A Hive String, Char, Varchar column will be converted into a Spark StringType column.
- When a Spark StringType column has maxLength metadata, it will be converted into a Hive Varchar column; otherwise it will be converted into a Hive String column (see the sketch after these notes).
- A Hive Timestamp column will lose sub-microsecond precision when converted into a Spark TimestampType column, because a Spark TimestampType column has microsecond precision while a Hive Timestamp column has nanosecond precision.
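For example, given a DataFrame `df`, maxLength metadata could be attached to a StringType column before writing so that it maps to a Hive Varchar column (a sketch; the column name and length are illustrative, and the `maxLength` key is the one described in the note above):

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.MetadataBuilder

// Attach maxLength metadata so this StringType column maps to Hive Varchar(50);
// StringType columns without this metadata map to Hive String.
val varcharMeta = new MetadataBuilder().putLong("maxLength", 50).build()
val dfOut = df.withColumn("item_name", col("item_name").as("item_name", varcharMeta))
```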
Spark Type | Hive Type | Plan |
---|---|---|
CalendarIntervalType | Interval | Planned for future support |
MapType | Map | Planned for future support |
N/A | Union | Not supported in Spark |
NullType | N/A | Not supported in Hive |
Support is currently available for `spark-shell`, `pyspark`, and `spark-submit`.
- Locate the `hive-warehouse-connector-assembly` jar. If building from source, this will be located within the `target/scala-2.11` folder. If using a pre-built distro, follow the instructions from your distro provider, e.g. on HDP the jar is located in `/usr/hdp/current/hive-warehouse-connector/`.
- Use `--jars` to add the connector jar to app submission, e.g.
spark-shell --jars /usr/hdp/current/hive-warehouse-connector/hive-warehouse-connector-assembly-1.0.0.jar
- Follow the instructions above to add the connector jar to app submission.
- Additionally, add the connector's Python package to app submission, e.g.
pyspark --jars /usr/hdp/current/hive-warehouse-connector/hive-warehouse-connector-assembly-1.0.0.jar --py-files /usr/hdp/current/hive-warehouse-connector/pyspark_hwc-1.0.0.zip
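For `spark-submit`, the connector jar (and the Python zip, for PySpark applications) is added the same way; for example (the application class and jar name below are placeholders):

```
spark-submit --jars /usr/hdp/current/hive-warehouse-connector/hive-warehouse-connector-assembly-1.0.0.jar \
  --class com.example.MyHiveApp my-hive-app.jar
```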
`HiveWarehouseSession` acts as an API to bridge Spark with HiveServer2. In your Spark source, create an instance of `HiveWarehouseSession` using `HiveWarehouseBuilder`.
- Create a HiveWarehouseSession (assuming `spark` is an existing `SparkSession`):
val hive = com.hortonworks.spark.sql.hive.llap.HiveWarehouseBuilder.session(spark).build()
- Set the current database for unqualified Hive table references:
hive.setDatabase(<database>)
- Execute a catalog operation and return a DataFrame, e.g.
hive.execute("describe extended web_sales").show(100, false)
- Show databases:
hive.showDatabases().show(100, false)
- Show tables for current database:
hive.showTables().show(100, false)
- Describe table:
hive.describeTable(<table_name>).show(100, false)
- Create a database:
hive.createDatabase(<database_name>)
- Create ORC table, e.g.:
hive.createTable("web_sales")
  .ifNotExists()
  .column("sold_time_sk", "bigint")
  .column("ws_ship_date_sk", "bigint")
  .create()
- Drop a database:
hive.dropDatabase(<databaseName>, <ifExists>, <useCascade>)
- Drop a table:
hive.dropTable(<tableName>, <ifExists>, <usePurge>)
- Execute a Hive SELECT query and return a DataFrame, e.g.
val df = hive.executeQuery("select * from web_sales")
- Reference a Hive table as a DataFrame
val df = hive.table(<tableName>)
- Execute a Hive update statement, e.g.
hive.executeUpdate("ALTER TABLE old_name RENAME TO new_name")
- Write a DataFrame to Hive in batch (uses LOAD DATA INTO TABLE), e.g.
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
  .option("table", <tableName>)
  .save()
- Write a DataFrame to Hive using HiveStreaming, e.g.
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource")
.option("database", <databaseName>)
.option("table", <tableName>)
.option("metastoreUri", <HMS_URI>)
.save()
// To write to static partition
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource")
.option("database", <databaseName>)
.option("table", <tableName>)
.option("partition", <partition>)
.option("metastoreUri", <HMS_URI>)
.save()
- Write a Spark Stream to Hive using HiveStreaming, e.g.
stream.writeStream
.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource")
.option("metastoreUri", metastoreUri)
.option("database", "streaming")
.option("table", "web_sales")
.start()
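In the example above, `stream` is any streaming Dataset whose schema matches the target table. For a quick local test it might come from a socket source (a sketch; the host, port, and the assumption that the target table has a single string column are illustrative):

```scala
// Illustrative only: the socket source yields a single StringType column named "value",
// so the target Hive table would need a matching single-column schema.
val stream = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()
```

The `start()` call returns a standard Spark StreamingQuery, which a standalone application would typically block on with `awaitTermination()`.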
interface HiveWarehouseSession {
//Execute Hive SELECT query and return DataFrame
Dataset<Row> executeQuery(String sql);
//Execute Hive update statement
boolean executeUpdate(String sql);
//Execute Hive catalog-browsing operation and return DataFrame
Dataset<Row> execute(String sql);
//Reference a Hive table as a DataFrame
Dataset<Row> table(String sql);
//Return the SparkSession attached to this HiveWarehouseSession
SparkSession session();
//Set the current database for unqualified Hive table references
void setDatabase(String name);
/**
* Helpers: wrapper functions over execute or executeUpdate
*/
//Helper for show databases
Dataset<Row> showDatabases();
//Helper for show tables
Dataset<Row> showTables();
//Helper for describeTable
Dataset<Row> describeTable(String table);
//Helper for create database
void createDatabase(String database, boolean ifNotExists);
//Helper for create table stored as ORC
CreateTableBuilder createTable(String tableName);
//Helper for drop database
void dropDatabase(String database, boolean ifExists, boolean cascade);
//Helper for drop table
void dropTable(String table, boolean ifExists, boolean purge);
}
Read table data from Hive, transform it in Spark, and write it to a new Hive table:
val hive = com.hortonworks.spark.sql.hive.llap.HiveWarehouseBuilder.session(spark).build()
hive.setDatabase("tpcds_bin_partitioned_orc_1000")
val df = hive.executeQuery("select * from web_sales")
hive.setDatabase("spark_llap")
val tempTable = "t_" + System.currentTimeMillis()
hive.createTable(tempTable).ifNotExists().column("ws_sold_time_sk", "bigint").column("ws_ship_date_sk", "bigint").create()
df.select("ws_sold_time_sk", "ws_ship_date_sk").filter("ws_sold_time_sk > 80000").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").option("table", tempTable).save()
val df2 = hive.executeQuery("select * from " + tempTable)
df2.show(20)
hive.dropTable(tempTable, true, false)