Sample Spark workload to access data
See an end-to-end example workload that shows how to enable a Spark session, connect to the HMS REST Catalog server, and run a Spark query to access data.
Before you run the workload
Obtain the OAuth client ID and client secret for your Data Catalog consumer from the
Manage Users export (CSV columns clientId and
secret). Pass them to the sample script as a single argument in the form
ClientId:Secret (a colon between the two values, no spaces).
From a shell, run your PySpark script and supply that value for
--credential. Replace your_script.py with your script
filename and substitute your real client credentials for the placeholder.
python your_script.py --credential '[***CLIENT-ID:CLIENT-SECRET***]'
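Before launching the job, the ClientId:Secret argument can be sanity-checked so that a malformed pair fails fast rather than surfacing as an authentication error inside Spark. The following is a minimal sketch; the helper name and error messages are illustrative and not part of the sample script.

```python
def validate_credential(credential: str) -> tuple[str, str]:
    """Split a ClientId:Secret pair and reject malformed input.

    Expects exactly one colon separating two non-empty values,
    with no spaces anywhere in the string.
    """
    client_id, sep, secret = credential.partition(":")
    if not sep or not client_id or not secret:
        raise ValueError("credential must be in ClientId:Secret form")
    if " " in credential:
        raise ValueError("credential must not contain spaces")
    return client_id, secret

# A well-formed pair splits cleanly into its two parts.
client_id, secret = validate_credential("my-client-id:my-secret")
```

Using str.partition keeps the split at the first colon, so a secret that itself contains a colon is passed through unchanged.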
PySpark workload example
The PySpark workload connects to the Cloudera HMS REST Catalog using CLIENT_ID
and CLIENT_SECRET as credentials and runs workloads from an external system
such as Databricks, Snowflake, or standalone Spark in Docker. The following example
expects the same ClientId:Secret string to be passed in at run time through
args.credential (see the previous section).
# © 2024 by Cloudera, Inc. All rights reserved.
# Scripts and sample code are licensed under the Apache License,
# Version 2.0
import pyspark
from pyspark.sql import SparkSession
import argparse
parser = argparse.ArgumentParser(description="Spark WorkLoad Script")
parser.add_argument("--credential", help="ClientId:Secret")
args = parser.parse_args()
conf = (
pyspark.SparkConf()
.setAppName('Fetch Employees')
.setMaster('[***SPARK-MASTER-URL***]')
.set('spark.jars', '[***PATH-TO-ICEBERG-SPARK-RUNTIME-JAR***]')
.set('spark.files', '[***PATH-TO-LOG4J2-PROPERTIES***]')
# Packages
.set('spark.jars.packages', '[***ICEBERG-SPARK-RUNTIME-PACKAGES***]')
# SQL extensions
.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
# Configure the catalog
.set('spark.sql.defaultCatalog', 'demo')
.set('spark.sql.catalog.demo', 'org.apache.iceberg.spark.SparkCatalog')
.set('spark.sql.catalog.demo.type', 'rest')
.set('spark.sql.catalog.demo.uri', 'https://[***DATALAKE-HOSTNAME***]/[***DATALAKE-NAME***]/cdp-datashare-access/hms-api')
.set('spark.sql.catalog.demo.io-impl', '[***ICEBERG-FILE-IO-IMPL***]')
.set('spark.sql.catalog.demo.s3.client-factory-impl', '[***ICEBERG-S3-FILE-IO-CLIENT-FACTORY-IMPL***]')
.set('spark.sql.catalog.demo.credential', args.credential)
.set('spark.sql.catalog.demo.default-namespace', '[***DEFAULT-NAMESPACE***]')
)
## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("DEBUG")
print("Spark Job Running...")
# Printing the credential is for demonstration only; do not log secrets in production.
print("####### Credential: #######")
print(args.credential)
## list databases
dblist = spark.catalog.listDatabases()
print("###### List Databases ########")
print(dblist)
spark.sparkContext.parallelize([dblist]).coalesce(1).saveAsTextFile("file:///[***OUTPUT-DATABASES-DIR***]")
## list tables
tableList = spark.catalog.listTables("demo.[***DEFAULT-NAMESPACE***]")
print("###### List Tables #######")
print(tableList)
spark.sparkContext.parallelize([tableList]).coalesce(1).saveAsTextFile("file:///[***OUTPUT-TABLES-DIR***]")
## Run a Query
print("##### Query: fetch all the employees of 'department -> d006' ######")
results = spark.sql("select [***DEFAULT-NAMESPACE***].employees.first_name, [***DEFAULT-NAMESPACE***].employees.last_name, [***DEFAULT-NAMESPACE***].departments.dept_name "
"from [***DEFAULT-NAMESPACE***].employees, [***DEFAULT-NAMESPACE***].departments, [***DEFAULT-NAMESPACE***].dept_emp "
"where [***DEFAULT-NAMESPACE***].employees.emp_no=[***DEFAULT-NAMESPACE***].dept_emp.emp_no "
"and [***DEFAULT-NAMESPACE***].dept_emp.dept_no=[***DEFAULT-NAMESPACE***].departments.dept_no "
"and [***DEFAULT-NAMESPACE***].departments.dept_no='[***SAMPLE-DEPT-NO***]'")
results.show()
results.coalesce(1).write.option("header", "true").csv("file:///[***OUTPUT-QUERY-RESULTS-DIR***]")
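Behind the scenes, setting spark.sql.catalog.demo.credential causes the Iceberg REST client to exchange the ClientId:Secret pair for an access token. The sketch below shows the shape of that exchange, assuming the server implements the standard Iceberg REST client-credentials flow (the /v1/oauth2/tokens path and the "catalog" scope come from the Apache Iceberg REST specification and are assumptions here, not Cloudera-specific documentation). No request is sent; the function only builds the URL and form payload.

```python
def build_token_request(catalog_uri: str, credential: str) -> tuple[str, dict]:
    """Build the token-endpoint URL and form payload for a
    client_credentials grant, mirroring what the Iceberg REST
    client does when 'credential' is configured (assumed flow)."""
    client_id, _, client_secret = credential.partition(":")
    token_url = catalog_uri.rstrip("/") + "/v1/oauth2/tokens"
    form = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        # "catalog" is the default scope in the Iceberg REST spec.
        "scope": "catalog",
    }
    return token_url, form

# Example with placeholder values; substitute your real catalog URI and pair.
url, payload = build_token_request(
    "https://datalake.example.com/demo-lake/cdp-datashare-access/hms-api",
    "my-client-id:my-secret",
)
```

The returned payload would typically be POSTed as form-encoded data; in the sample script this is handled internally by the Iceberg runtime, so no such call appears in the workload itself.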
