Sample Spark workload to access data
Review an end-to-end sample workload that shows how to enable a Spark session, connect to the HMS REST Catalog server, and run a Spark query to access data.
Preparing to run the workload
Perform the following steps before you run the workload:
- Obtain the OAuth Client ID and Client Secret for your Data Catalog consumer from the clientId and secret columns of the Manage Users export CSV file.
- Pass the Client ID and Client Secret to the sample script as a single argument, ClientId:Secret, with a colon between the two values and no spaces (see the validation sketch after this list).
- From a shell, run your PySpark script and supply that value for the --credential parameter. Replace your_script.py with your script filename and substitute your real client credentials for the placeholder.
python your_script.py --credential '[***CLIENT-ID:CLIENT-SECRET***]'
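The following is a minimal sketch, not part of the Cloudera sample, that checks the combined credential value before you launch the workload; the check_credential helper and its error messages are illustrative assumptions.
# Minimal sketch (assumption): validate that the value passed to --credential
# looks like ClientId:Secret before launching the PySpark workload.
import argparse

def check_credential(value):
    # Expect a colon separating a non-empty Client ID and Secret, with no spaces.
    client_id, sep, secret = value.partition(":")
    if not sep or not client_id or not secret or " " in value:
        raise argparse.ArgumentTypeError("expected ClientId:Secret with a single colon and no spaces")
    return value

parser = argparse.ArgumentParser(description="Credential format check")
parser.add_argument("--credential", required=True, type=check_credential, help="ClientId:Secret")
args = parser.parse_args()
print("Credential format looks valid.")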
PySpark workload example
The PySpark workload connects to the Cloudera HMS REST Catalog using the CLIENT_ID and CLIENT_SECRET as credentials and runs from an external system such as Databricks, Snowflake, or standalone Spark in Docker. The following example script requires the ClientId:Secret string to be passed at runtime through the --credential argument, which the script reads as args.credential.
# © 2024 by Cloudera, Inc. All rights reserved.
# Scripts and sample code are licensed under the Apache License,
# Version 2.0
import pyspark
from pyspark.sql import SparkSession
import argparse
parser = argparse.ArgumentParser(description="Spark WorkLoad Script")
parser.add_argument("--credential", help="ClientId:Secret")
args = parser.parse_args()
conf = (
pyspark.SparkConf()
.setAppName('Fetch Employees')
.setMaster('[***SPARK-MASTER-URL***]')
.set('spark.jars', '[***PATH-TO-ICEBERG-SPARK-RUNTIME-JAR***]')
.set('spark.files', '[***PATH-TO-LOG4J2-PROPERTIES***]')
#packages
.set('spark.jars.packages', '[***ICEBERG-SPARK-RUNTIME-PACKAGES***]')
#SQL Extensions
.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
#Configuring Catalog
.set('spark.sql.defaultCatalog', 'demo')
.set('spark.sql.catalog.demo', 'org.apache.iceberg.spark.SparkCatalog')
.set('spark.sql.catalog.demo.type', 'rest')
.set('spark.sql.catalog.demo.uri', 'https://[***DATALAKE-HOSTNAME***]/[***DATALAKE-NAME***]/cdp-datashare-access/hms-api')
.set('spark.sql.catalog.demo.io-impl', '[***ICEBERG-FILE-IO-IMPL***]')
.set('spark.sql.catalog.demo.s3.client-factory-impl', '[***ICEBERG-S3-FILE-IO-CLIENT-FACTORY-IMPL***]')
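# The ClientId:Secret value supplied with --credential is passed to the REST catalog as the OAuth credential.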
.set('spark.sql.catalog.demo.credential', args.credential)
.set('spark.sql.catalog.demo.default-namespace', '[***DEFAULT-NAMESPACE***]')
)
## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("DEBUG")
print("Spark Job Running...")
print("####### Credential: #######")
print(args.credential)
## list databases
dblist=spark.catalog.listDatabases()
print("###### List Databases ########")
print(dblist)
spark.sparkContext.parallelize([dblist]).coalesce(1).saveAsTextFile("file:///[***OUTPUT-DATABASES-DIR***]")
## list tables
tableList=spark.catalog.listTables("demo.[***DEFAULT-NAMESPACE***]")
print("###### List Tables #######")
print(tableList)
spark.sparkContext.parallelize([tableList]).coalesce(1).saveAsTextFile("file:///[***OUTPUT-TABLES-DIR***]")
## Run a Query
print("##### Query: fetch all the employees of 'department -> d006' ######")
results=spark.sql("select [***DEFAULT-NAMESPACE***].employees.first_name, [***DEFAULT-NAMESPACE***].employees.last_name, [***DEFAULT-NAMESPACE***].departments.dept_name "
"from [***DEFAULT-NAMESPACE***].employees, [***DEFAULT-NAMESPACE***].departments, [***DEFAULT-NAMESPACE***].dept_emp "
"where [***DEFAULT-NAMESPACE***].employees.emp_no=[***DEFAULT-NAMESPACE***].dept_emp.emp_no "
"and [***DEFAULT-NAMESPACE***].dept_emp.dept_no=[***DEFAULT-NAMESPACE***].departments.dept_no "
"and [***DEFAULT-NAMESPACE***].departments.dept_no='[***SAMPLE-DEPT-NO***]'")
results.show()
results.coalesce(1).write.option("header", "true").csv("file:///[***OUTPUT-QUERY-RESULTS-DIR***]")
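As an optional follow-up, the lines below are a sketch, not part of the Cloudera sample, that continue from the same spark session, read the query results CSV back in to confirm the output, and then stop the session; the directory placeholder is the same one the sample writes to.
## Optional verification sketch (assumption, not part of the Cloudera sample):
## read the CSV written by the query above back into Spark and confirm the output.
verify_df = spark.read.option("header", "true").csv("file:///[***OUTPUT-QUERY-RESULTS-DIR***]")
print("Rows written by the sample query:", verify_df.count())
verify_df.show(5)
## Stop the session when the workload is done.
spark.stop()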
