Chapter 6. Accessing ORC Files from Spark
Spark on HDP supports the Optimized Row Columnar ("ORC") file format, a self-describing, type-aware column-based file format that is one of the primary file formats supported in Apache Hive. The columnar format lets the reader read, decompress, and process only the columns that are required for the current query. ORC support in Spark SQL and DataFrame APIs provides fast access to ORC data contained in Hive tables. It supports ACID transactions, snapshot isolation, built-in indexes, and complex types.
Accessing ORC in Spark
Spark’s ORC data source supports complex data types (such as array, map, and struct), and provides read and write access to ORC files. It leverages Spark SQL’s Catalyst engine for common optimizations such as column pruning, predicate push-down, and partition pruning.
This chapter has several examples of Spark’s ORC integration, showing how such optimizations are applied to user programs.
To start using ORC, define a HiveContext instance:
import org.apache.spark.sql._

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

// Needed for the implicit toDF() conversion used later in this chapter
import sqlContext.implicits._
The following examples use a few data structures to demonstrate working with complex types. The Person struct has name, age, and a sequence of Contacts, which are themselves defined by names and phone numbers. Define these structures as follows:
case class Contact(name: String, phone: String)
case class Person(name: String, age: Int, contacts: Seq[Contact])
Next, create 100 records. In the physical file these records will be saved in columnar format, but users will see rows when accessing ORC files via the DataFrame API. Each row represents one Person record.
val records = (1 to 100).map { i =>
  Person(s"name_$i", i, (0 to 1).map { m => Contact(s"contact_$m", s"phone_$m") })
}
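As an optional sanity check (not part of the original example), you can print the first couple of generated records to confirm that each Person carries a nested sequence of Contact values:

// Print the first two Person records, each with its nested contacts
records.take(2).foreach(println)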
Reading and Writing with ORC
Spark’s DataFrameReader and DataFrameWriter are used to access ORC files, in a similar manner to other data sources.
To write Person objects as ORC files to the “people” directory, use the following command:
sc.parallelize(records).toDF().write.format("orc").save("people")
Read the objects back as follows:
val people = sqlContext.read.format("orc").load("people")
For reuse in future operations, register it as a temporary table named “people”:
people.registerTempTable("people")
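It can also be worth confirming that the nested schema survived the round trip through ORC. This optional check uses the standard DataFrame API:

// Print the inferred schema; contacts should appear as an array of
// structs with name and phone fields
people.printSchema()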
Column Pruning
The previous step registered the table as a temporary table named “people”. The following SQL query references only two of its columns, name and age.
sqlContext.sql("SELECT name FROM people WHERE age < 15").count()
At runtime, the physical table scan will only load columns name and age, without reading the contacts column from the file system. This improves read performance.
ORC reduces I/O overhead by only touching required columns. It requires significantly fewer seek operations because all columns within a single stripe are stored together on disk.
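One way to observe column pruning (the exact output varies by Spark version) is to inspect the query plan for the statement above:

// Print the logical and physical plans; the ORC scan should list only
// the name and age columns, not contacts
sqlContext.sql("SELECT name FROM people WHERE age < 15").explain(true)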
Predicate Push-down
The columnar nature of the ORC format helps avoid reading unnecessary columns, but it is still possible to read unnecessary rows. In our example, the query reads every row (ages 1 through 100) even though it only asks for rows where age is less than 15. Such a full table scan is an expensive operation.
ORC avoids this type of overhead by using predicate push-down with three levels of built-in indexes within each file: file level, stripe level, and row level:
File and stripe level statistics are in the file footer, making it easy to determine if the rest of the file needs to be read.
Row-level indexes include column statistics for each row group, along with the position for seeking to the start of the row group.
ORC utilizes these indexes to move the filter operation to the data loading phase, by reading only data that potentially includes required rows.
This combination of indexed data and columnar storage reduces disk I/O significantly, especially for larger datasets where I/O bandwidth becomes the main bottleneck for performance.
Important: By default, ORC predicate push-down is disabled in Spark SQL. To obtain performance benefits from predicate push-down, you must enable it explicitly, as follows:

sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
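As a minimal sketch of the effect, the earlier query can be re-run with push-down enabled; ORC then consults its file, stripe, and row-group statistics and skips data that cannot match the predicate:

// Enable ORC predicate push-down, then re-run the filtered query
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
sqlContext.sql("SELECT name FROM people WHERE age < 15").count()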
Partition Pruning
When predicate push-down is not applicable (for example, if all stripes contain records that match the predicate condition), a query with a WHERE clause might need to read the entire data set. This becomes a bottleneck over a large table. Partition pruning is another optimization method; it exploits query semantics to avoid reading large amounts of data unnecessarily.
Partition pruning is possible when data within a table is split across multiple logical partitions. Each partition corresponds to a particular value (or combination of values) of the partition column(s) and is stored as a sub-directory within the table’s root directory on HDFS. Where applicable, only the required partitions (subdirectories) of a table are queried, thereby avoiding unnecessary I/O.
Spark supports saving data out in a partitioned layout seamlessly, through the partitionBy method available during data source writes. To partition the people table by the “age” column, use the following command:
people.write.format("orc").partitionBy("age").save("peoplePartitioned")
Records will be automatically partitioned by the age field and then saved into different directories; for example, peoplePartitioned/age=1/, peoplePartitioned/age=2/, and so on.

After partitioning the data, subsequent queries can skip large amounts of I/O when the partition column is referenced in predicates. For example, the following query automatically locates and loads the file under peoplePartitioned/age=20/ and skips all others.
val peoplePartitioned = sqlContext.read.format("orc").load("peoplePartitioned")
peoplePartitioned.registerTempTable("peoplePartitioned")
sqlContext.sql("SELECT * FROM peoplePartitioned WHERE age = 20")
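As with column pruning, the query plan can help confirm that partition pruning took place; depending on the Spark version, the plan (or the input paths logged at execution time) should show only the age=20 sub-directory being read. This is an optional diagnostic:

// Inspect the plan; only the peoplePartitioned/age=20 partition
// should be scanned
sqlContext.sql("SELECT * FROM peoplePartitioned WHERE age = 20").explain(true)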
DataFrame Support
DataFrames look similar to Spark RDDs, but have higher-level semantics built into their operators. This allows optimization to be pushed down to the underlying query engine. ORC data can be loaded into DataFrames.
Here is the earlier SELECT query (names of people younger than 15), expressed through the Scala DataFrame API and run against the partitioned data set:
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
val people = sqlContext.read.format("orc").load("peoplePartitioned")
people.filter(people("age") < 15).select("name").show()
DataFrames are not limited to Scala. There is a Java API and, for data scientists, a Python API binding:
from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
people = sqlContext.read.format("orc").load("peoplePartitioned")
people.filter(people.age < 15).select("name").show()
Additional Resources
Apache ORC website: https://orc.apache.org/
ORC performance: http://hortonworks.com/blog/orcfile-in-hdp-2-better-compression-better-performance/
Get Started with Spark: http://hortonworks.com/hadoop/spark/get-started/