Spark SQL example
This example demonstrates how to use spark.sql to create and load two tables and select rows from the tables into two DataFrames. The next steps use the DataFrame API to filter the rows for salaries greater than 150,000 from one of the tables and show the resulting DataFrame. Then the two DataFrames are joined to create a third DataFrame. Finally, the new DataFrame is saved to a Hive table.
- Copy the Hue sample_07.csv and sample_08.csv files to your object store in a location accessible by the Spark cluster:
  hdfs dfs -put /opt/cloudera/parcels/CDH/lib/hue/apps/beeswax/data/sample_0* s3a://<bucket_name>/
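  Optionally, you can verify that both files arrived in the bucket before continuing:
  hdfs dfs -ls s3a://<bucket_name>/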
- Launch spark-shell:
  spark-shell --conf "spark.yarn.access.hadoopFileSystems=s3a://<bucket_name>"
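  Once the shell is up, you can confirm that Spark is using the Hive catalog; the call below should return hive (this assumes a Spark build with Hive support, as shipped with CDH):
  scala> spark.conf.get("spark.sql.catalogImplementation")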
- Create Hive tables sample_07 and sample_08:
scala> spark.sql("CREATE EXTERNAL TABLE sample_07 (code string,description string,total_emp int,salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile LOCATION 's3a://<bucket_name>/s07/'") scala> spark.sql("CREATE EXTERNAL TABLE sample_08 (code string,description string,total_emp int,salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile LOCATION 's3a://<bucket_name>/s08/'")
- In another session, launch Beeline:
  beeline -u "jdbc:hive2://<HiveServer2_host>:10001/default;principal=hive/_HOST@CLOUDERA.SITE;transportMode=http;httpPath=cliservice"
- In Beeline, show the Hive tables:
  0: jdbc:hive2://hs2.cloudera.site:> show tables;
  +------------+--+
  |  tab_name  |
  +------------+--+
  | sample_07  |
  | sample_08  |
  +------------+--+
- In the Spark shell, load the data from the CSV files into the tables:
  scala> spark.sql("LOAD DATA INPATH 's3a://<bucket_name>/sample_07.csv' OVERWRITE INTO TABLE sample_07")
  scala> spark.sql("LOAD DATA INPATH 's3a://<bucket_name>/sample_08.csv' OVERWRITE INTO TABLE sample_08")
- Create DataFrames containing the contents of the sample_07 and sample_08 tables:
  scala> val df_07 = spark.sql("SELECT * from sample_07")
  scala> val df_08 = spark.sql("SELECT * from sample_08")
- Show all rows in df_07 with salary greater than 150,000:
  scala> df_07.filter(df_07("salary") > 150000).show()
  The output should be:
  +-------+--------------------+---------+------+
  |   code|         description|total_emp|salary|
  +-------+--------------------+---------+------+
  |11-1011|    Chief executives|   299160|151370|
  |29-1022|Oral and maxillof...|     5040|178440|
  |29-1023|       Orthodontists|     5350|185340|
  |29-1024|     Prosthodontists|      380|169360|
  |29-1061|   Anesthesiologists|    31030|192780|
  |29-1062|Family and genera...|   113250|153640|
  |29-1063| Internists, general|    46260|167270|
  |29-1064|Obstetricians and...|    21340|183600|
  |29-1067|            Surgeons|    50260|191410|
  |29-1069|Physicians and su...|   237400|155150|
  +-------+--------------------+---------+------+
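  The same filter can also be expressed in Spark SQL; this statement returns the same rows:
  scala> spark.sql("SELECT * FROM sample_07 WHERE salary > 150000").show()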
- Create the DataFrame df_09 by joining df_07 and df_08, retaining only the code and description columns:
  scala> val df_09 = df_07.join(df_08, df_07("code") === df_08("code")).select(df_07.col("code"),df_07.col("description"))
  scala> df_09.orderBy($"code".asc).show()
  The new DataFrame looks like:
  +-------+--------------------+
  |   code|         description|
  +-------+--------------------+
  |00-0000|     All Occupations|
  |11-0000|Management occupa...|
  |11-1011|    Chief executives|
  |11-1021|General and opera...|
  |11-1031|         Legislators|
  |11-2011|Advertising and p...|
  |11-2021|  Marketing managers|
  |11-2022|      Sales managers|
  |11-2031|Public relations ...|
  |11-3011|Administrative se...|
  |11-3021|Computer and info...|
  |11-3031|  Financial managers|
  |11-3041|Compensation and ...|
  |11-3042|Training and deve...|
  |11-3049|Human resources m...|
  |11-3051|Industrial produc...|
  |11-3061| Purchasing managers|
  |11-3071|Transportation, s...|
  |11-9011|Farm, ranch, and ...|
  |11-9012|Farmers and ranchers|
  +-------+--------------------+
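  The equivalent join expressed in Spark SQL (the table aliases s07 and s08 are arbitrary):
  scala> spark.sql("SELECT s07.code, s07.description FROM sample_07 s07 JOIN sample_08 s08 ON s07.code = s08.code ORDER BY s07.code").show()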
- Save DataFrame df_09 as the Hive table sample_09:
  scala> df_09.write.option("path","s3a://<bucket_name>/s09/").saveAsTable("sample_09")
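  To confirm the save from the Spark shell, you can read the new table back:
  scala> spark.table("sample_09").show(5)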
- In Beeline, show the Hive tables:
  0: jdbc:hive2://hs2.cloudera.site:> show tables;
  +------------+--+
  |  tab_name  |
  +------------+--+
  | sample_07  |
  | sample_08  |
  | sample_09  |
  +------------+--+
Here is an equivalent program in Python that you could submit using spark-submit:
from pyspark import SparkContext, SparkConf
# HiveContext lives in pyspark.sql, not in the top-level pyspark package
from pyspark.sql import HiveContext

if __name__ == "__main__":

    # create Spark context with Spark configuration
    conf = SparkConf().setAppName("Data Frame Join")
    sc = SparkContext(conf=conf)
    sqlContext = HiveContext(sc)

    # load the two Hive tables into DataFrames and show the high salaries
    df_07 = sqlContext.sql("SELECT * from sample_07")
    df_07.filter(df_07.salary > 150000).show()
    df_08 = sqlContext.sql("SELECT * from sample_08")

    # list the tables visible in the Hive metastore
    tbls = sqlContext.sql("show tables")
    tbls.show()

    # join on the code column, keeping only code and description
    df_09 = df_07.join(df_08, df_07.code == df_08.code).select(df_07.code, df_07.description)
    df_09.show()

    # save the joined DataFrame as the Hive table sample_09
    df_09.write.saveAsTable("sample_09")
    tbls = sqlContext.sql("show tables")
    tbls.show()
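To run the program, save it to a file (the name dataframe_join.py below is just an example) and submit it with spark-submit, passing the same filesystem access setting used for spark-shell:
  spark-submit --conf "spark.yarn.access.hadoopFileSystems=s3a://<bucket_name>" dataframe_join.py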