Sample Spark workload to access data

Review an end-to-end sample workload that shows how to start a Spark session, connect to the HMS REST Catalog server, and run a Spark query to access data.

Before you run the workload

Obtain the OAuth client ID and client secret for your Data Catalog consumer from the Manage Users export (CSV columns clientId and secret). Pass them to the sample script as a single argument, ClientId:Secret (a colon between the two values, no spaces).
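Before wiring the credential into Spark, it can help to check the argument shape up front. The following validate_credential helper is a minimal sketch; the function name and the strict format check (exactly one colon, no spaces, both parts non-empty) are assumptions based on the format described above, not part of the Cloudera sample:

```python
# Hypothetical helper: reject malformed ClientId:Secret strings early,
# before Spark fails later with an opaque authentication error.

def validate_credential(credential: str) -> bool:
    """Return True if credential looks like ClientId:Secret."""
    if " " in credential:
        return False
    parts = credential.split(":")
    return len(parts) == 2 and all(parts)

print(validate_credential("abc123:s3cr3t"))    # True
print(validate_credential("abc123 : s3cr3t"))  # False (contains spaces)
```

A check like this could run immediately after argument parsing in the sample script below.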

From a shell, run your PySpark script and supply that value for --credential. Replace your_script.py with your script filename and substitute your real client credentials for the placeholder.

python your_script.py --credential '[***CLIENT-ID:CLIENT-SECRET***]'

PySpark workload example

The PySpark workload connects to the Cloudera HMS REST Catalog, authenticating with the CLIENT_ID and CLIENT_SECRET credentials, and can run from an external system such as Databricks, Snowflake, or standalone Spark in Docker. The following example expects the same ClientId:Secret string to be passed in at run time through args.credential (see the previous section).

# © 2024 by Cloudera, Inc. All rights reserved.
# Scripts and sample code are licensed under the Apache License, 
# Version 2.0

import pyspark
from pyspark.sql import SparkSession
import argparse

parser = argparse.ArgumentParser(description="Spark WorkLoad Script")
parser.add_argument("--credential", help="ClientId:Secret")
args = parser.parse_args()

conf = (
    pyspark.SparkConf()
        .setAppName('Fetch Employees')
        .setMaster('[***SPARK-MASTER-URL***]')
        .set('spark.jars', '[***PATH-TO-ICEBERG-SPARK-RUNTIME-JAR***]')
        .set('spark.files', '[***PATH-TO-LOG4J2-PROPERTIES***]')
        # Iceberg runtime packages
        .set('spark.jars.packages', '[***ICEBERG-SPARK-RUNTIME-PACKAGES***]')
        # Enable the Iceberg SQL extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        # Configure the REST catalog
        .set('spark.sql.defaultCatalog', 'demo')
        .set('spark.sql.catalog.demo', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.demo.type', 'rest')
        .set('spark.sql.catalog.demo.uri', 'https://[***DATALAKE-HOSTNAME***]/[***DATALAKE-NAME***]/cdp-datashare-access/hms-api')
        .set('spark.sql.catalog.demo.io-impl', '[***ICEBERG-FILE-IO-IMPL***]')
        .set('spark.sql.catalog.demo.s3.client-factory-impl', '[***ICEBERG-S3-FILE-IO-CLIENT-FACTORY-IMPL***]')
        .set('spark.sql.catalog.demo.credential', args.credential)
        .set('spark.sql.catalog.demo.default-namespace', '[***DEFAULT-NAMESPACE***]')
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("DEBUG")
print("Spark Job Running...")
print("####### Credential (client ID only): #######")
print(args.credential.split(":", 1)[0])  # avoid printing the client secret

## List databases
dblist = spark.catalog.listDatabases()
print("###### List Databases ########")
print(dblist)
spark.sparkContext.parallelize([dblist]).coalesce(1).saveAsTextFile("file:///[***OUTPUT-DATABASES-DIR***]")

## List tables
tableList = spark.catalog.listTables("demo.[***DEFAULT-NAMESPACE***]")
print("###### List Tables #######")
print(tableList)
spark.sparkContext.parallelize([tableList]).coalesce(1).saveAsTextFile("file:///[***OUTPUT-TABLES-DIR***]")

## Run a Query
print("##### Query: fetch all employees of department '[***SAMPLE-DEPT-NO***]' ######")
results = spark.sql("select e.first_name, e.last_name, d.dept_name "
                    "from [***DEFAULT-NAMESPACE***].employees e "
                    "join [***DEFAULT-NAMESPACE***].dept_emp de on e.emp_no = de.emp_no "
                    "join [***DEFAULT-NAMESPACE***].departments d on de.dept_no = d.dept_no "
                    "where d.dept_no = '[***SAMPLE-DEPT-NO***]'")
results.show()
results.coalesce(1).write.option("header", "true").csv("file:///[***OUTPUT-QUERY-RESULTS-DIR***]")
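Because the results are coalesced to a single partition, Spark writes one CSV part file to the output directory. The following check_header helper is a hypothetical sanity check, not part of the Cloudera sample: it reads that part file back with the standard library and confirms the header row that option("header", "true") produced.

```python
# Hypothetical sanity check: confirm the header row of the CSV that
# Spark wrote. The part-file naming pattern below assumes Spark's
# default "part-*" output file names.
import csv
import glob
import os

def check_header(output_dir: str, expected: list) -> bool:
    """Return True if the first CSV part file starts with the expected header."""
    part_files = sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))
    if not part_files:
        return False
    with open(part_files[0], newline="") as f:
        header = next(csv.reader(f))
    return header == expected

# Example usage against the query output above:
# check_header("[***OUTPUT-QUERY-RESULTS-DIR***]",
#              ["first_name", "last_name", "dept_name"])
```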