Running Applications with CDS 3.2 Powered by Apache Spark

With CDS 3.2 Powered by Apache Spark (CDS 3.2), you can run Apache Spark 3 applications locally or distributed across a cluster, either by using an interactive shell or by submitting an application. Running Spark applications interactively is commonly performed during the data-exploration phase and for ad hoc analysis.

The Spark 3 job commands

With Spark 3, you use slightly different command names than with Spark 2, so that you can run both versions of Spark side-by-side without conflicts:

  • spark3-submit instead of spark-submit.

  • spark3-shell instead of spark-shell.

  • pyspark3 instead of pyspark.

For development and test purposes, you can also configure each host so that invoking the Spark 2 command name runs the corresponding Spark 3 executable.

Canary test for pyspark3 command

The following example shows a simple pyspark3 session that refers to the SparkContext, calls the collect() function which runs a Spark 3 job, and writes data to HDFS. This sequence of operations helps to check if there are obvious configuration issues that prevent Spark 3 jobs from working at all. For the HDFS path for the output directory, substitute a path that exists on your own system.

$ hdfs dfs -mkdir /user/jdoe/spark
$ pyspark3
...
SparkSession available as 'spark'.
>>> strings = ["one","two","three"]
>>> s2 = sc.parallelize(strings)
>>> s3 = s2.map(lambda word: word.upper())
>>> s3.collect()
['ONE', 'TWO', 'THREE']
>>> s3.saveAsTextFile('hdfs:///user/jdoe/spark/canary_test')
>>> quit()
$ hdfs dfs -ls /user/jdoe/spark
Found 1 items
drwxr-xr-x   - jdoe spark-users  0 2016-08-26 14:41 /user/jdoe/spark/canary_test
$ hdfs dfs -ls /user/jdoe/spark/canary_test
Found 3 items
-rw-r--r--   3 jdoe spark-users  0 2016-08-26 14:41 /user/jdoe/spark/canary_test/_SUCCESS
-rw-r--r--   3 jdoe spark-users  4 2016-08-26 14:41 /user/jdoe/spark/canary_test/part-00000
-rw-r--r--   3 jdoe spark-users 10 2016-08-26 14:41 /user/jdoe/spark/canary_test/part-00001
$ hdfs dfs -cat /user/jdoe/spark/canary_test/part-00000
ONE
$ hdfs dfs -cat /user/jdoe/spark/canary_test/part-00001
TWO
THREE

Fetching Spark 3 Maven Dependencies

The Maven coordinates are a combination of groupId, artifactId and version. The groupId and artifactId are the same as for the upstream Apache Spark project. For example, for spark-core, groupId is org.apache.spark, and artifactId is spark-core_2.12, both the same as the upstream project. The version is different for the Cloudera packaging: see Using the CDS 3 Powered by Apache Spark Maven Repository for the exact name depending on which release you are using.

Accessing the Spark 3 History Server

The Spark 3 history server is available on port 18089, rather than port 18088 as with the Spark 2 history server.

Running applications using CDS 3.2 with GPU Support

Running a Spark job using CDS 3.2 with GPU Support

  1. Log on to the node where you want to run the job.
  2. Run the following command to launch spark3-shell:
    spark3-shell  --conf "spark.rapids.sql.enabled=true" \
                  --conf "spark.executor.memoryOverhead=5g"
    where
    --conf spark.rapids.sql.enabled=true

    enables the following environment variables for GPUs:

    "spark.task.resource.gpu.amount" - sets GPU resource amount per task
    
    "spark.rapids.sql.concurrentGpuTasks" - sets the number of concurrent tasks per GPU
    "spark.sql.files.maxPartitionBytes" - sets the input partition size for DataSource API, The recommended value is "256m".
    "spark.locality.wait" - controls how long Spark waits to obtain better locality for tasks. 
    "spark.sql.adaptive.enabled" - enables Adaptive Query Execution.
    "spark.rapids.memory.pinnedPool.size" - sets the amount of pinned memory allocated per host.
    "spark.sql.adaptive.advisoryPartitionSizeInBytes" - sets the advisory size in bytes of the shuffle partition during adaptive optimization.
    For example,
    $SPARK_HOME/bin/spark3-shell \
           --conf spark.task.resource.gpu.amount=2 \
           --conf spark.rapids.sql.concurrentGpuTasks=2 \
           --conf spark.sql.files.maxPartitionBytes=256m \
           --conf spark.locality.wait=0s \
           --conf spark.sql.adaptive.enabled=true \
           --conf spark.rapids.memory.pinnedPool.size=2G \
           --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=1g
    --conf "spark.executor.memoryOverhead=5g"
    sets the amount of additional memory to be allocated per executor process
    You can override these configuration settings both from the command line and from code. For more information on environment variables, see the NVIDIA spark-rapids documentation and the Spark SQL Performance Tuning Guide.
  3. Run a job in spark3-shell.
    For example:
    scala> val df = sc.makeRDD(1 to 100000000, 6).toDF
    df: org.apache.spark.sql.DataFrame = [value: int]
    
    scala>val df2 = sc.makeRDD(1 to 100000000, 6).toDF
    df2: org.apache.spark.sql.DataFrame = [value: int]
    
    scala> df.select($"value" as "a").join(df2select($"value" as "b"), $"a" === $"b").count
    res0: Long = 100000000
  4. You can verify that the job run used GPUs, by logging on to the Yarn UI v2 to review the execution plan and the performance of your spark3-shell application:

    Select the Applications tab then select your [spark3-shell application]. Select ApplicationMaster > SQL > count at <console>:28 to see the execution plan.


    Execution plan with GPU usage enabled.

Running a Spark job using CDS 3.2 with GPU Support with UCX enabled

  1. Log on to the node where you want to run the job.
  2. Run the following command to launch spark3-shell:
    spark3-shell  --conf "spark.rapids.sql.enabled=true" \
                  --conf "spark.executor.memoryOverhead=5g"
                  --rapids-shuffle=true
    where
    --rapids-shuffle=true
    makes the following configuration changes for UCX:
    
    spark.shuffle.manager=com.nvidia.spark.rapids.spark320cdh.RapidsShuffleManager
    spark.executor.extraClassPath=/opt/cloudera/parcels/SPARK3_RAPIDS/lib/spark3/rapids-plugin/*
    spark.executorEnv.UCX_ERROR_SIGNALS=
    spark.executorEnv.UCX_MEMTYPE_CACHE=n
    For more information on environment variables, see the NVIDIA spark-rapids documentation.
  3. Run a job in spark3-shell.