Read and write Hive tables in R

The Hive Warehouse Connector (HWC) supports reads and writes to Apache Hive managed ACID tables in R. Cloudera provides an R package SparklyrHWC that includes all HWC methods, such as execute and executeQuery, and a spark_write_table method to write to managed tables. The native sparklyr spark_write_table method supports writes to external tables only.

You can configure either one of the following HWC execution modes in R:
  • JDBC mode
    • Suitable for writing production workloads.
    • Not suitable for reading production workloads.
    • Use this mode for reading if latency is not an issue.
  • Spark-ACID mode
    • Suitable for reading production workloads.
    • Does not support writes

You can read Hive managed tables using either JDBC or Spark-ACID mode. The mode you configure affects the background process. You use the same R code regardless of the mode with one exception: You do not need to call the commitTxn(hs) when using JDBC mode.

To write to Hive managed tables, you must connect to HWC in JDBC mode.

You can read and write Hive external tables in R using the sparklyr package. HWC is not required.

In the following procedure, you configure Spark-Acid execution mode to read tables on a production cluster. You use the native sparklyr spark_read_table and spark_load_table to read Hive managed tables in R.

Reading a Hive managed table from R

  1. Configure Spark Acid execution mode in R.
    For example:
    > config <- spark_config()
    > config$sparklyr.jars.default <- "/<path>/hive-warehouse-connector-assembly-<version>.jar"
    > config$spark.sql.extensions="com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension"
    > config$"false"
    > config$spark.sql.hive.hwc.execution.mode="spark"
    > config$spark.sql.hive.hiveserv
    > config$"hive"
    > config$spark.hadoop.hive.metastore.uris="thrift://<host>:9083"
    > config$spark.sql.extensions="com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension"
    > sc <- spark_connect(master = "yarn", config = config)
  2. Read Hive using sparklyr methods spark_read_table() or spark_load_table().
    For example:
    > library(sparklyr)  // Load sparklyr library
    > Sys.setenv(HADOOP_HOME = "/opt/cloudera/parcels/CDH/lib/hadoop") // set environment variables
    > Sys.setenv(SPARK_HOME = "/opt/cloudera/parcels/CDH/lib/spark/")
    > sc <- spark_connect(master = "yarn", config = config) // Create a spark Connection.
    > intDf <- sparklyr::spark_read_table(sc, 'emp_hwc') // read table
    >  sparklyr::sdf_collect(intDf1) // Converts SparkDataframe to R dataframe               
  3. Explicitly commit your code. For example, use SparklyrHWC::commitTxn(hs), where hs is the HiveWarehouseSessionImpl object.
    You need to commit your code using commitTxn() due to sparklyr calling specific cache methods internally.

Writing a Hive managed table in R

You configure JDBC mode and use and SparklyrHWC::spark_write_table() to write to a managed table.
  1. In R, configure JDBC mode by setting the following configs and values:
    > config$spark.sql.extensions="com.hortonworks.spark.sql.rule.Extensions"
    > config$"false"
    > config$"cluster"                
  2. Include the sparklyr library.
    > library(sparklyr)  // Load sparklyr library                       
  3. Write to a managed table using SparklyrHWC::spark_write_table.
    > library(sparklyr)  // Load sparklyr library
    > Sys.setenv(HADOOP_HOME = "/opt/cloudera/parcels/CDH/lib/hadoop") // set environment variables
    > Sys.setenv(SPARK_HOME = "/opt/cloudera/parcels/CDH/lib/spark/")
    > sc <- spark_connect(master = "yarn", config = config) // Create a spark Connection.
    > intDf <- sparklyr::spark_read_table(sc, 'emp_hwc') // read first table
    > intDf1 <-  sparklyr::spark_read_table(sc, 'emp_overwrite') // read second table
    > commitTxn(hs) // Commit transaction if read using spark-acid
    > SparklyrHWC::spark_write_table('emp_hwc',intDf1,'append') // Append the second table, to the first.
    > SparklyrHWC::spark_write_table('emp_hwc',intDf1,'overwrite') // Overwrite the first table with the second table.
    You do not need to call commitTxn(hs) when using JDBC mode.

Supported HWC APIs in R

The SparklyrHWC package supports the following HWC APIs in Scala.
  • execute() and executeQuery() (recommended) APIs to run read SQL statements
  • executeUpdate() API to run write SQL statements
  • API call for hive CTASK operations (create table as select .....)
  • Other HWC APIs, such as dropTable, dropDatabase, showTable

Any HWC API in Scala can be used in R. The behavior of these APIs in R and HWC is similar.

  1. Create a table using another existing table.
    SparklyrHWC::executeUpdate(hs,"create table hwc1 as select * from hwc")
  2. Read the new table.
    hwcDf <- SparklyrHWC::executeQuery(hs, "select * from hwc1")
  3. Convert the table to a SparkDataframe.
    hwcSdf <- sparklyr::sdf_copy_to(sc, hwcDf)
  4. Write the table to a new table.
  5. Append the table data to the new table.
  6. Insert data into the table using HWC API.
    SparklyrHWC::executeUpdate(hs,"insert into hwcNew values(2,'B')")
  7. Read the newly create table using HWC API.
    SparklyrHWC::executeQuery(hs, "select * from hwcNew")