Reading and writing Hive tables in R

The Hive Warehouse Connector (HWC) supports reading and writing Apache Hive managed ACID tables from R. Cloudera provides the SparklyrHWC R package, which includes all HWC methods, such as execute and executeQuery, and a spark_write_table method for writing to managed tables. The native sparklyr spark_write_table method supports writes to external tables only.
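
For example, the two write paths look like this. This is a minimal sketch; the Spark DataFrame df and the table names are hypothetical:

  library(sparklyr)
  library(SparklyrHWC)

  # Managed (ACID) table: use the HWC write method
  SparklyrHWC::spark_write_table('managed_tbl', df, 'append')

  # External table: the native sparklyr method is sufficient
  sparklyr::spark_write_table(df, 'external_tbl')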

Support

HWC works with sparklyr 1.0.4. Versions later than 1.0.4 should also work, provided sparklyr does not change its interfaces. Note that sparklyr itself is not supported by Cloudera; however, Cloudera does support issues related to using HWC from sparklyr.

Downloading SparklyrHWC

You can download the SparklyrHWC R package, which includes the HWC methods, from your CDP cluster. Go to /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/ and copy the SparklyrHWC-<version> package to your download location.
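
Assuming the package is distributed as a source archive, you can then install it from the copied file; the path below is a placeholder:

  # Install SparklyrHWC from the local source package (path is a placeholder)
  install.packages("/path/to/SparklyrHWC-<version>.tar.gz", repos = NULL, type = "source")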

To read and write Hive tables in R, you need to configure an HWC execution mode. You can configure one of the following HWC execution modes in R (the distinguishing configuration settings are sketched after this list):
  • JDBC mode
    • Suitable for writing production workloads.
    • Suitable for reading production workloads with a data size of 1 GB or less.
    • Use this mode for reading if latency is not an issue.
  • Spark-ACID mode
    • Suitable for reading production workloads.
    • Does not support writes.
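
The mode is selected through Spark configuration entries. As a sketch drawn from the examples later in this topic, the distinguishing settings are:

  library(sparklyr)
  config <- spark_config()

  # Common to both modes
  config$spark.sql.extensions <- "com.hortonworks.spark.sql.rule.Extensions"

  # Spark-ACID mode (reads): use the direct reader
  config$spark.datasource.hive.warehouse.read.mode <- "DIRECT_READER_V2"

  # JDBC mode (reads and writes): use these settings instead
  # config$spark.datasource.hive.warehouse.read.via.llap <- "false"
  # config$spark.datasource.hive.warehouse.read.jdbc.mode <- "cluster"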

Reading and writing managed tables

You can read Hive managed tables using either JDBC or Spark-ACID mode. The mode you configure affects the background process. You use the same R code regardless of the mode, with one exception: you do not need to call commitTxn(hs) when using JDBC mode.
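
For example, a read looks identical in both modes; only the commit call differs. In this sketch, emp_hwc is a sample table and hs is the HiveWarehouseSessionImpl object described in the steps below:

  df <- sparklyr::spark_read_table(sc, 'emp_hwc') # Same call in both modes
  SparklyrHWC::commitTxn(hs)                      # Needed only in Spark-ACID mode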

To write to Hive managed tables, you must connect to HWC in JDBC mode.

Reading and writing external tables

You can read and write Hive external tables in R using the sparklyr package. HWC is not required.
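
For example, plain sparklyr calls are enough for external tables. This sketch assumes an existing Spark connection sc and hypothetical table names:

  library(sparklyr)

  # Read an external table with native sparklyr; no HWC configuration is required
  extDf <- sparklyr::spark_read_table(sc, 'emp_external')

  # Write to another external table with the native sparklyr method
  sparklyr::spark_write_table(extDf, 'emp_external_copy')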

In the following procedure, you configure the Spark-ACID execution mode to read tables on a production cluster. You use the native sparklyr spark_read_table and spark_load_table methods to read Hive managed tables in R.

Reading a Hive managed table example

  1. Read tables using Spark-ACID DIRECT_READER_V2 mode.
    For example:
    library(sparklyr)
    
    # Configuration needed to use Spark-ACID
    
    config <- spark_config()
    config$sparklyr.jars.default <- "<path to hwc jar>"
    config$spark.sql.hive.hiveserver2.jdbc.url <- "jdbc:hive2://<url>:10000/default"
    config$spark.datasource.hive.warehouse.user.name <- "hive"
    config$spark.hadoop.hive.metastore.uris <- "thrift://<url>:9083"
    config$spark.sql.extensions <- "com.hortonworks.spark.sql.rule.Extensions"
    config$spark.kryo.registrator <- "com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator"
    config$spark.datasource.hive.warehouse.read.mode <- "DIRECT_READER_V2"
    
    # Set environment variables
    
    Sys.setenv(SPARK_HOME = "/opt/cloudera/parcels/CDH/lib/spark/")
    Sys.setenv(HADOOP_HOME = "/opt/cloudera/parcels/CDH/lib/hadoop")
    
    # The Spark master can be set to local or yarn mode
    sc <- spark_connect(master = "local", config = config)
  2. Read Hive using sparklyr methods spark_read_table() or spark_load_table().
    For example:
    # Reading a managed table using the Spark-ACID direct reader
    
    > intDf <- sparklyr::spark_read_table(sc, 'emp_hwc')
    > sparklyr::sdf_collect(intDf) # Converts the Spark DataFrame to an R dataframe
  3. Explicitly commit the transaction. For example, use SparklyrHWC::commitTxn(hs), where hs is the HiveWarehouseSessionImpl object.
    You need to commit the transaction using commitTxn() because sparklyr calls specific cache methods internally.

Writing a Hive managed table in R

You configure JDBC mode and use SparklyrHWC::spark_write_table() to write to a managed table.
  1. In R, configure JDBC mode by setting the following configs and values:
    > config$spark.sql.extensions="com.hortonworks.spark.sql.rule.Extensions"
    > config$spark.datasource.hive.warehouse.read.via.llap="false"
    > config$spark.datasource.hive.warehouse.read.jdbc.mode="cluster"                
  2. Include the sparklyr library.
    > library(sparklyr)  # Load the sparklyr library
  3. Write to a managed table using SparklyrHWC::spark_write_table.
    > library(sparklyr)  # Load the sparklyr library
    > Sys.setenv(HADOOP_HOME = "/opt/cloudera/parcels/CDH/lib/hadoop") # Set environment variables
    > Sys.setenv(SPARK_HOME = "/opt/cloudera/parcels/CDH/lib/spark/")
    > sc <- spark_connect(master = "yarn", config = config) # Create a Spark connection
    
    > intDf <- sparklyr::spark_read_table(sc, 'emp_hwc') # Read the first table
    > intDf1 <- sparklyr::spark_read_table(sc, 'emp_overwrite') # Read the second table
    > SparklyrHWC::commitTxn(hs) # Commit the transaction if the tables were read using Spark-ACID
    > SparklyrHWC::spark_write_table('emp_hwc', intDf1, 'append') # Append the second table to the first
    > SparklyrHWC::spark_write_table('emp_hwc', intDf1, 'overwrite') # Overwrite the first table with the second
    
    You do not need to call commitTxn(hs) when using JDBC mode.

Supported HWC APIs in R

The SparklyrHWC package supports the following Scala HWC APIs in R:
  • execute() and executeQuery() (recommended) APIs to run read SQL statements
  • executeUpdate() API to run write SQL statements
  • APIs for Hive CTAS operations (CREATE TABLE AS SELECT ...)
  • Other HWC APIs, such as dropTable, dropDatabase, and showTables

Any Scala HWC API can be used in R, and the APIs behave similarly in R and in HWC. The following steps show a typical sequence:

  1. Create a table using another existing table.
    SparklyrHWC::executeUpdate(hs,"create table hwc1 as select * from hwc")
  2. Read the new table.
    hwcDf <- SparklyrHWC::executeQuery(hs, "select * from hwc1")
  3. Convert the table to a SparkDataframe.
    hwcSdf <- sparklyr::sdf_copy_to(sc, hwcDf)
  4. Write the data to a new table.
    SparklyrHWC::spark_write_table('hwcNew', hwcSdf, 'OVERWRITE')
  5. Append the table data to the new table.
    SparklyrHWC::spark_write_table('hwcNew', hwcSdf, 'APPEND')
  6. Insert data into the new table using the HWC API.
    SparklyrHWC::executeUpdate(hs, "insert into hwcNew values(2,'B')")
  7. Read the newly created table using the HWC API.
    SparklyrHWC::executeQuery(hs, "select * from hwcNew")
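  
Other HWC APIs follow the same pattern. As an illustrative sketch using only the generic execute calls (hwcNew is the table created in the example above):

  SparklyrHWC::execute(hs, "show tables")                       # List tables in the current database
  SparklyrHWC::executeUpdate(hs, "drop table if exists hwcNew") # Drop the example table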