Chapter 9. Accessing Hive Tables from Spark
The following example reads and writes to HDFS under Hive directories using the built-in UDF collect_list(col), which returns a list of objects with duplicates.
Note: If Spark was installed manually (without using Ambari), see Configuring Spark for Hive Access before accessing Hive data from Spark.
In a production environment this type of operation would run under an account with appropriate HDFS permissions; the following example uses the hdfs user.
Launch the Spark Shell on a YARN cluster:
su hdfs
./bin/spark-shell --num-executors 2 --executor-memory 512m --master yarn-client
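If you prefer a single command, su can run the shell directly through its -c option. This is an optional variation on the step above, not part of the original example:
su hdfs -c "./bin/spark-shell --num-executors 2 --executor-memory 512m --master yarn-client"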
Create a Hive context:
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
You should see output similar to the following:
… hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@7d9b2e8d
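As an optional sanity check (not shown in the original example), you can confirm that the new context can reach the Hive metastore by listing existing tables:
scala> hiveContext.sql("SHOW TABLES").collect.foreach(println)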
Create a Hive table:
scala> hiveContext.sql("CREATE TABLE IF NOT EXISTS TestTable (key INT, value STRING)")
You should see output similar to the following:
… from=org.apache.hadoop.hive.ql.Driver> 15/08/20 13:39:18 INFO PerfLogger: </PERFLOG method=Driver.run start=1440092357218 end=1440092358126 duration=908 from=org.apache.hadoop.hive.ql.Driver> res0: org.apache.spark.sql.DataFrame = [result: string]
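If you want to verify the table definition before loading data, a DESCRIBE query runs through the same context. This is an optional step, not part of the original walkthrough:
scala> hiveContext.sql("DESCRIBE TestTable").collect.foreach(println)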
Load sample data from kv1.txt into the table:
scala> hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE TestTable")
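Optionally, you can confirm the load by counting rows or sampling a few; this check is not part of the original example:
scala> hiveContext.sql("SELECT COUNT(*) FROM TestTable").collect.foreach(println)
scala> hiveContext.sql("SELECT * FROM TestTable LIMIT 5").collect.foreach(println)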
Invoke the Hive collect_list UDF:
scala> hiveContext.sql("FROM TestTable SELECT key, collect_list(value) GROUP BY key ORDER BY key").collect.foreach(println)
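One way to also write aggregated results back under the Hive warehouse directory is a CREATE TABLE ... AS SELECT statement. This is an optional extension of the example; the table name TestTableAgg and the column alias value_list below are hypothetical:
scala> hiveContext.sql("CREATE TABLE TestTableAgg AS SELECT key, collect_list(value) AS value_list FROM TestTable GROUP BY key")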