Getting started with HWC
You need to know how to use the Hive Warehouse Connector (HWC) with different programming languages and build systems. You also learn where the HWC binaries are located in CDP parcels and how a Spark application consumes those binaries.
The examples in this topic assume that you are running CDP version 7.2.9.0-203. Substitute your actual CDP version when you copy examples.
Cloudera Artifactory and HWC dependency
To pull the HWC dependency corresponding to a release, use the following Cloudera Artifactory repository:
https://repository.cloudera.com/artifactory/cloudera-repos
Use with Maven
To use HWC with Maven, define the Cloudera Artifactory as a repository.
<repository>
  <id>cloudera</id>
  <name>cloudera</name>
  <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
</repository>
In the pom.xml of the project, add the dependency as shown in the following example:
<dependency>
  <groupId>com.hortonworks.hive</groupId>
  <artifactId>hive-warehouse-connector_2.11</artifactId>
  <version>1.0.0.7.2.9.0-203</version>
  <scope>provided</scope>
</dependency>
Use with sbt
Add the Cloudera and Shibboleth repositories, and the HWC dependency, to build.sbt as follows:
resolvers += "Cloudera repo" at "https://repository.cloudera.com/artifactory/cloudera-repos", resolvers += "opensaml Repository" at "https://build.shibboleth.net/nexus/content/repositories/releases", libraryDependencies += "com.hortonworks.hive" % "hive-warehouse-connector_2.11" % "1.0.0.7.2.9.0-203" % "provided",
Dependency scope
Generally, you add HWC dependencies in provided scope unless there is a specific requirement to do otherwise. When running a Spark application, you can specify the HWC jar present in your distribution using the --jars option of spark-submit or spark-shell.
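For example, to start an interactive spark-shell session with the HWC jar on the classpath (substitute your actual CDP version in the jar name):

spark-shell --jars /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.7.2.9.0-203.jar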
HWC Binaries in CDP
HWC binaries are located in /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/. This directory contains the HWC jar, a Python zip, and the R package. Use these binaries to launch Spark applications in Scala, Java, Python, or R.
The following files are in /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/:
- hive-warehouse-connector-assembly-1.0.0.7.2.9.0-203.jar
- pyspark_hwc-1.0.0.7.2.9.0-203.zip
- SparklyrHWC-1.0.0.7.2.9.0-203
Working with different languages
You use HWC APIs to perform basic read and write operations from different languages. The following examples show basic capabilities that are covered in detail later in this documentation.
Use with Scala
import com.hortonworks.hwc.HiveWarehouseSession
import org.apache.spark.sql.{SaveMode, SparkSession}

object HWCApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("HWCApp").enableHiveSupport.getOrCreate
    val hwc = HiveWarehouseSession.session(spark).build
    // create sample data
    val tvSeries = createSampleDataDf(spark)
    val tableName = "tv_series"
    hwc.dropTable(tableName, true, true)
    println(s"=======Writing to hive table - $tableName via HWC=======")
    // write to hive table via HWC
    tvSeries.write.format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
      .option("table", tableName)
      .mode(SaveMode.Append).save
    println(s"=======Reading hive table $tableName via HWC=======")
    // Read via HWC
    hwc.sql(s"select * from $tableName").show(truncate = false)
    hwc.close()
    spark.stop
  }

  private def createSampleDataDf(spark: SparkSession) = {
    spark.sql("drop table if exists tv_series_dataset")
    spark.sql("create table tv_series_dataset(id int, name string, genres string, rating double) using orc")
    spark.sql("insert into tv_series_dataset values " +
      "(1, 'Chernobyl', 'Drama|History|Tragedy|Science', 9.4), " +
      "(2, 'Westworld', 'Sci-fi', 8.6), (3, 'Sense8', 'Sci-fi', 8.3), " +
      "(4, 'Person of Interest', 'Drama|Sci-fi', 8.4), " +
      "(5, 'Its okay to not be okay', 'Drama', 8.7), " +
      "(6, 'Daredevil', 'Action|Sci-fi', 8.6), " +
      "(7, 'Money Heist', 'Drama|Thriller', 8.3), " +
      "(8, 'Breaking Bad', 'Crime|Drama', 9.5)")
    spark.sql("select * from tv_series_dataset")
  }
}
Use with Java
The following Java code is equivalent to the Scala code above.
import com.hortonworks.hwc.HiveWarehouseSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class HWCApp {

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("HWCApp").enableHiveSupport().getOrCreate();
    // HiveWarehouseSession creation
    HiveWarehouseSession hwc = HiveWarehouseSession.session(spark).build();
    // create sample data
    Dataset<Row> tvSeries = createSampleDataDf(spark);
    String tableName = "tv_series";
    hwc.dropTable(tableName, true, true);
    System.out.println("=======Writing to hive table - " + tableName + " via HWC=======");
    // write data to hive table via HWC
    tvSeries.write().format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
        .option("table", tableName)
        .mode(SaveMode.Append).save();
    System.out.println("=======Reading hive table - " + tableName + " via HWC=======");
    // read hive table as dataframe using HWC
    hwc.sql("select * from " + tableName).show(false);
    hwc.close();
    spark.stop();
  }

  private static Dataset<Row> createSampleDataDf(SparkSession spark) {
    spark.sql("drop table if exists tv_series_dataset");
    spark.sql("create table tv_series_dataset(id int, name string, genres string, rating double) using orc");
    spark.sql("insert into tv_series_dataset values " +
        "(1, 'Chernobyl', 'Drama|History|Tragedy|Science', 9.4), " +
        "(2, 'Westworld', 'Sci-fi', 8.6), (3, 'Sense8', 'Sci-fi', 8.3), " +
        "(4, 'Person of Interest', 'Drama|Sci-fi', 8.4), " +
        "(5, 'Its okay to not be okay', 'Drama', 8.7), " +
        "(6, 'Daredevil', 'Action|Sci-fi', 8.6), " +
        "(7, 'Money Heist', 'Drama|Thriller', 8.3), " +
        "(8, 'Breaking Bad', 'Crime|Drama', 9.5)");
    return spark.sql("select * from tv_series_dataset");
  }
}
Launching a Java or Scala app
After packaging the app in a jar, launch it using the standard Spark syntax for launching applications, providing the HWC jar from the distribution. The Spark application can be launched as follows:
spark-submit --jars /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.7.2*.jar \
--class com.cloudera.HWCApp \
...More spark/HWC confs... \
/path-to-jar/hwc-app.jar
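The exact Spark and HWC configurations depend on your cluster. As a minimal sketch, assuming the connection properties shown in the Sparklyr example later in this topic (substitute your actual HiveServer2 URL and preferred read mode):

spark-submit --jars /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.7.2*.jar \
--class com.cloudera.HWCApp \
--conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://<url>:10000/default" \
--conf spark.datasource.hive.warehouse.read.mode=DIRECT_READER_V2 \
/path-to-jar/hwc-app.jar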
Use with Python
from pyspark.sql import SparkSession
from pyspark_llap import HiveWarehouseSession

spark = SparkSession.builder.enableHiveSupport().appName("hwc-app").getOrCreate()
hwc = HiveWarehouseSession.session(spark).build()
tableName = "tv_series"
hwc.dropTable(tableName, True, True)
tvSeries = spark.createDataFrame([
    (1, "Chernobyl", "Drama|History|Tragedy|Science", 9.4),
    (2, "Westworld", "Sci-fi", 8.6),
    (3, "Sense8", "Sci-fi", 8.3),
    (4, "Person of Interest", "Drama|Sci-fi", 8.4),
    (5, "It's okay to not be okay", "Drama", 8.7),
    (6, "Daredevil", "Action|Sci-fi", 8.6),
    (7, "Money Heist", "Drama|Thriller", 8.3),
    (8, "Breaking Bad", "Crime|Drama", 9.5)
], ["id", "name", "genres", "rating"])

print("=======Writing to hive table - " + tableName + " via HWC=======")
# write to hive table via HWC
tvSeries.write.format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR).option("table", tableName).mode("append").save()

print("=======Reading hive table - " + tableName + " via HWC=======")
# Read via HWC
hwc.sql("select * from " + tableName).show()

hwc.close()
spark.stop()
Launching a Python app
After getting the Python code ready, launch it using spark-submit. Provide the HWC jar and the HWC Python zip as follows:
spark-submit --jars /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.7.2*.jar \
--py-files /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/pyspark_hwc-1.0.0.7.2.*.zip \
...More spark/HWC confs... \
/path-to-python-app/hwc-app.py
Use with Sparklyr
You can access Hive tables through R by loading the sparklyr library along with the SparklyrHWC package available in /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/, which you can use to invoke HWC APIs from R.
library(sparklyr)
library(SparklyrHWC, lib.loc = c(file.path("<path to SparklyrHWC>")))

#Set env variables
Sys.setenv(SPARK_HOME = "/opt/cloudera/parcels/CDH/lib/spark/")
Sys.setenv(HADOOP_HOME = "/opt/cloudera/parcels/CDH/lib/hadoop")

#Configurations needed to use spark-acid and related configurations.
config <- spark_config()
config$spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://<url>:10000/default"
config$spark.datasource.hive.warehouse.user.name="hive"
config$spark.hadoop.hive.metastore.uris="thrift://<url>:9083"
config$spark.sql.extensions="com.hortonworks.spark.sql.rule.Extensions"
config$spark.kryo.registrator="com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator"
config$spark.datasource.hive.warehouse.read.mode="DIRECT_READER_V2"

#Connect to the cluster using the above configurations (master shown as yarn; adjust for your deployment)
sc <- spark_connect(master = "yarn", config = config)

#Build HWC session
hs <- build(HiveWarehouseBuilder.session(sc))

#Use database
sparklyr::sdf_sql(sc, "use test")

#Reading a managed table using spark acid direct-reader
intDf <- sparklyr::spark_read_table(sc, 'emp_hwc')
#Converts Spark dataframe to R dataframe
sparklyr::sdf_collect(intDf)

#Writing into a managed table
#Read first table
intDf <- sparklyr::spark_read_table(sc, 'emp_hwc')
#read second table
intDf1 <- sparklyr::spark_read_table(sc, 'emp_overwrite')
#Commit transaction if read using spark-acid
commitTxn(hs)
#Append the second table to the first
SparklyrHWC::spark_write_table('emp_hwc', intDf1, 'append')
#Overwrite the first table with the second table
SparklyrHWC::spark_write_table('emp_hwc', intDf1, 'overwrite')

#Using HWC APIs
#create a table from an existing table
SparklyrHWC::executeUpdate(hs, "create table hwc1 as select * from emp_hwc")
#Execute query
hwcDf <- SparklyrHWC::executeQuery(hs, "select * from hwc1")
#Copy the result into a Spark dataframe
hwcSdf <- sparklyr::sdf_copy_to(sc, hwcDf)