Chapter 5. Using the Spark DataFrame API
The Spark DataFrame API provides table-like access to data from a variety of sources. Its
purpose is similar to Python's pandas library and R's data frames: to collect and organize
data into a tabular format with named columns. DataFrames can be constructed from a wide
array of sources, including structured data files, Hive tables, and existing Spark RDDs.
As user spark, upload the people.txt and people.json files to HDFS:
cd /usr/hdp/current/spark-client
su spark
hdfs dfs -copyFromLocal examples/src/main/resources/people.txt people.txt
hdfs dfs -copyFromLocal examples/src/main/resources/people.json people.json
Launch the Spark shell:
cd /usr/hdp/current/spark-client
su spark
./bin/spark-shell --num-executors 1 --executor-memory 512m --master yarn-client
At the Spark shell, type the following:
scala> val df = sqlContext.read.format("json").load("people.json")
Using df.show, display the contents of the DataFrame:
scala> df.show
16/04/10 11:24:10 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
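Before querying a DataFrame, it can help to check which column names and types Spark inferred from the JSON file. The following is a minimal sketch, assuming the sqlContext provided by the Spark shell and the people.json file uploaded earlier. The name df2 is introduced here for illustration only, and read.json is used as shorthand for read.format("json").load.

```scala
// Assumes a running spark-shell session, which provides `sqlContext`.
// `df2` is an illustrative name, not part of the original example.
val df2 = sqlContext.read.json("people.json")

// Print the inferred schema as a tree of column names and types.
df2.printSchema()

// List the column names, then each (column name, type name) pair.
println(df2.columns.mkString(", "))
df2.dtypes.foreach { case (name, dataType) => println(s"$name: $dataType") }
```

Because the schema is inferred from the data, a field that is missing from some records (such as age for Michael) appears as a nullable column rather than causing an error.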
Additional DataFrame API Examples
Here are additional examples of Scala-based DataFrame access, using the DataFrame df defined
in the previous subsection:
// Import the DataFrame functions API
scala> import org.apache.spark.sql.functions._
// Select all rows, but increment age by 1
scala> df.select(df("name"), df("age") + 1).show()
// Select people older than 21
scala> df.filter(df("age") > 21).show()
// Count people by age
scala> df.groupBy("age").count().show()
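The examples above can be extended with helpers from the functions API. The following is a rough sketch, assuming the same df loaded from people.json in a Spark shell session. The column alias age_plus_one is a hypothetical name chosen for illustration.

```scala
// Assumes `df` is the DataFrame loaded from people.json in the spark-shell.
import org.apache.spark.sql.functions.{avg, col, max}

// Give the derived column an explicit name instead of the generated one.
// `age_plus_one` is an illustrative alias.
df.select(col("name"), (col("age") + 1).alias("age_plus_one")).show()

// Sort rows by age in descending order.
df.orderBy(col("age").desc).show()

// Compute aggregates over the whole DataFrame.
df.agg(avg("age"), max("age")).show()
```

Rows whose age is null do not satisfy a comparison such as age > 21, so they are dropped by the filter without any explicit null handling.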
Specify Schema Programmatically
The following example uses the DataFrame API to specify a schema for people.txt and retrieve
names from a temporary table associated with the schema:
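import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType,StructField,StringType}

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Load the raw text file; each line holds one comma-separated record
val people = sc.textFile("people.txt")

// Build a schema in which every field listed in schemaString is a nullable string
val schemaString = "name age"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert each line to a Row that matches the schema
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD and register the result as a temporary table
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
peopleDataFrame.registerTempTable("people")

// Query the temporary table with SQL and print each name
val results = sqlContext.sql("SELECT name FROM people")
results.map(t => "Name: " + t(0)).collect().foreach(println)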
This will produce output similar to the following:
16/04/10 14:36:49 INFO cluster.YarnScheduler: Removed TaskSet 13.0, whose tasks have all completed, from pool
16/04/10 14:36:49 INFO scheduler.DAGScheduler: ResultStage 13 (collect at <console>:33) finished in 0.129 s
16/04/10 14:36:49 INFO scheduler.DAGScheduler: Job 10 finished: collect at <console>:33, took 0.162827 s
Name: Michael
Name: Andy
Name: Justin
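The schema above stores both fields as strings. As a variation, the sketch below declares age as an integer so that SQL queries can compare it numerically. It assumes a Spark shell session with sc and sqlContext available, and that every line of people.txt has the form name, age with a valid integer age. The names typedSchema, typedRows, typedPeople, and people_typed are hypothetical and introduced only for this illustration.

```scala
// Assumes a spark-shell session providing `sc` and `sqlContext`, and that
// each line of people.txt looks like "name, age" with an integer age.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Illustrative schema: name stays a string, age becomes an integer.
val typedSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

// Parse each line and convert the age field to Int to match the schema.
val typedRows = sc.textFile("people.txt")
  .map(_.split(","))
  .map(p => Row(p(0), p(1).trim.toInt))

val typedPeople = sqlContext.createDataFrame(typedRows, typedSchema)
typedPeople.registerTempTable("people_typed")

// With a numeric age column, range predicates behave as numeric comparisons.
sqlContext.sql("SELECT name FROM people_typed WHERE age >= 13 AND age <= 19")
  .collect()
  .foreach(row => println("Name: " + row.getString(0)))
```

A line with a missing or non-numeric age would make toInt throw when the job runs, so input that is not known to be clean would need validation before the conversion.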