Sample Spark workload to access data

The following end-to-end sample workload shows how to enable a Spark session, connect to the HMS REST Catalog server, and run a Spark engine query to access data.

PySpark workload example

The PySpark workload connects to the Cloudera HMS REST Catalog using CLIENT_ID and CLIENT_SECRET as credentials and runs from an external system such as Databricks, Snowflake, or standalone Spark in Docker.

# © 2024 by Cloudera, Inc. All rights reserved.
# Scripts and sample code are licensed under the Apache License, 
# Version 2.0

import pyspark
from pyspark.sql import SparkSession
import argparse

parser = argparse.ArgumentParser(description="Spark WorkLoad Script")
parser.add_argument("--credential", help="ClientId:Secret")
args = parser.parse_args()

conf = (
    pyspark.SparkConf()
        .setAppName('Fetch Employees')
        .setMaster('local[*]')
        .set('spark.jars', '/opt/spark/jars/iceberg-spark-runtime-3.5_2.13-1.5.2.jar')
        .set('spark.files', '/opt/spark/conf/log4j2.properties')
        # Packages: Iceberg Spark runtime resolved as a Maven coordinate
        .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.5.2')
        #SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        #Configuring Catalog
        .set('spark.sql.defaultCatalog', 'demo')
        .set('spark.sql.catalog.demo', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.demo.type', 'rest')
        .set('spark.sql.catalog.demo.uri', 'https://<datalake-hostname>/<datalake-name>/cdp-share-access/hms-api/icecli')
        .set('spark.sql.catalog.demo.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        .set('spark.sql.catalog.demo.s3.client-factory-impl', 'org.apache.iceberg.aws.s3.DefaultS3FileIOAwsClientFactory')
        .set('spark.sql.catalog.demo.credential', args.credential)
        .set('spark.sql.catalog.demo.default-namespace', 'emp_data')
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("DEBUG")
print("Spark Job Running...")
print("####### Credential: #######")
print(args.credential)

## list databases
dblist=spark.catalog.listDatabases()
print("###### List Databases ########")
print(dblist)
spark.sparkContext.parallelize([dblist]).coalesce(1).saveAsTextFile("file:///opt/spark/out/databases")

## list tables
tableList=spark.catalog.listTables("demo.emp_data")
print("###### List Tables #######")
print(tableList)
spark.sparkContext.parallelize([tableList]).coalesce(1).saveAsTextFile("file:///opt/spark/out/tables")

## Run a Query
print("##### Query: fetch all the employees of 'department -> d006' ######")
results=spark.sql("select emp_data.employees.first_name, emp_data.employees.last_name, emp_data.departments.dept_name "
                  "from emp_data.employees, emp_data.departments, emp_data.dept_emp "
                  "where emp_data.employees.emp_no=emp_data.dept_emp.emp_no "
                  "and emp_data.dept_emp.dept_no=emp_data.departments.dept_no "
                  "and emp_data.departments.dept_no='d006'")
results.show()
results.coalesce(1).write.option("header", "true").csv("file:///opt/spark/out/query_results")
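
You can submit this script with spark-submit, passing the client credentials through the --credential argument. The following invocation is a minimal sketch: the script name spark_workload.py and the CLIENT_ID and CLIENT_SECRET placeholders are examples that you replace with your own values.

spark-submit \
  --jars /opt/spark/jars/iceberg-spark-runtime-3.5_2.13-1.5.2.jar \
  spark_workload.py \
  --credential "<CLIENT_ID>:<CLIENT_SECRET>"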