Running applications with CDS 3.1 for GPUs

With CDS 3.1 for GPUs, you can run Apache Spark 3 applications locally or distributed across a cluster, either by using an interactive shell or by submitting an application. Running Spark applications interactively is commonly performed during the data-exploration phase and for ad hoc analysis.

The Spark 3 job commands

With Spark 3, you use slightly different command names than with Spark 2, so that you can run both versions of Spark side-by-side without conflicts:

  • spark3-submit instead of spark-submit.
  • spark3-shell instead of spark-shell.
  • pyspark3 instead of pyspark.

For development and test purposes, you can also configure an alias on each host so that invoking the Spark 2 command name runs the corresponding Spark 3 executable.

Running a Spark job with CDS 3.1 for GPUs

  1. Log on to the node where you want to run the job.
  2. Run the following command to launch spark3-shell:
    spark3-shell  --conf "spark.rapids.sql.enabled=true" \
                  --conf "spark.executor.memoryOverhead=5g"
    where
    --conf spark.rapids.sql.enabled=true

    enables the following environment variables for GPUs:

    "spark.task.resource.gpu.amount" - sets GPU resource amount per task
    
    "spark.rapids.sql.concurrentGpuTasks" - sets the number of concurrent tasks per GPU
    "spark.sql.files.maxPartitionBytes" - sets the input partition size for DataSource API, The recommended value is "256m".
    "spark.locality.wait" - controls how long Spark waits to obtain better locality for tasks. 
    "spark.sql.adaptive.enabled" - enables Adaptive Query Execution.
    "spark.rapids.memory.pinnedPool.size" - sets the amount of pinned memory allocated per host.
    "spark.sql.adaptive.advisoryPartitionSizeInBytes" - sets the advisory size in bytes of the shuffle partition during adaptive optimization.
    For example,
    $SPARK_HOME/bin/spark3-shell \
           --conf spark.task.resource.gpu.amount=2 \
           --conf spark.rapids.sql.concurrentGpuTasks=2 \
           --conf spark.sql.files.maxPartitionBytes=256m \
           --conf spark.locality.wait=0s \
           --conf spark.sql.adaptive.enabled=true \
           --conf spark.rapids.memory.pinnedPool.size=2G \
           --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=1g
    --conf "spark.executor.memoryOverhead=5g"
    sets the amount of additional memory to be allocated per executor process
    You can override these configuration settings both from the command line and from code. For more information on environment variables, see the NVIDIA spark-rapids documentation and the Spark SQL Performance Tuning Guide.
  3. Run a job in spark3-shell.
    For example:
    scala> val df = sc.makeRDD(1 to 100000000, 6).toDF
    df: org.apache.spark.sql.DataFrame = [value: int]
    
    scala>val df2 = sc.makeRDD(1 to 100000000, 6).toDF
    df2: org.apache.spark.sql.DataFrame = [value: int]
    
    scala> df.select($"value" as "a").join(df2select($"value" as "b"), $"a" === $"b").count
    res0: Long = 100000000
  4. You can verify that the job run used GPUs, by logging on to the Yarn UI v2 to review the execution plan and the performance of your spark3-shell application:

    Select the Applications tab then select your [spark3-shell application]. Select ApplicationMaster > SQL > count at <console>:28 to see the execution plan.


    Execution plan with GPU usage enabled.

Accessing the Spark 3 History Server

The Spark 3 history server is available on port 18089, rather than port 18088 as with the Spark 2 history server.