HiveWarehouseSession API operations
HiveWarehouseSession acts as an API to bridge Spark with Hive. In your Spark source code, you create an instance of HiveWarehouseSession using the language-specific code shown below.
Import statements and variables
The following string constants are defined by the API:
HIVE_WAREHOUSE_CONNECTOR
DATAFRAME_TO_STREAM
STREAM_TO_STREAM
For more information, see the GitHub project for the Hive Warehouse Connector.
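Each constant names an HWC data source that you pass to a DataFrame format() call, as the write operations below show in full. As a quick Scala orientation (a sketch; df and stream stand for an existing DataFrame and streaming Dataset):
df.write.format(HIVE_WAREHOUSE_CONNECTOR).option("table", "web_sales").save()      // batch write
df.write.format(DATAFRAME_TO_STREAM).option("table", "web_sales").save()           // HiveStreaming write
stream.writeStream.format(STREAM_TO_STREAM).option("table", "web_sales").start()   // streaming sink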
Assuming spark is running in an existing SparkSession, use this code for imports:
- Scala
import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession._
val hive = HiveWarehouseSession.session(spark).build()
- Java
import com.hortonworks.hwc.HiveWarehouseSession;
import static com.hortonworks.hwc.HiveWarehouseSession.*;
HiveWarehouseSession hive = HiveWarehouseSession.session(spark).build();
- Python
from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()
Catalog operations
- Set the current database for unqualified Hive table references:
  hive.setDatabase(<database>)
- Execute a catalog operation and return a DataFrame:
  hive.execute("describe extended web_sales").show(100)
- Show databases:
  hive.showDatabases().show(100)
- Show tables for the current database:
  hive.showTables().show(100)
- Describe a table:
  hive.describeTable(<table_name>).show(100)
- Create a database:
  hive.createDatabase(<database_name>, <ifNotExists>)
- Create an ORC table:
  hive.createTable("web_sales").ifNotExists()
    .column("sold_time_sk", "bigint")
    .column("ws_ship_date_sk", "bigint")
    .create()
  See the CreateTableBuilder interface section below for additional table creation options. Note: You can also create tables with standard HiveQL through hive.executeUpdate.
- Drop a database:
  hive.dropDatabase(<databaseName>, <ifExists>, <useCascade>)
- Drop a table:
  hive.dropTable(<tableName>, <ifExists>, <usePurge>)
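Taken together, the following Scala sketch runs several of these catalog operations in sequence. The sandbox database name is a placeholder; the signatures match the HiveWarehouseSession interface shown later in this topic:
hive.createDatabase("sandbox", true)        // ifNotExists = true
hive.setDatabase("sandbox")
hive.createTable("web_sales")
  .ifNotExists()
  .column("ws_sold_time_sk", "bigint")
  .column("ws_ship_date_sk", "bigint")
  .create()
hive.describeTable("web_sales").show(100)
hive.dropTable("web_sales", true, false)    // ifExists = true, usePurge = false
hive.dropDatabase("sandbox", true, true)    // ifExists = true, useCascade = true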
Read operations
- Execute a Hive SELECT query and return a DataFrame:
  hive.executeQuery("select * from web_sales")
Write operations
- Execute a Hive update statement:
  hive.executeUpdate("ALTER TABLE old_name RENAME TO new_name")
  Note: You can execute CREATE, UPDATE, DELETE, INSERT, and MERGE statements in this way.
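For example, an INSERT and a MERGE issued through executeUpdate. This is a sketch: it assumes web_sales is a transactional (ACID) table, as Hive requires for MERGE, and that a staging table web_sales_staging exists; both names are illustrative:
hive.executeUpdate("INSERT INTO web_sales VALUES (80001, 2451180)")
hive.executeUpdate(
  """MERGE INTO web_sales AS t USING web_sales_staging AS s
    |ON t.ws_sold_time_sk = s.ws_sold_time_sk
    |WHEN MATCHED THEN UPDATE SET ws_ship_date_sk = s.ws_ship_date_sk
    |WHEN NOT MATCHED THEN INSERT VALUES (s.ws_sold_time_sk, s.ws_ship_date_sk)""".stripMargin)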
- Write a DataFrame to Hive in batch (uses LOAD DATA INTO TABLE):
  Java/Scala:
  df.write.format(HIVE_WAREHOUSE_CONNECTOR).option("table", <tableName>).save()
  Python:
  df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table", <tableName>).save()
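As a concrete Scala sketch of the batch case, write a filtered copy of web_sales to a new table (newTable is a placeholder; HIVE_WAREHOUSE_CONNECTOR comes from the imports shown earlier):
val df = hive.executeQuery("select * from web_sales")
df.filter("ws_sold_time_sk > 80000")
  .write.format(HIVE_WAREHOUSE_CONNECTOR)
  .option("table", "newTable")
  .save()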
- Write a DataFrame to Hive using HiveStreaming:
  Java/Scala:
  // Using dynamic partitioning
  df.write.format(DATAFRAME_TO_STREAM).option("table", <tableName>).save()
  // Or, to write to a static partition
  df.write.format(DATAFRAME_TO_STREAM).option("table", <tableName>).option("partition", <partition>).save()
  Python:
  # Using dynamic partitioning
  df.write.format(HiveWarehouseSession().DATAFRAME_TO_STREAM).option("table", <tableName>).save()
  # Or, to write to a static partition
  df.write.format(HiveWarehouseSession().DATAFRAME_TO_STREAM).option("table", <tableName>).option("partition", <partition>).save()
- Write a Spark Stream to Hive using HiveStreaming:
  Java/Scala:
  stream.writeStream.format(STREAM_TO_STREAM).option("table", "web_sales").start()
  Python:
  stream.writeStream.format(HiveWarehouseSession().STREAM_TO_STREAM).option("table", "web_sales").start()
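An end-to-end Scala sketch of the streaming case. It assumes a socket source on localhost:9999 whose lines are parsed into a single bigint column, a target table with a matching schema, and a checkpoint path of your choosing; the host, port, and path are placeholders, and checkpointLocation is the standard Structured Streaming option:
import org.apache.spark.sql.functions.col

val stream = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()                                                // one string column named value
  .select(col("value").cast("bigint").as("ws_sold_time_sk"))

stream.writeStream
  .format(STREAM_TO_STREAM)
  .option("table", "web_sales")
  .option("checkpointLocation", "/tmp/hwc-checkpoint")   // placeholder path
  .start()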
ETL example (Scala)
Read table data from Hive, transform it in Spark, and write to a new Hive table.
import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession._
val hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("tpcds_bin_partitioned_orc_1000")
val df = hive.executeQuery("select * from web_sales")
df.createOrReplaceTempView("web_sales")
hive.setDatabase("testDatabase")
hive.createTable("newTable")
.ifNotExists()
.column("ws_sold_time_sk", "bigint")
.column("ws_ship_date_sk", "bigint")
.create()
sql("SELECT ws_sold_time_sk, ws_ship_date_sk FROM web_sales WHERE ws_sold_time_sk > 80000)
.write.format(HIVE_WAREHOUSE_CONNECTOR)
.option("table", "newTable")
.save()
HiveWarehouseSession interface
package com.hortonworks.hwc;
public interface HiveWarehouseSession {
//Execute Hive SELECT query and return DataFrame
Dataset<Row> executeQuery(String sql);
//Execute Hive update statement
boolean executeUpdate(String sql);
//Execute Hive catalog-browsing operation and return DataFrame
Dataset<Row> execute(String sql);
//Reference a Hive table as a DataFrame
Dataset<Row> table(String sql);
//Return the SparkSession attached to this HiveWarehouseSession
SparkSession session();
//Set the current database for unqualified Hive table references
void setDatabase(String name);
/**
* Helpers: wrapper functions over execute or executeUpdate
*/
//Helper for show databases
Dataset<Row> showDatabases();
//Helper for show tables
Dataset<Row> showTables();
//Helper for describeTable
Dataset<Row> describeTable(String table);
//Helper for create database
void createDatabase(String database, boolean ifNotExists);
//Helper for create table stored as ORC
CreateTableBuilder createTable(String tableName);
//Helper for drop database
void dropDatabase(String database, boolean ifExists, boolean cascade);
//Helper for drop table
void dropTable(String table, boolean ifExists, boolean purge);
}
CreateTableBuilder interface
package com.hortonworks.hwc;
public interface CreateTableBuilder {
//Silently skip table creation if table name exists
CreateTableBuilder ifNotExists();
//Add a column with the specified name and Hive type
//Use more than once to add multiple columns
CreateTableBuilder column(String name, String type);
//Specify a column as a table partition
//Use more than once to specify multiple partitions
CreateTableBuilder partition(String name, String type);
//Add a table property
//Use more than once to add multiple properties
CreateTableBuilder prop(String key, String value);
//Make table bucketed, with given number of buckets and bucket columns
CreateTableBuilder clusterBy(long numBuckets, String ... columns);
//Creates ORC table in Hive from builder instance
void create();
}
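A Scala sketch that exercises the full builder, creating a partitioned, bucketed ORC table with a table property. The table name, partition column, and property value are illustrative:
hive.createTable("sales_bucketed")
  .ifNotExists()
  .column("ws_sold_time_sk", "bigint")
  .column("ws_ship_date_sk", "bigint")
  .partition("ws_sold_date", "string")     // partition column
  .prop("transactional", "true")           // table property
  .clusterBy(4, "ws_sold_time_sk")         // 4 buckets on ws_sold_time_sk
  .create()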