Using the Spark DataFrame API
A DataFrame is a distributed collection of data organized into named
columns. It is conceptually equivalent to a table in a relational
database or a data frame in R or the Python pandas library. You can
construct DataFrames from a wide array of sources, including structured
data files, Apache Hive tables, and existing Spark resilient
distributed datasets (RDDs). The Spark DataFrame API is available in
Scala, Java, Python, and R.
This section provides examples of DataFrame API use.
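In addition to reading from files, you can create a DataFrame directly from an existing RDD. The following is a minimal sketch, assuming a Spark 2.x spark-shell session (where spark.implicits._ is imported automatically); the sample data is illustrative:

// Hypothetical sample data; toDF() converts an RDD of tuples to a DataFrame
val rdd = sc.parallelize(Seq(("Michael", 29), ("Andy", 30), ("Justin", 19)))
val rddDF = rdd.toDF("name", "age")
rddDF.printSchema()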
To list JSON file contents as a DataFrame:
- Upload the people.txt and people.json example files to your object store:

  hdfs dfs -put people.txt people.json s3a://<bucket_name>/

- Launch the Spark shell:

  spark-shell --conf "spark.yarn.access.hadoopFileSystems=s3a://<bucket_name>/"

- At the Spark shell, type the following:

  scala> val df = spark.read.format("json").load("s3a://<bucket_name>/people.json")

- Using df.show, display the contents of the DataFrame:

  scala> df.show
  +----+-------+
  | age|   name|
  +----+-------+
  |null|Michael|
  |  30|   Andy|
  |  19| Justin|
  +----+-------+
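Spark infers the schema automatically for JSON sources. As a quick check, df.printSchema() displays the inferred column types; for this example file, the output should resemble the following sketch:

scala> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)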
The following examples use Scala to access the DataFrame df defined in the previous subsection:
// Select all rows, but increment age by 1
scala> df.select(df("name"), df("age") + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
// Select people older than 21
scala> df.filter(df("age") > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
// Count people by age
scala> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
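These operations compose into a single expression. As an illustrative sketch (not part of the original example), the following filters out null ages, counts by age, and sorts the result:

scala> df.filter(df("age").isNotNull).groupBy("age").count().orderBy("age").show()
+---+-----+
|age|count|
+---+-----+
| 19|    1|
| 30|    1|
+---+-----+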
The following example uses the DataFrame API to specify a schema for people.txt, and then retrieves names from a temporary view associated with the schema:
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row

// Read the raw text file as an RDD of lines
val people = sc.textFile("s3a://<bucket_name>/people.txt")

// Build a schema from a string of field names; both fields are strings
val schemaString = "name age"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)))

// Convert each comma-separated line into a Row
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema and register the result as a temporary view
val peopleDataFrame = spark.createDataFrame(rowRDD, schema)
peopleDataFrame.createOrReplaceTempView("people")

// Query the view with SQL and print the names
val results = spark.sql("SELECT name FROM people")
results.map(t => "Name: " + t(0)).collect().foreach(println)
This produces output similar to the following:
Name: Michael
Name: Andy
Name: Justin
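The schema above stores both fields as strings. As a variation (a sketch, not part of the original example; the typedSchema, typedRowRDD, and typedPeople names are illustrative), you can declare age as an integer so that numeric comparisons work without casting. It reuses the people RDD from the previous example; note the toInt conversion when each Row is built:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

// Typed schema: name is a string, age is an integer
val typedSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

// Convert the age field to Int while building each Row
val typedRowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val typedPeople = spark.createDataFrame(typedRowRDD, typedSchema)

// Numeric comparison now works directly on the age column
typedPeople.filter(typedPeople("age") > 21).show()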