Using Hive Warehouse Connector with Oozie Spark Action

You can use Hive Warehouse Connector (HWC) with an Oozie Spark action by updating the job.properties file or the action-level configuration.

When you update the job.properties file, the Hive JARs from Oozie’s Spark ShareLib are ignored and only the HWC JAR is used.

Steps

  1. Create a new ShareLib using a different name, such as hwc.
  2. Place the HWC JARs in the new ShareLib. For details, see the Appendix - Creating a new ‘hwc’ ShareLib section below.
  3. Execute a ShareLib update.
  4. When executing a Spark action that uses HWC, include the following property in the job.properties file:
    oozie.action.sharelib.for.spark=spark,hwc
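
Before submitting a job, it can help to confirm that the property in step 4 is actually present in the file. The following is a minimal sketch, not part of Oozie: parse_properties and spark_sharelibs are hypothetical helper names, and the parser handles only plain KEY=VALUE lines and comment lines.

```python
def parse_properties(text):
    """Parse simple KEY=VALUE lines; blank lines and # or ! comments are skipped."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the first '=' only, so values such as JDBC URLs stay intact.
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def spark_sharelibs(props):
    """Return the ShareLib names listed for the Spark action, in order."""
    value = props.get("oozie.action.sharelib.for.spark", "")
    return [name.strip() for name in value.split(",") if name.strip()]


# Example values only; replace with the content of your own job.properties file.
sample = """
# job.properties
oozie.wf.application.path=hdfs:///tmp/workdir
oozie.action.sharelib.for.spark=spark,hwc
"""
libs = spark_sharelibs(parse_properties(sample))
print(libs)
if "hwc" not in libs:
    print("The hwc ShareLib is not enabled for the Spark action")
```

The check only inspects the file content; it does not verify that a ShareLib named hwc exists on the cluster.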

If a workflow contains one action that should use HWC and another action that should not, specify the ShareLib property at the action level instead of in the job.properties file.

Example

<spark xmlns="uri:oozie:spark-action:1.0">
    ...
    <configuration>
        <property xmlns="">
            <name>oozie.action.sharelib.for.spark</name>
            <value>spark,hwc</value>
        </property>
    </configuration>
    ...
</spark>
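
To see which actions in a workflow set the property at the action level, you could scan the workflow definition. The following sketch uses Python's standard xml.etree.ElementTree module; action_sharelibs and local are hypothetical helper names, and the embedded workflow is a trimmed illustration that omits the elements a complete Spark action needs.

```python
import xml.etree.ElementTree as ET


def local(tag):
    """Drop the '{namespace}' prefix that ElementTree puts on tag names."""
    return tag.rsplit("}", 1)[-1]


def action_sharelibs(workflow_xml):
    """Map each action name to its action-level Spark ShareLib value, or None."""
    root = ET.fromstring(workflow_xml)
    result = {}
    for action in root.iter():
        if local(action.tag) != "action":
            continue
        value = None
        for prop in action.iter():
            if local(prop.tag) != "property":
                continue
            fields = {local(child.tag): (child.text or "").strip() for child in prop}
            if fields.get("name") == "oozie.action.sharelib.for.spark":
                value = fields.get("value")
        result[action.get("name")] = value
    return result


# Trimmed, hypothetical workflow: the Spark actions are intentionally incomplete.
sample = """<workflow-app name="demo" xmlns="uri:oozie:workflow:1.0">
  <start to="with-hwc"/>
  <action name="with-hwc">
    <spark xmlns="uri:oozie:spark-action:1.0">
      <configuration>
        <property>
          <name>oozie.action.sharelib.for.spark</name>
          <value>spark,hwc</value>
        </property>
      </configuration>
    </spark>
    <ok to="without-hwc"/>
    <error to="fail"/>
  </action>
  <action name="without-hwc">
    <spark xmlns="uri:oozie:spark-action:1.0"/>
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <kill name="fail"><message>failed</message></kill>
  <end name="end"/>
</workflow-app>"""

per_action = action_sharelibs(sample)
for name, value in per_action.items():
    print(name, "->", value)
```

An action that maps to None has no action-level setting, so it relies on whatever the job.properties file defines.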

Appendix - Creating a new ‘hwc’ ShareLib

The oozie admin commands have to be executed by the oozie user.

  1. Run kinit as the oozie user.
  2. Check the current available ShareLibs:
    oozie admin -shareliblist -oozie {url}
  3. Create the folder for the new hwc ShareLib on HDFS:
    hdfs dfs -mkdir /user/oozie/share/lib/lib_{latestTimestamp}/hwc
  4. Copy the following JAR files from the /opt/cloudera/parcels/CDH/jars directory into the new folder:
    • hive-warehouse-connector-assembly-1.0.0.***VERSION NUMBER***-XXX.jar
    • hive-service-3.1.3000.***VERSION NUMBER***-XXX.jar
    • hive-jdbc-3.1.3000.***VERSION NUMBER***-XXX.jar
    • hive-jdbc-handler-3.1.3000.***VERSION NUMBER***-XXX.jar
    • hive-impala-3.1.3000.***VERSION NUMBER***-XXX.jar
    • impala-bridge-***VERSION NUMBER***-XXX.jar
    • impala-data-source-api-***VERSION NUMBER***-XXX.jar
    • impala-frontend-***VERSION NUMBER***-XXX.jar
    • impala-minimal-s3a-aws-sdk-***VERSION NUMBER***-XXX.jar
    • spark-sql-kafka-0-10_2.11-***VERSION NUMBER***-XXX.jar
  5. Update the ShareLib:
    oozie admin -sharelibupdate -oozie {url}
  6. List the ShareLibs again to check if hwc is present:
    oozie admin -shareliblist -oozie {url}
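
The appendix steps can be collected into a reviewable command list before anything is run. The following sketch only builds the commands and does not execute them. The function name hwc_sharelib_commands, the Oozie URL, the timestamp in the directory name, and the JAR file name are illustrative assumptions, and using hdfs dfs -put for the copy in step 4 is one possible choice, because the steps above do not name a copy command.

```python
def hwc_sharelib_commands(oozie_url, lib_dir, jar_names,
                          jar_source="/opt/cloudera/parcels/CDH/jars"):
    """Return the appendix commands as argument lists, in execution order."""
    target = lib_dir.rstrip("/") + "/hwc"
    commands = [
        # Step 2: check the currently available ShareLibs.
        ["oozie", "admin", "-shareliblist", "-oozie", oozie_url],
        # Step 3: create the hwc folder under the current lib_<timestamp> directory.
        ["hdfs", "dfs", "-mkdir", target],
    ]
    # Step 4: copy each JAR into the new folder (assumed to be done with -put).
    for name in jar_names:
        commands.append(["hdfs", "dfs", "-put", jar_source + "/" + name, target])
    # Steps 5 and 6: update the ShareLib, then list again to confirm hwc appears.
    commands.append(["oozie", "admin", "-sharelibupdate", "-oozie", oozie_url])
    commands.append(["oozie", "admin", "-shareliblist", "-oozie", oozie_url])
    return commands


# Hypothetical values; substitute the URL, directory, and JAR names of your cluster.
commands = hwc_sharelib_commands(
    "http://oozie.example.site:11000/oozie",
    "/user/oozie/share/lib/lib_20240101000000",
    ["hive-warehouse-connector-assembly-EXAMPLE.jar"],
)
for command in commands:
    print(" ".join(command))
```

Printing the commands first makes it easy to check the lib_{latestTimestamp} directory and the JAR names before running them as the oozie user.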

Example for using HWC with Oozie Spark action

The following example shows how to use the Hive Warehouse Connector (HWC) with Oozie Spark actions: it creates an application that reads a table from Hive using HWC and displays its contents. You can build the application either as a JAR or as a Python application.

Using application JAR

This example covers the job.properties file, the workflow.xml file, and the application logic that you need to use HWC in an Oozie Spark action when you build an application JAR.

Example application logic
You can package the following Scala application logic into a JAR by using either the Maven or the SBT command-line utility with versions that are compatible with your CDP release. The example uses the org.example package and requires only two dependencies: spark-sql and HWC.
import com.hortonworks.hwc.HiveWarehouseSession
import org.apache.spark.sql.SparkSession

object ExampleRun {
  def main(args: Array[String]): Unit = {
    println("Using Hive Warehouse connector")
    //Create a Spark session
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate() 
    //Create a HWC session using the Spark Session
    val hive = HiveWarehouseSession.session(spark).build()
    println(args(0)) // Print the input
    //Query the string provided in the arguments using hive.sql()
    hive.sql("SELECT * FROM " + args(0)).show
    //Close the Spark session
    spark.close()
  }
}
  • Maven method

    Add the following dependencies when you build an application JAR by using Maven:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.hortonworks.hive</groupId>
        <artifactId>hive-warehouse-connector_2.11</artifactId>
        <version>${hwc.version}</version>
    </dependency>
    Create a JAR using the following command:
    mvn clean package -Dspark.version=<CDP Spark version> -Dhwc.version=<CDP HWC Version>
    For example,
    mvn clean package -Dspark.version=2.4.7.7.1.7.61-1 -Dhwc.version=1.0.0.7.1.7.61-1
  • SBT method

    Add the following library dependencies when you build an application JAR by using SBT:

    val sparkVersion = sys.props.getOrElse("spark.version", "<CDP Spark version>")
    val hwcVersion = sys.props.getOrElse("hwc.version", "<CDP HWC Version>")
    
    libraryDependencies ++= Seq(
      "com.hortonworks.hive" % "hive-warehouse-connector_2.11" % hwcVersion % "provided",
      "org.apache.spark" %% "spark-sql" % sparkVersion % "provided" force()
    ) 
    Create a JAR file using the following command:
    sbt clean compile assembly -Dspark.version=<CDP Spark version> -Dhwc.version=<CDP HWC Version>
    For example,
    sbt clean compile assembly -Dspark.version=2.4.7.7.1.7.61-1 -Dhwc.version=1.0.0.7.1.7.61-1
Save the resulting JAR file in a known location in HDFS so that you can reference it later in the job.properties file. The class name here is org.example.ExampleRun, which you use later when specifying the job.
Example job.properties file
HCAT_METASTORE_URI=thrift://myhost-1.myhost.example.site:9083
ROOT_LOGGER_LEVEL=INFO
HCAT_PRINCIPAL=hive/_HOST@EXAMPLE.COM
oozie.action.sharelib.for.spark=spark,hwc
MASTER=yarn
JDBC_PRINCIPAL=hive/_HOST@EXAMPLE.COM
JDBC_URL=jdbc:hive2://myhost-1.myhost.example.site:10001/default;transportMode=http;httpPath=cliservice;ssl=true;sslTrustStore=/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks;trustStorePassword=update_this_password
oozie.wf.application.path=hdfs:///tmp/workdir
HIVE_TABLE_NAME=sampleTable
JDBC_MODE=JDBC_CLUSTER
APP_NAME=MyApp
MODE=cluster
JAR=hdfs:///tmp/workdir/hwc-examples-1.0.jar
CLASSNAME=org.example.ExampleRun
OOZIE_LAUNCHER_OPTS="-verbose:class"
SPARK_OPTS=--conf spark.driver.extraJavaOptions='-verbose:class' --conf spark.executor.extraJavaOptions='-verbose:class'
All values are examples that indicate what you need to write in the file.
HCAT_METASTORE_URI is the Hive metastore URI, and HCAT_PRINCIPAL is the principal required for Kerberos authentication with the Hive metastore. Set oozie.action.sharelib.for.spark=spark,hwc exactly as shown. MASTER=yarn runs Spark in YARN mode. JDBC_PRINCIPAL is required for Kerberos authentication with HiveServer2, and JDBC_URL is used to create the connection to HiveServer2.
If you run into classpath issues while executing the Oozie job, set OOZIE_LAUNCHER_OPTS and SPARK_OPTS as shown. With these options, the YARN logs of the Spark job show which classes are loaded from which JAR files.
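When the YARN logs are long, grouping the -verbose:class output by source makes it easier to spot Hive classes that come from an unexpected JAR. The following is a minimal sketch that assumes the Java 8 line format "[Loaded <class> from <source>]"; newer JVMs log class loading differently. The function name classes_by_source and the sample log lines, including the paths and JAR names, are illustrative and not taken from a real job.

```python
import re
from collections import defaultdict

# Java 8 style -verbose:class line: [Loaded <class> from <source>]
LOADED = re.compile(r"\[Loaded (\S+) from (\S+)\]")


def classes_by_source(log_text, package_prefix=""):
    """Group loaded class names by the JAR or path they were loaded from."""
    sources = defaultdict(list)
    for class_name, source in LOADED.findall(log_text):
        if class_name.startswith(package_prefix):
            sources[source].append(class_name)
    return dict(sources)


# Hypothetical log excerpt; real paths and JAR names differ per cluster.
sample_log = """
[Loaded java.lang.Object from /usr/java/jdk1.8.0/jre/lib/rt.jar]
[Loaded org.apache.hadoop.hive.conf.HiveConf from file:/data/yarn/filecache/10/hive-common.jar]
[Loaded org.apache.hadoop.hive.ql.Driver from file:/data/yarn/filecache/11/hwc-assembly.jar]
[Loaded org.apache.hadoop.hive.ql.Context from file:/data/yarn/filecache/11/hwc-assembly.jar]
"""

hive_sources = classes_by_source(sample_log, "org.apache.hadoop.hive.")
for source, classes in sorted(hive_sources.items()):
    print(source, len(classes))
```

Filtering by a package prefix keeps the report focused on the classes that matter for an HWC classpath conflict.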
Example workflow.xml file
<?xml version="1.0" encoding="utf-8"?>
<workflow-app name="spark-hwc-hive-wf" xmlns="uri:oozie:workflow:1.0">
    <credentials>
        <credential name="hcatauth" type="hcat">
            <property>
                <name>hcat.metastore.uri</name>
                <value>${HCAT_METASTORE_URI}</value>
            </property>
            <property>
                <name>hcat.metastore.principal</name>
                <value>${HCAT_PRINCIPAL}</value>
            </property>
        </credential>
        <credential name="hs2-creds" type="hive2">
            <property>
                <name>hive2.server.principal</name>
                <value>${JDBC_PRINCIPAL}</value>
            </property>
            <property>
                <name>hive2.jdbc.url</name>
                <value>${JDBC_URL}</value>
            </property>
        </credential>
    </credentials>

    <start to="SPARK_HWC_JDBC_READ"/>
    <action name="SPARK_HWC_JDBC_READ" cred="hs2-creds,hcatauth">
        <spark xmlns="uri:oozie:spark-action:1.0">
            <configuration>
                <property>
                    <name>mapreduce.job.hdfs-servers</name>
                    <value>${firstNotNull(wf:conf('HDFS_SERVERS'),' ')}</value>
                </property>
                <property>
                    <name>oozie.launcher.mapreduce.map.java.opts</name>
                    <value>${firstNotNull(wf:conf('OOZIE_LAUNCHER_OPTS'),' ')}</value>
                </property>
                <property>
                    <name>oozie.action.rootlogger.log.level</name>
                    <value>${firstNotNull(wf:conf('ROOT_LOGGER_LEVEL'),'INFO')}</value>
                </property>
            </configuration>
            <master>${MASTER}</master>
            <mode>${MODE}</mode>
            <name>${APP_NAME}</name>
            <class>${CLASSNAME}</class>
            <jar>${JAR}</jar>
            <spark-opts>--conf spark.sql.hive.hiveserver2.jdbc.url=${JDBC_URL}  --conf spark.sql.extensions="com.hortonworks.spark.sql.rule.Extensions" --conf spark.datasource.hive.warehouse.read.mode=${JDBC_MODE} --conf spark.sql.hive.hiveserver2.jdbc.url.principal=${JDBC_PRINCIPAL} ${firstNotNull(wf:conf('SPARK_OPTS'),' ')}</spark-opts>
            <arg>${HIVE_TABLE_NAME}</arg>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
Save this workflow.xml file in the directory that oozie.wf.application.path points to. The properties that appear in “${}” are either defined in the job.properties file or passed on the command line. The <spark-opts> tag contains the configurations that HWC requires. The <arg> tag contains the input for the application; it is currently set to a Hive table name, which the SELECT statement in the application code uses.
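
Because every plain “${}” placeholder must be defined somewhere, a quick cross-check between the workflow and the property names can catch a missing entry before submission. The following is a minimal sketch; undefined_variables is a hypothetical helper, and it deliberately matches only simple ${NAME} placeholders, so expressions that call functions such as firstNotNull(...) or wf:conf(...) are skipped.

```python
import re

# Matches ${NAME} only; expressions containing parentheses or colons do not match.
SIMPLE_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def undefined_variables(workflow_xml, defined_names):
    """Return the ${NAME} placeholders in the workflow that are not in defined_names."""
    used = set(SIMPLE_VAR.findall(workflow_xml))
    return sorted(used - set(defined_names))


# Fragment modeled on the workflow above; the set of defined names is an example.
fragment = """
<master>${MASTER}</master>
<mode>${MODE}</mode>
<jar>${JAR}</jar>
<spark-opts>--conf spark.datasource.hive.warehouse.read.mode=${JDBC_MODE} ${firstNotNull(wf:conf('SPARK_OPTS'),' ')}</spark-opts>
<arg>${HIVE_TABLE_NAME}</arg>
"""
missing = undefined_variables(fragment, {"MASTER", "MODE", "JDBC_MODE", "HIVE_TABLE_NAME"})
print(missing)
```

A name reported as missing is not necessarily an error, because it can still be supplied on the command line when the job is submitted.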

Using Python application

This example covers the job.properties file, the workflow.xml file, and the application logic that you need to use HWC in an Oozie Spark action when you build a Python application.
Example application logic
import sys
from pyspark.sql import SparkSession
from pyspark_llap import HiveWarehouseSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
hwc = HiveWarehouseSession.session(spark).build()
tableName = sys.argv[1]

print("=======Reading hive table - " + tableName + " via HWC=======")
# Read via HWC
hwc.sql("select * from " + tableName).show()

hwc.close()
spark.stop()
The program uses the pyspark module and the HWC-specific pyspark_llap module. The pyspark_llap module is derived from the HWC artifacts included in the CDP builds.
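The program above concatenates its first argument directly into the query string. If the table name can come from an untrusted source, one option is to check it before building the statement. The following is a sketch under assumptions: build_select is a hypothetical helper, and the pattern accepts only letters, digits, and underscores with an optional database prefix, which is stricter than what Hive itself allows.

```python
import re

# Assumed naming rule: name or database.name, letters, digits, and underscores only.
TABLE_NAME = re.compile(r"^[A-Za-z0-9_]+(\.[A-Za-z0-9_]+)?$")


def build_select(table_name):
    """Return a SELECT statement for table_name, rejecting unexpected characters."""
    if not TABLE_NAME.match(table_name):
        raise ValueError("unexpected table name: %r" % table_name)
    return "select * from " + table_name


query = build_select("sampleTable")
print(query)

try:
    build_select("sampleTable; drop table other")
except ValueError as error:
    print("rejected:", error)
```

In the application, the returned string would be passed to hwc.sql() in place of the inline concatenation.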
Example job.properties file
HCAT_METASTORE_URI=thrift://myhost-1.myhost.example.site:9083
ROOT_LOGGER_LEVEL=INFO
HCAT_PRINCIPAL=hive/_HOST@EXAMPLE.COM
oozie.action.sharelib.for.spark=spark,hwc
MASTER=yarn
JDBC_PRINCIPAL=hive/_HOST@EXAMPLE.COM
JDBC_URL=jdbc:hive2://myhost-1.myhost.example.site:10001/default;transportMode=http;httpPath=cliservice;ssl=true;sslTrustStore=/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks;trustStorePassword=update_this_password
oozie.wf.application.path=hdfs:///tmp/workdir
HIVE_TABLE_NAME=sampleTable
JDBC_MODE=JDBC_CLUSTER
APP_NAME=MyApp
MODE=cluster
PY_FILE=hdfs:///tmp/workdir/testhwcread.py
PYSPARK_HWC_ZIP=/opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/pyspark_hwc-1.0.0.7.1.7.61-1.zip
OOZIE_LAUNCHER_OPTS="-verbose:class"
SPARK_OPTS=--conf spark.driver.extraJavaOptions='-verbose:class' --conf spark.executor.extraJavaOptions='-verbose:class'
All values are examples that indicate what you need to write in the file.
HCAT_METASTORE_URI is the Hive metastore URI, and HCAT_PRINCIPAL is the principal required for Kerberos authentication with the Hive metastore. Set oozie.action.sharelib.for.spark=spark,hwc exactly as shown. MASTER=yarn runs Spark in YARN mode. JDBC_PRINCIPAL is required for Kerberos authentication with HiveServer2, and JDBC_URL is used to create the connection to HiveServer2.
If you run into classpath issues while executing the Oozie job, set OOZIE_LAUNCHER_OPTS and SPARK_OPTS as shown. With these options, the YARN logs of the Spark job show which classes are loaded from which JAR files.
Example workflow.xml file
<?xml version="1.0" encoding="utf-8"?>
<workflow-app name="spark-hwc-hive-wf" xmlns="uri:oozie:workflow:1.0">
    <credentials>
        <credential name="hcatauth" type="hcat">
            <property>
                <name>hcat.metastore.uri</name>
                <value>${HCAT_METASTORE_URI}</value>
            </property>
            <property>
                <name>hcat.metastore.principal</name>
                <value>${HCAT_PRINCIPAL}</value>
            </property>
        </credential>
        <credential name="hs2-creds" type="hive2">
            <property>
                <name>hive2.server.principal</name>
                <value>${JDBC_PRINCIPAL}</value>
            </property>
            <property>
                <name>hive2.jdbc.url</name>
                <value>${JDBC_URL}</value>
            </property>
        </credential>
    </credentials>

    <start to="SPARK_HWC_JDBC_READ"/>
    <action name="SPARK_HWC_JDBC_READ" cred="hs2-creds,hcatauth">
        <spark xmlns="uri:oozie:spark-action:1.0">
            <configuration>
                <property>
                    <name>mapreduce.job.hdfs-servers</name>
                    <value>${firstNotNull(wf:conf('HDFS_SERVERS'),' ')}</value>
                </property>
                <property>
                    <name>oozie.launcher.mapreduce.map.java.opts</name>
                    <value>${firstNotNull(wf:conf('OOZIE_LAUNCHER_OPTS'),' ')}</value>
                </property>
                <property>
                    <name>oozie.action.rootlogger.log.level</name>
                    <value>${firstNotNull(wf:conf('ROOT_LOGGER_LEVEL'),'INFO')}</value>
                </property>
            </configuration>
            <master>${MASTER}</master>
            <mode>${MODE}</mode>
            <name>${APP_NAME}</name>
            <jar>${PY_FILE}</jar>
            <spark-opts>--conf spark.sql.hive.hiveserver2.jdbc.url=${JDBC_URL}  --conf spark.sql.extensions="com.hortonworks.spark.sql.rule.Extensions" --conf spark.datasource.hive.warehouse.read.mode=${JDBC_MODE} --conf spark.sql.hive.hiveserver2.jdbc.url.principal=${JDBC_PRINCIPAL} --conf spark.submit.pyFiles=${PYSPARK_HWC_ZIP} ${firstNotNull(wf:conf('SPARK_OPTS'),' ')}</spark-opts>
            <arg>${HIVE_TABLE_NAME}</arg>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
Save this workflow.xml file in the directory that oozie.wf.application.path points to. The properties that appear in “${}” are either defined in the job.properties file or passed on the command line. The <spark-opts> tag contains the configurations that HWC requires. The <arg> tag contains the input for the application; it is currently set to a Hive table name, which the application uses in its SELECT query.