Using Hive Warehouse Connector with Oozie Spark Action
You can use the Hive Warehouse Connector (HWC) with the Oozie Spark action by updating the job.properties file or the action-level configuration.
Steps
- Create a new ShareLib using a different name, such as hwc.
- Place the HWC JARs in the new ShareLib. For information about placing the HWC JARs in the new ShareLib, see the Appendix - Creating a new ‘hwc’ ShareLib section below.
- Execute a ShareLib update.
- When executing a Spark action that uses HWC, include the following property in the job.properties file:
oozie.action.sharelib.for.spark=spark,hwc
You can also update the action-level configuration to run both HWC and non-HWC Hive commands in the same workflow. If a workflow contains one action that uses HWC and another that does not, specify the ShareLib property in the configuration of the action that needs HWC, as in the following example.
Example
<spark xmlns="uri:oozie:spark-action:1.0">
...
<configuration>
<property xmlns="">
<name>oozie.action.sharelib.for.spark</name>
<value>spark,hwc</value>
</property>
</configuration>
...
</spark>
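A workflow that mixes an HWC-enabled Spark action with a plain one might look like the following sketch. The action names and transitions here are illustrative assumptions, not part of the original example:
<action name="spark-with-hwc">
    <spark xmlns="uri:oozie:spark-action:1.0">
        ...
        <configuration>
            <property>
                <name>oozie.action.sharelib.for.spark</name>
                <value>spark,hwc</value>
            </property>
        </configuration>
        ...
    </spark>
    <ok to="spark-without-hwc"/>
    <error to="fail"/>
</action>
<action name="spark-without-hwc">
    <spark xmlns="uri:oozie:spark-action:1.0">
        <!-- No ShareLib override here: this action uses only the default spark ShareLib -->
        ...
    </spark>
    <ok to="end"/>
    <error to="fail"/>
</action>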
Appendix - Creating a new ‘hwc’ ShareLib
The oozie admin commands must be executed as the oozie user.
- kinit as the oozie user.
- Check the currently available ShareLibs:
oozie admin -shareliblist -oozie {url}
- Create the folder for the new ShareLib on HDFS:
hdfs dfs -mkdir /user/oozie/share/lib/lib_{latestTimestamp}/hwc
- Add the following JAR files to it from the /opt/cloudera/parcels/CDH/jars directory:
- hive-warehouse-connector-assembly-1.0.0.***VERSION NUMBER***-XXX.jar
- hive-jdbc-3.1.3000.***VERSION NUMBER***-XXX.jar
- hive-jdbc-handler-3.1.3000.***VERSION NUMBER***-XXX.jar
- hive-service-3.1.3000.***VERSION NUMBER***-XXX.jar
- spark-sql-kafka-0-10_2.11-***VERSION NUMBER***-XXX.jar
- Run a ShareLib update:
oozie admin -sharelibupdate -oozie {url}
- List the ShareLibs again to verify that hwc is present:
oozie admin -shareliblist -oozie {url}
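For reference, the end-to-end sequence might look like the following sketch. The Oozie URL and the ShareLib timestamp are hypothetical placeholders; substitute the values from your environment:
kinit oozie
# Hypothetical Oozie URL; check your environment for the real one
oozie admin -shareliblist -oozie https://oozie-host.example.com:11000/oozie
# Find the latest lib_{timestamp} directory
hdfs dfs -ls /user/oozie/share/lib
# Hypothetical timestamp; use the latest one listed above
hdfs dfs -mkdir /user/oozie/share/lib/lib_20230101000000/hwc
# Copy the assembly JAR; repeat for the other JARs listed above
hdfs dfs -put /opt/cloudera/parcels/CDH/jars/hive-warehouse-connector-assembly-*.jar \
  /user/oozie/share/lib/lib_20230101000000/hwc/
oozie admin -sharelibupdate -oozie https://oozie-host.example.com:11000/oozie
oozie admin -shareliblist -oozie https://oozie-host.example.com:11000/oozie | grep hwc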
Example for using HWC with Oozie Spark action
Understand how you can use the Hive Warehouse Connector (HWC) with Oozie Spark actions through an example application that reads a Hive table through HWC and displays its contents. You can build the application either as a JAR or as a Python script.
Using application JAR
This example describes the job.properties file, the workflow.xml file, and the application logic required for this task, and lists the information you need to use HWC in an Oozie Spark action when you build an application JAR.
- Example application logic
- You can package the following Scala application logic into a JAR by using either the Maven or the SBT command-line utility with compatible CDP versions, and place it in the org.example package. The application logic requires only two dependencies: spark-sql and HWC.
import com.hortonworks.hwc.HiveWarehouseSession
import org.apache.spark.sql.SparkSession

object ExampleRun {
  def main(args: Array[String]): Unit = {
    println("Using Hive Warehouse connector")
    // Create a Spark session
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    // Create a HWC session using the Spark session
    val hive = HiveWarehouseSession.session(spark).build()
    println(args(0)) // Print the input
    // Query the table named in the first argument using hive.sql()
    hive.sql("SELECT * FROM " + args(0)).show
    // Close the Spark session
    spark.close()
  }
}
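Before wiring the JAR into an Oozie workflow, you can sanity-check it with spark-submit directly. This is a sketch only; the version, JDBC URL, principal, and table name are placeholders for values from your cluster:
# All paths, URLs, and principals below are placeholders
spark-submit \
  --master yarn --deploy-mode cluster \
  --class org.example.ExampleRun \
  --jars /opt/cloudera/parcels/CDH/jars/hive-warehouse-connector-assembly-<version>.jar \
  --conf spark.sql.hive.hiveserver2.jdbc.url='<HiveServer2 JDBC URL>' \
  --conf spark.sql.hive.hiveserver2.jdbc.url.principal='hive/_HOST@EXAMPLE.COM' \
  --conf spark.datasource.hive.warehouse.read.mode=JDBC_CLUSTER \
  --conf spark.sql.extensions=com.hortonworks.spark.sql.rule.Extensions \
  hwc-examples-1.0.jar sampleTable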
- Maven method
Add the following dependencies when you build the application JAR by using Maven:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
<dependency>
  <groupId>com.hortonworks.hive</groupId>
  <artifactId>hive-warehouse-connector_2.11</artifactId>
  <version>${hwc.version}</version>
</dependency>
Create the JAR using the following command:
mvn clean package -Dspark.version=<CDP Spark version> -Dhwc.version=<CDP HWC Version>
For example:
mvn clean package -Dspark.version=2.4.7.7.1.7.61-1 -Dhwc.version=1.0.0.7.1.7.61-1
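The HWC and Cloudera Spark artifacts are published to the Cloudera repository rather than Maven Central, so the pom.xml likely needs a repository entry along these lines (a sketch; verify the URL for your CDP release):
<repositories>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>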
- SBT method
Add the following library dependencies when you build the application JAR by using SBT:
val sparkVersion = sys.props.getOrElse("spark.version", "<CDP Spark version>")
val hwcVersion = sys.props.getOrElse("hwc.version", "<CDP HWC Version>")

libraryDependencies ++= Seq(
  "com.hortonworks.hive" % "hive-warehouse-connector_2.11" % hwcVersion % "provided",
  ("org.apache.spark" %% "spark-sql" % sparkVersion % "provided").force()
)
Create the JAR file using the following command:
sbt clean compile assembly -Dspark.version=<CDP Spark version> -Dhwc.version=<CDP HWC Version>
For example:
sbt clean compile assembly -Dspark.version=2.4.7.7.1.7.61-1 -Dhwc.version=1.0.0.7.1.7.61-1
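The assembly task is provided by the sbt-assembly plugin rather than SBT itself, so it must be enabled in the project. A minimal project/plugins.sbt might look like the following; the plugin version shown is an assumption, so use one compatible with your SBT release:
// project/plugins.sbt -- the version below is illustrative
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")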
- Example job.properties file
HCAT_METASTORE_URI=thrift://myhost-1.myhost.example.site:9083
ROOT_LOGGER_LEVEL=INFO
HCAT_PRINCIPAL=hive/_HOST@EXAMPLE.COM
oozie.action.sharelib.for.spark=spark,hwc
MASTER=yarn
JDBC_PRINCIPAL=hive/_HOST@EXAMPLE.COM
JDBC_URL=jdbc:hive2://myhost-1.myhost.example.site:10001/default;transportMode=http;httpPath=cliservice;ssl=true;sslTrustStore=/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks;trustStorePassword=update_this_password
oozie.wf.application.path=hdfs:///tmp/workdir
HIVE_TABLE_NAME=sampleTable
JDBC_MODE=JDBC_CLUSTER
APP_NAME=MyApp
MODE=cluster
JAR=hdfs:///tmp/workdir/hwc-examples-1.0.jar
CLASSNAME=org.example.ExampleRun
OOZIE_LAUNCHER_OPTS="-verbose:class"
SPARK_OPTS=--conf spark.driver.extraJavaOptions='-verbose:class' --conf spark.executor.extraJavaOptions='-verbose:class'
- Example workflow.xml file
<?xml version="1.0" encoding="utf-8"?>
<workflow-app name="spark-hwc-hive-wf" xmlns="uri:oozie:workflow:1.0">
    <credentials>
        <credential name="hcatauth" type="hcat">
            <property>
                <name>hcat.metastore.uri</name>
                <value>${HCAT_METASTORE_URI}</value>
            </property>
            <property>
                <name>hcat.metastore.principal</name>
                <value>${HCAT_PRINCIPAL}</value>
            </property>
        </credential>
        <credential name="hs2-creds" type="hive2">
            <property>
                <name>hive2.server.principal</name>
                <value>${JDBC_PRINCIPAL}</value>
            </property>
            <property>
                <name>hive2.jdbc.url</name>
                <value>${JDBC_URL}</value>
            </property>
        </credential>
    </credentials>
    <start to="SPARK_HWC_JDBC_READ"/>
    <action name="SPARK_HWC_JDBC_READ" cred="hs2-creds,hcatauth">
        <spark xmlns="uri:oozie:spark-action:1.0">
            <configuration>
                <property>
                    <name>mapreduce.job.hdfs-servers</name>
                    <value>${firstNotNull(wf:conf('HDFS_SERVERS'),' ')}</value>
                </property>
                <property>
                    <name>oozie.launcher.mapreduce.map.java.opts</name>
                    <value>${firstNotNull(wf:conf('OOZIE_LAUNCHER_OPTS'),' ')}</value>
                </property>
                <property>
                    <name>oozie.action.rootlogger.log.level</name>
                    <value>${firstNotNull(wf:conf('ROOT_LOGGER_LEVEL'),'INFO')}</value>
                </property>
            </configuration>
            <master>${MASTER}</master>
            <mode>${MODE}</mode>
            <name>${APP_NAME}</name>
            <class>${CLASSNAME}</class>
            <jar>${JAR}</jar>
            <spark-opts>--conf spark.sql.hive.hiveserver2.jdbc.url=${JDBC_URL} --conf spark.sql.extensions="com.hortonworks.spark.sql.rule.Extensions" --conf spark.datasource.hive.warehouse.read.mode=${JDBC_MODE} --conf spark.sql.hive.hiveserver2.jdbc.url.principal=${JDBC_PRINCIPAL} ${firstNotNull(wf:conf('SPARK_OPTS'),' ')}</spark-opts>
            <arg>${HIVE_TABLE_NAME}</arg>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
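With the JAR, job.properties, and workflow.xml ready, a typical submission could look like the following sketch; the Oozie URL is a hypothetical placeholder:
# Stage the workflow and the application JAR at oozie.wf.application.path
hdfs dfs -mkdir -p /tmp/workdir
hdfs dfs -put workflow.xml hwc-examples-1.0.jar /tmp/workdir/
# Submit the workflow, then check its status
oozie job -oozie https://oozie-host.example.com:11000/oozie -config job.properties -run
oozie job -oozie https://oozie-host.example.com:11000/oozie -info <job-id>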
Using Python application
This example describes the job.properties file, the workflow.xml file, and the application logic required for this task, and lists the information you need to use HWC in an Oozie Spark action when you build a Python application.
- Example application logic
import sys
from pyspark.sql import SparkSession
from pyspark_llap import HiveWarehouseSession

# Create a Spark session and an HWC session on top of it
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
hwc = HiveWarehouseSession.session(spark).build()

tableName = sys.argv[1]
print("=======Reading hive table - " + tableName + " via HWC=======")

# Read via HWC
hwc.sql("select * from " + tableName).show()

hwc.close()
spark.stop()
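You can also smoke-test the script outside Oozie with spark-submit. This is a sketch only; the versions, JDBC URL, and principal are placeholders for values from your cluster:
# All paths, URLs, and principals below are placeholders
spark-submit \
  --master yarn --deploy-mode cluster \
  --py-files /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/pyspark_hwc-<version>.zip \
  --jars /opt/cloudera/parcels/CDH/jars/hive-warehouse-connector-assembly-<version>.jar \
  --conf spark.sql.hive.hiveserver2.jdbc.url='<HiveServer2 JDBC URL>' \
  --conf spark.sql.hive.hiveserver2.jdbc.url.principal='hive/_HOST@EXAMPLE.COM' \
  --conf spark.datasource.hive.warehouse.read.mode=JDBC_CLUSTER \
  --conf spark.sql.extensions=com.hortonworks.spark.sql.rule.Extensions \
  testhwcread.py sampleTable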
- Example job.properties file
HCAT_METASTORE_URI=thrift://myhost-1.myhost.example.site:9083
ROOT_LOGGER_LEVEL=INFO
HCAT_PRINCIPAL=hive/_HOST@EXAMPLE.COM
oozie.action.sharelib.for.spark=spark,hwc
MASTER=yarn
JDBC_PRINCIPAL=hive/_HOST@EXAMPLE.COM
JDBC_URL=jdbc:hive2://myhost-1.myhost.example.site:10001/default;transportMode=http;httpPath=cliservice;ssl=true;sslTrustStore=/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks;trustStorePassword=update_this_password
oozie.wf.application.path=hdfs:///tmp/workdir
HIVE_TABLE_NAME=sampleTable
JDBC_MODE=JDBC_CLUSTER
APP_NAME=MyApp
MODE=cluster
PY_FILE=hdfs:///tmp/workdir/testhwcread.py
PYSPARK_HWC_ZIP=/opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/pyspark_hwc-1.0.0.7.1.7.61-1.zip
OOZIE_LAUNCHER_OPTS="-verbose:class"
SPARK_OPTS=--conf spark.driver.extraJavaOptions='-verbose:class' --conf spark.executor.extraJavaOptions='-verbose:class'
- Example workflow.xml file
<?xml version="1.0" encoding="utf-8"?>
<workflow-app name="spark-hwc-hive-wf" xmlns="uri:oozie:workflow:1.0">
    <credentials>
        <credential name="hcatauth" type="hcat">
            <property>
                <name>hcat.metastore.uri</name>
                <value>${HCAT_METASTORE_URI}</value>
            </property>
            <property>
                <name>hcat.metastore.principal</name>
                <value>${HCAT_PRINCIPAL}</value>
            </property>
        </credential>
        <credential name="hs2-creds" type="hive2">
            <property>
                <name>hive2.server.principal</name>
                <value>${JDBC_PRINCIPAL}</value>
            </property>
            <property>
                <name>hive2.jdbc.url</name>
                <value>${JDBC_URL}</value>
            </property>
        </credential>
    </credentials>
    <start to="SPARK_HWC_JDBC_READ"/>
    <action name="SPARK_HWC_JDBC_READ" cred="hs2-creds,hcatauth">
        <spark xmlns="uri:oozie:spark-action:1.0">
            <configuration>
                <property>
                    <name>mapreduce.job.hdfs-servers</name>
                    <value>${firstNotNull(wf:conf('HDFS_SERVERS'),' ')}</value>
                </property>
                <property>
                    <name>oozie.launcher.mapreduce.map.java.opts</name>
                    <value>${firstNotNull(wf:conf('OOZIE_LAUNCHER_OPTS'),' ')}</value>
                </property>
                <property>
                    <name>oozie.action.rootlogger.log.level</name>
                    <value>${firstNotNull(wf:conf('ROOT_LOGGER_LEVEL'),'INFO')}</value>
                </property>
            </configuration>
            <master>${MASTER}</master>
            <mode>${MODE}</mode>
            <name>${APP_NAME}</name>
            <jar>${PY_FILE}</jar>
            <spark-opts>--conf spark.sql.hive.hiveserver2.jdbc.url=${JDBC_URL} --conf spark.sql.extensions="com.hortonworks.spark.sql.rule.Extensions" --conf spark.datasource.hive.warehouse.read.mode=${JDBC_MODE} --conf spark.sql.hive.hiveserver2.jdbc.url.principal=${JDBC_PRINCIPAL} --conf spark.submit.pyFiles=${PYSPARK_HWC_ZIP} ${firstNotNull(wf:conf('SPARK_OPTS'),' ')}</spark-opts>
            <arg>${HIVE_TABLE_NAME}</arg>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
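As with the JAR variant, stage the files on HDFS and submit the workflow; the Oozie URL is a hypothetical placeholder:
hdfs dfs -mkdir -p /tmp/workdir
hdfs dfs -put workflow.xml testhwcread.py /tmp/workdir/
oozie job -oozie https://oozie-host.example.com:11000/oozie -config job.properties -run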