Sample Spark workload to access data

See an end-to-end sample workload flow that describes how to enable a Spark session, connect to the HMS REST Catalog server, and run a Spark query to access data.

Preparing to run the workload

Perform the following steps before you run the workload:
  1. Obtain the OAuth Client ID and Client Secret for your Data Catalog consumer from the clientId and secret columns of the Manage Users export CSV file.
  2. Pass the Client ID and Client Secret to the sample script as a single argument in the form ClientId:Secret, with a colon between the two values and no spaces.
  3. From a shell, run your PySpark script and supply that value for the --credential parameter.

    Replace your_script.py with your script filename and substitute your real client credentials for the placeholder.

    python your_script.py --credential '[***CLIENT-ID:CLIENT-SECRET***]'
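The credential string described in the steps above can also be assembled programmatically. The following is a minimal sketch, assuming the Manage Users export is a standard CSV whose header includes the clientId and secret columns mentioned in step 1; the file path and helper name are illustrative, not part of the product:

```python
import csv

def build_credential(csv_path, client_id_col="clientId", secret_col="secret"):
    """Read the first data row of the Manage Users export CSV and return
    the credential in ClientId:Secret form (colon-separated, no spaces)."""
    with open(csv_path, newline="") as f:
        row = next(csv.DictReader(f))
    return f"{row[client_id_col]}:{row[secret_col]}"

# Example (hypothetical file name):
# credential = build_credential("manage_users_export.csv")
# subprocess or os.system could then pass it as --credential <value>
```

This keeps the secret out of your shell history if you wrap the script invocation in Python instead of typing the credential on the command line.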

PySpark workload example

The PySpark workload connects to the Cloudera HMS REST Catalog using the CLIENT_ID and CLIENT_SECRET as credentials, and can run from an external system such as Databricks, Snowflake, or standalone Spark in Docker. The following example script requires the ClientId:Secret string to be passed at runtime through the --credential argument, which the script reads as args.credential.

# © 2024 by Cloudera, Inc. All rights reserved.
# Scripts and sample code are licensed under the Apache License, 
# Version 2.0

import pyspark
from pyspark.sql import SparkSession
import argparse

parser = argparse.ArgumentParser(description="Spark WorkLoad Script")
parser.add_argument("--credential", help="ClientId:Secret")
args = parser.parse_args()

conf = (
    pyspark.SparkConf()
        .setAppName('Fetch Employees')
        .setMaster('[***SPARK-MASTER-URL***]')
        .set('spark.jars', '[***PATH-TO-ICEBERG-SPARK-RUNTIME-JAR***]')
        .set('spark.files', '[***PATH-TO-LOG4J2-PROPERTIES***]')
        # Packages
        .set('spark.jars.packages', '[***ICEBERG-SPARK-RUNTIME-PACKAGES***]')
        # SQL extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        # Configure the catalog
        .set('spark.sql.defaultCatalog', 'demo')
        .set('spark.sql.catalog.demo', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.demo.type', 'rest')
        .set('spark.sql.catalog.demo.uri', 'https://[***DATALAKE-HOSTNAME***]/[***DATALAKE-NAME***]/cdp-datashare-access/hms-api')
        .set('spark.sql.catalog.demo.io-impl', '[***ICEBERG-FILE-IO-IMPL***]')
        .set('spark.sql.catalog.demo.s3.client-factory-impl', '[***ICEBERG-S3-FILE-IO-CLIENT-FACTORY-IMPL***]')
        .set('spark.sql.catalog.demo.credential', args.credential)
        .set('spark.sql.catalog.demo.default-namespace', '[***DEFAULT-NAMESPACE***]')
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("DEBUG")
print("Spark Job Running...")
print("####### Credential: #######")
print(args.credential)

## list databases
dblist = spark.catalog.listDatabases()
print("###### List Databases ########")
print(dblist)
spark.sparkContext.parallelize([dblist]).coalesce(1).saveAsTextFile("file:///[***OUTPUT-DATABASES-DIR***]")

## list tables
tableList = spark.catalog.listTables("demo.[***DEFAULT-NAMESPACE***]")
print("###### List Tables #######")
print(tableList)
spark.sparkContext.parallelize([tableList]).coalesce(1).saveAsTextFile("file:///[***OUTPUT-TABLES-DIR***]")

## Run a Query
print("##### Query: fetch all the employees of 'department -> d006' ######")
results = spark.sql("select [***DEFAULT-NAMESPACE***].employees.first_name, [***DEFAULT-NAMESPACE***].employees.last_name, [***DEFAULT-NAMESPACE***].departments.dept_name "
                    "from [***DEFAULT-NAMESPACE***].employees, [***DEFAULT-NAMESPACE***].departments, [***DEFAULT-NAMESPACE***].dept_emp "
                    "where [***DEFAULT-NAMESPACE***].employees.emp_no=[***DEFAULT-NAMESPACE***].dept_emp.emp_no "
                    "and [***DEFAULT-NAMESPACE***].dept_emp.dept_no=[***DEFAULT-NAMESPACE***].departments.dept_no "
                    "and [***DEFAULT-NAMESPACE***].departments.dept_no='[***SAMPLE-DEPT-NO***]'")
# show() prints the DataFrame itself and returns None, so call it directly
# instead of wrapping it in print()
results.show()
results.coalesce(1).write.option("header", "true").csv("file:///[***OUTPUT-QUERY-RESULTS-DIR***]")