Spark Guide

Chapter 5. Using the Spark DataFrame API

The Spark DataFrame API provides table-like access to data from a variety of sources. Its purpose is similar to Python's pandas library and R's data frames: collect and organize data into a tabular format with named columns. DataFrames can be constructed from a wide array of sources, including structured data files, Hive tables, and existing Spark RDDs.

The following steps load the people.json sample file into a DataFrame from the Spark shell:

  1. As user spark, upload the people.txt and people.json files to HDFS:

    cd /usr/hdp/current/spark-client
    su spark
    hdfs dfs -copyFromLocal examples/src/main/resources/people.txt people.txt
    hdfs dfs -copyFromLocal examples/src/main/resources/people.json people.json
  2. Launch the Spark shell:

    cd /usr/hdp/current/spark-client
    su spark 
    ./bin/spark-shell --num-executors 1 --executor-memory 512m --master yarn-client
  3. At the Spark shell, type the following:

    scala> val df = sqlContext.read.format("json").load("people.json")
  4. Using df.show, display the contents of the DataFrame:

    scala> df.show 
    16/05/31 11:24:10 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool
    
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
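To check how Spark interpreted the JSON file, it can help to inspect the inferred schema before querying. The following is a minimal sketch, assuming the same spark-shell session (where sqlContext is predefined) and the people.json file uploaded in step 1; the name rowCount is introduced here for illustration only.

```scala
// Assumption: run inside spark-shell, which predefines sqlContext.
val df = sqlContext.read.format("json").load("people.json")

// Print the column names and types that Spark inferred from the JSON records.
df.printSchema()

// The same information as (column name, type name) pairs.
df.dtypes.foreach { case (name, dataType) => println(s"$name: $dataType") }

// Count the rows without printing the table (rowCount is an illustrative name).
val rowCount = df.count()
println(s"rows: $rowCount")
```

A null in the table, such as Michael's age above, typically means the field was absent from that JSON record.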

Additional DataFrame API Examples

Here are additional examples of Scala-based DataFrame access, using DataFrame df defined in the previous subsection:

// Import the DataFrame functions API
scala> import org.apache.spark.sql.functions._

// Select all rows, but increment age by 1
scala> df.select(df("name"), df("age") + 1).show()

// Select people older than 21
scala> df.filter(df("age") > 21).show()

// Count people by age
scala> df.groupBy("age").count().show()
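The functions import above also provides column helpers and aggregate functions such as col, avg, and max. The following sketch, which assumes the same spark-shell session and people.json file, renames a computed column and aggregates over rows that have an age; the names withNextAge, stats, and age_next_year are illustrative and not part of the original examples.

```scala
import org.apache.spark.sql.functions.{avg, col, max}

// Assumption: run inside spark-shell, which predefines sqlContext.
val df = sqlContext.read.format("json").load("people.json")

// Give the computed column a readable name instead of a generated one.
val withNextAge = df.select(col("name"), (col("age") + 1).alias("age_next_year"))
withNextAge.show()

// Drop rows whose age is null, then compute the average and maximum age.
val stats = df.na.drop(Seq("age")).agg(avg("age"), max("age"))
stats.show()

// Sort by age, oldest first.
df.orderBy(col("age").desc).show()
```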

Specify Schema Programmatically

The following example uses the DataFrame API to specify a schema for people.txt and then retrieves names from a temporary table associated with that schema:

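import org.apache.spark.sql._

// Create an SQLContext from the existing SparkContext (sc), then load people.txt as an RDD of lines
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val people = sc.textFile("people.txt")

// Column names for the schema, separated by spaces
val schemaString = "name age"

import org.apache.spark.sql.types.{StructType,StructField,StringType}

// Build the schema: one nullable string column per name in schemaString
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert each comma-separated line into a Row, then apply the schema
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrame as a temporary table so that it can be queried with SQL
peopleDataFrame.registerTempTable("people")

val results = sqlContext.sql("SELECT name FROM people")

// Print the first column (name) of each result row
results.map(t => "Name: " + t(0)).collect().foreach(println)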

This will produce output similar to the following:

16/05/31 14:36:49 INFO cluster.YarnScheduler: Removed TaskSet 13.0, whose tasks have all completed, from pool
16/05/31 14:36:49 INFO scheduler.DAGScheduler: ResultStage 13 (collect at <console>:33) finished in 0.129 s
16/05/31 14:36:49 INFO scheduler.DAGScheduler: Job 10 finished: collect at <console>:33, took 0.162827 s
Name: Michael
Name: Andy
Name: Justin
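
The schema above stores every column as a string, so numeric comparisons on age depend on implicit casts. As a variation, the sketch below declares age as IntegerType and converts the value while building each Row. It assumes the same spark-shell session (sc and sqlContext predefined) and that every line of people.txt has the form name, age; the names typedSchema, typedRows, typedPeople, teenagers, and people_typed are illustrative.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumption: run inside spark-shell, which predefines sc and sqlContext.
// Declare name as a string column and age as an integer column.
val typedSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

// Split each line on the comma and convert the age field to an Int.
val typedRows = sc.textFile("people.txt")
  .map(_.split(","))
  .map(p => Row(p(0), p(1).trim.toInt))

val typedPeople = sqlContext.createDataFrame(typedRows, typedSchema)
typedPeople.registerTempTable("people_typed")

// With an integer column, range predicates compare numbers rather than strings.
val teenagers = sqlContext.sql("SELECT name FROM people_typed WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
```

A line with a missing or non-numeric age would make toInt throw when the query runs, so real input usually needs validation before conversion.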