Sample Spark workload to access data
See an example of an end-to-end sample workload flow that describes the process of enabling a Spark session, connecting to the HMS REST Catalog server, and running a Spark engine query to access data.
PySpark workload example
The PySpark workload connects to the Cloudera HMS REST Catalog using CLIENT_ID and CLIENT_SECRET as credentials, and runs from an external system such as Databricks, Snowflake, or standalone Spark in Docker.
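The script expects the credential as a single CLIENT_ID:CLIENT_SECRET string passed through its --credential argument. The following is a minimal launch sketch, assuming the script is saved as spark_workload.py and the ID and secret are exported as the environment variables CLIENT_ID and CLIENT_SECRET (the file and variable names are illustrative):
import os
import subprocess
# Illustrative launcher: build the CLIENT_ID:CLIENT_SECRET pair from
# environment variables and pass it to spark-submit.
credential = f"{os.environ['CLIENT_ID']}:{os.environ['CLIENT_SECRET']}"
subprocess.run(
    ["spark-submit", "spark_workload.py", "--credential", credential],
    check=True,
)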
# © 2024 by Cloudera, Inc. All rights reserved.
# Scripts and sample code are licensed under the Apache License,
# Version 2.0
import pyspark
from pyspark.sql import SparkSession
import argparse
parser = argparse.ArgumentParser(description="Spark Workload Script")
parser.add_argument("--credential", help="ClientId:Secret")
args = parser.parse_args()
conf = (
pyspark.SparkConf()
.setAppName('Fetch Employees')
.setMaster('local[*]')
.set('spark.jars', '/opt/spark/jars/iceberg-spark-runtime-3.5_2.13-1.5.2.jar')
.set('spark.files', '/opt/spark/conf/log4j2.properties')
#packages
.set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.5.2')
#SQL Extensions
.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
#Configuring Catalog
.set('spark.sql.defaultCatalog', 'demo')
.set('spark.sql.catalog.demo', 'org.apache.iceberg.spark.SparkCatalog')
.set('spark.sql.catalog.demo.type', 'rest')
.set('spark.sql.catalog.demo.uri', 'https://<datalake-hostname>/<datalake-name>/cdp-share-access/hms-api/icecli')
.set('spark.sql.catalog.demo.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
.set('spark.sql.catalog.demo.s3.client-factory-impl', 'org.apache.iceberg.aws.s3.DefaultS3FileIOAwsClientFactory')
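#Credential: the CLIENT_ID:CLIENT_SECRET pair the Iceberg REST client uses to obtain an access token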
.set('spark.sql.catalog.demo.credential', args.credential)
.set('spark.sql.catalog.demo.default-namespace', 'emp_data')
)
## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("DEBUG")
print("Spark Job Running...")
print("####### Credential: #######")
print(args.credential)
## list databases
dblist = spark.catalog.listDatabases()
print("###### List Databases ########")
print(dblist)
spark.sparkContext.parallelize([dblist]).coalesce(1).saveAsTextFile("file:///opt/spark/out/databases")
## list tables
tableList = spark.catalog.listTables("demo.emp_data")
print("###### List Tables #######")
print(tableList)
spark.sparkContext.parallelize([tableList]).coalesce(1).saveAsTextFile("file:///opt/spark/out/tables")
## Run a Query
print("##### Query: fetch all the employees of 'department -> d006' ######")
results = spark.sql("select emp_data.employees.first_name, emp_data.employees.last_name, emp_data.departments.dept_name "
"from emp_data.employees, emp_data.departments, emp_data.dept_emp "
"where emp_data.employees.emp_no=emp_data.dept_emp.emp_no "
"and emp_data.dept_emp.dept_no=emp_data.departments.dept_no "
"and emp_data.departments.dept_no='d006'")
results.show()
results.coalesce(1).write.option("header", "true").csv("file:///opt/spark/out/query_results")
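To verify that the query results were written, read the CSV back in the same session. The following is a minimal check, assuming the local output path used above:
# Read the CSV written above back into a DataFrame and display a few rows.
verify = spark.read.option("header", "true").csv("file:///opt/spark/out/query_results")
verify.show(5)
spark.stop()  # release resources when the workload completes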