Chapter 7. Accessing ORC Files from Spark
Spark on HDP provides full support for Optimized Row Columnar ("ORC") files. ORC is a column-based file format that offers efficient storage of Hive data.
The following example shows how to access an ORC file programmatically as a table.

The example uses a text file called people.txt, which is included in the Apache Spark distribution. The file contains three lines:

Michael, 29
Andy, 30
Justin, 19
1. Download or create the people.txt file.

2. Copy people.txt into HDFS:

   cd /usr/hdp/current/spark-client/
   hadoop dfs -put examples/src/main/resources/people.txt people.txt
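If you want to confirm that the copy succeeded before continuing, you can print the file back from HDFS (an optional check, not part of the original steps):

   hadoop dfs -cat people.txt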
3. Create and populate a Hive table:

   // Import ORC support and Spark SQL libraries
   import org.apache.spark.sql.hive.orc._
   import org.apache.spark.sql._

   // Prepare the Spark table

   // Create a HiveContext; sc is an existing SparkContext
   val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

   // Create an RDD of "people" records
   val people = sc.textFile("people.txt")

   // Specify the schema as a string
   val schemaString = "name age"

   // Create the schema based on the schemaString
   val schema = StructType(
     schemaString.split(" ").map(fieldName =>
       if (fieldName == "name") StructField(fieldName, StringType, true)
       else StructField(fieldName, IntegerType, true)))

   // Convert the records in people to Rows
   val rowRDD = people.map(_.split(",")).map(p => Row(p(0), new Integer(p(1).trim)))

   // Apply the schema to the RDD
   val peopleSchemaRDD = hiveContext.applySchema(rowRDD, schema)

   // Register the people SchemaRDD as a table
   peopleSchemaRDD.registerTempTable("people")
   val results = hiveContext.sql("SELECT * FROM people")

   // List the query results
   results.map(t => "Name: " + t.toString).collect().foreach(println)
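Once the people temporary table is registered, any Spark SQL query can be run against it in the same session. The following is a minimal illustrative sketch; the column projection and filter value are not part of the original example:

   // Project a single column and filter by age
   val adults = hiveContext.sql("SELECT name FROM people WHERE age > 20")
   adults.map(t => "Name: " + t(0)).collect().foreach(println)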
4. Create and populate an ORC table from people:

   // ORC-specific section

   // Save people as an ORC-format file
   peopleSchemaRDD.saveAsOrcFile("people.orc")

   // Import "people.orc" into a Spark SQL table called "morePeople"
   val morePeople = hiveContext.orcFile("people.orc")

   // Register morePeople as a table
   // This allows you to run standard SQL queries on morePeople
   morePeople.registerTempTable("morePeople")

   // Display all rows
   hiveContext.sql("SELECT * FROM morePeople").collect().foreach(println)
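Because ORC stores data by column, a query that reads only some of the columns does not need to scan the rest of the file. As a rough, illustrative sketch against the morePeople table registered above (the filter value is arbitrary):

   // Read only the name column for rows matching the predicate
   hiveContext.sql("SELECT name FROM morePeople WHERE age < 30").collect().foreach(println)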