Introduction to HWC and DataFrame APIs

As an Apache Spark developer, you learn the code constructs for executing Apache Hive queries using the HiveWarehouseSession API. In Spark source code, you see how to create an instance of HiveWarehouseSession. You also learn how to access a Hive ACID table using DataFrames.

Supported APIs

  • Spark SQL

    Supports native Spark SQL query read (only) patterns. Output conforms to native spark.sql conventions.

  • HWC

    Supports HiveWarehouse Session API operations using the HWC sql API.

  • DataFrames

    Supports accessing a Hive ACID table from Scala, or pySpark, directly using DataFrames. Use the short name HiveAcid. Direct reads and writes from the file are not supported.

    Spark SQL Example

    $ spark-shell <parameters to specify HWC jar and config settings>
    scala> sql("select * from managedTable").show 
    scala> spark.read.table("managedTable").show

    HWC API Example

    scala> val hive = com.hortonworks.hwc.HiveWarehouseSession.session(spark).build()
    scala> hive.executeQuery("select * from emp_acid").show
    scala> hive.executeQuery("select e.emp_id, e.first_name, d.name department from emp_acid e join dept_ext d on e.dept_id = d.id").show

    DataFrames Example

    Hive ACID tables are tables in Hive metastore and must be formatted using DataFrames as follows:

    Syntax:
    spark.read.format("HiveAcid").option("table", "<table name>")
    
    OR 
    
    val df = hive.sql("<SELECT query>")
    Example:
    scala> val df = spark.read.format("HiveAcid").option(Map("table" -> "default.acidtbl")).load()
    scala> df.collect()
    
    OR
    
    scala> val df = hive.sql("select * from managedTable where a=100")
    scala> df.collect()

Import statements and variables

The following string constants are defined by the API:

  • HIVE_WAREHOUSE_CONNECTOR
  • DATAFRAME_TO_STREAM
  • STREAM_TO_STREAM

Assuming spark is running in an existing SparkSession, use this code for imports:

  • Scala
    import com.hortonworks.hwc.HiveWarehouseSession
    import com.hortonworks.hwc.HiveWarehouseSession._
    val hive = HiveWarehouseSession.session(spark).build()
  • Java
    import com.hortonworks.hwc.HiveWarehouseSession;
    import static com.hortonworks.hwc.HiveWarehouseSession.*;
    HiveWarehouseSession hive = HiveWarehouseSession.session(spark).build();
  • Python
    from pyspark_llap import HiveWarehouseSession
    hive = HiveWarehouseSession.session(spark).build()

Executing queries

HWC supports three methods for executing queries:
  • .sql()
    • Executes queries in any HWC mode.

    • Consistent with the Spark sql interface.

    • Masks the internal implementation based on the cluster type you configured, either JDBC_CLIENT or JDBC_CLUSTER.
  • .execute()
    • Required for executing queries if spark.datasource.hive.warehouse.read.mode=JDBC_CLUSTER.
    • Uses a driver side JDBC connection.
    • Provided for backward compatibility where the method defaults to reading in JDBC client mode irrespective of the value of JDBC client or cluster mode configuration.
    • Recommended for catalog queries.
  • .executeQuery()
    • Executes queries, except catalog queries, in LLAP mode (spark.datasource.hive.warehouse.read.via.llap= true)
    • If LLAP is not enabled in the cluster, .executeQuery() does not work. CDP Data Center does not support LLAP.
    • Provided for backward compatibility.

Results are returned as a DataFrame to Spark.

hive.sql vs. spark.sql

There are a number of important differences between the hive.sql and spark.sql functions:
  • hive.sql() can only handle Apache Hive tables.
  • spark.sql() selects HWC when you query an Apache Hive managed (ACID) table and falls back to native Spark for reading external tables.
  • You can use the same Spark APIs, such as spark.sql(), to access either managed or external tables or temporary views. This is not the case with hive.sql().
  • The Direct Reader imposes the constraint that the Hive table must be transactional.