Running Applications with CDS 3.2 Powered by Apache Spark
With CDS 3.2 Powered by Apache Spark (CDS 3.2), you can run Apache Spark 3 applications locally or distributed across a cluster, either by using an interactive shell or by submitting an application. Running Spark applications interactively is commonly performed during the data-exploration phase and for ad hoc analysis.
The Spark 3 job commands
With Spark 3, you use slightly different command names than with Spark 2, so that you can run both versions of Spark side-by-side without conflicts: for example, spark-shell becomes spark3-shell, pyspark becomes pyspark3, and spark-submit becomes spark3-submit.
For development and test purposes, you can also configure each host so that invoking the Spark 2 command name runs the corresponding Spark 3 executable.
Canary test for pyspark3 command
The following example shows a simple pyspark3 session that refers to the SparkContext, calls the collect() function which runs a Spark 3 job, and writes data to HDFS. This sequence of operations helps to check if there are obvious configuration issues that prevent Spark 3 jobs from working at all. For the HDFS path for the output directory, substitute a path that exists on your own system.
$ hdfs dfs -mkdir /user/jdoe/spark
$ pyspark3
...
SparkSession available as 'spark'.
>>> strings = ["one","two","three"]
>>> s2 = sc.parallelize(strings)
>>> s3 = s2.map(lambda word: word.upper())
>>> s3.collect()
['ONE', 'TWO', 'THREE']
>>> s3.saveAsTextFile('hdfs:///user/jdoe/spark/canary_test')
>>> quit()
$ hdfs dfs -ls /user/jdoe/spark
Found 1 items
drwxr-xr-x   - jdoe spark-users          0 2016-08-26 14:41 /user/jdoe/spark/canary_test
$ hdfs dfs -ls /user/jdoe/spark/canary_test
Found 3 items
-rw-r--r--   3 jdoe spark-users          0 2016-08-26 14:41 /user/jdoe/spark/canary_test/_SUCCESS
-rw-r--r--   3 jdoe spark-users          4 2016-08-26 14:41 /user/jdoe/spark/canary_test/part-00000
-rw-r--r--   3 jdoe spark-users         10 2016-08-26 14:41 /user/jdoe/spark/canary_test/part-00001
$ hdfs dfs -cat /user/jdoe/spark/canary_test/part-00000
ONE
$ hdfs dfs -cat /user/jdoe/spark/canary_test/part-00001
TWO
THREE
Fetching Spark 3 Maven Dependencies
The Maven coordinates are a combination of groupId, artifactId, and version. The groupId and artifactId are the same as for the upstream Apache Spark project. For example, for spark-core, the groupId is org.apache.spark and the artifactId is spark-core_2.12, both the same as the upstream project. The version is different for the Cloudera packaging: see Using the CDS 3 Powered by Apache Spark Maven Repository for the exact name depending on which release you are using.
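If you build with sbt rather than Maven, the same coordinates apply. The following is a minimal build.sbt sketch; the resolver URL and the version string are placeholder assumptions, so substitute the exact values from the Maven repository page:

// build.sbt sketch: the resolver URL and version below are placeholders,
// not authoritative values; take both from the CDS Maven repository page.
scalaVersion := "2.12.15"
resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
// %% appends the Scala binary version, yielding the spark-core_2.12 artifactId.
libraryDependencies += "org.apache.spark" %% "spark-core" % "<CDS-3.2-version>" % "provided"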
Accessing the Spark 3 History Server
The Spark 3 history server is available on port 18089, rather than port 18088 as with the Spark 2 history server.
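For example, with a default configuration you can reach the Spark 3 history server in a browser at http://<history-server-host>:18089, substituting the host where the history server runs.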
Running applications using CDS 3.2 with GPU Support
Running a Spark job using CDS 3.2 with GPU Support
- Log on to the node where you want to run the job.
- Run the following command to launch spark3-shell:
spark3-shell --conf "spark.rapids.sql.enabled=true" \
  --conf "spark.executor.memoryOverhead=5g"
CDS 3.2 with GPU Support enables the following configuration properties for GPUs:
"spark.task.resource.gpu.amount" - sets GPU resource amount per task "spark.rapids.sql.concurrentGpuTasks" - sets the number of concurrent tasks per GPU "spark.sql.files.maxPartitionBytes" - sets the input partition size for DataSource API, The recommended value is "256m". "spark.locality.wait" - controls how long Spark waits to obtain better locality for tasks. "spark.sql.adaptive.enabled" - enables Adaptive Query Execution. "spark.rapids.memory.pinnedPool.size" - sets the amount of pinned memory allocated per host. "spark.sql.adaptive.advisoryPartitionSizeInBytes" - sets the advisory size in bytes of the shuffle partition during adaptive optimization.
For example:
$SPARK_HOME/bin/spark3-shell \
  --conf spark.task.resource.gpu.amount=2 \
  --conf spark.rapids.sql.concurrentGpuTasks=2 \
  --conf spark.sql.files.maxPartitionBytes=256m \
  --conf spark.locality.wait=0s \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.rapids.memory.pinnedPool.size=2G \
  --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=1g
"spark.executor.memoryOverhead" - sets the amount of additional memory to be allocated per executor process
- Run a job in spark3-shell. For example:
scala> val df = sc.makeRDD(1 to 100000000, 6).toDF
df: org.apache.spark.sql.DataFrame = [value: int]

scala> val df2 = sc.makeRDD(1 to 100000000, 6).toDF
df2: org.apache.spark.sql.DataFrame = [value: int]

scala> df.select($"value" as "a").join(df2.select($"value" as "b"), $"a" === $"b").count
res0: Long = 100000000
- You can verify that the job run used GPUs by logging on to the YARN UI v2 to review the execution plan and the performance of your spark3-shell application: select the Applications tab, then select your spark3-shell application, and open the application details to see the execution plan. (A minimal in-shell check is also sketched after these steps.)
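You can also check the plan directly inside spark3-shell, reusing the df and df2 DataFrames from the previous step; this is a minimal sketch, and the exact operator names vary by query. When the RAPIDS plugin handles an operation, GPU-backed operators (names prefixed with Gpu) appear in the physical plan in place of the usual CPU operators:

scala> // Print the physical plan for the join from the previous step.
scala> // With the plugin active, Gpu-prefixed operators replace the CPU ones.
scala> df.select($"value" as "a").join(df2.select($"value" as "b"), $"a" === $"b").explain()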
Running a Spark job using CDS 3.2 with GPU Support and UCX enabled
- Log on to the node where you want to run the job.
- Run the following command to launch spark3-shell:
spark3-shell --conf "spark.rapids.sql.enabled=true" \
  --conf "spark.executor.memoryOverhead=5g" \
  --rapids-shuffle=true
- Setting --rapids-shuffle=true makes the following configuration changes for UCX (a quick verification sketch follows these steps):
spark.shuffle.manager=com.nvidia.spark.rapids.spark320cdh.RapidsShuffleManager
spark.executor.extraClassPath=/opt/cloudera/parcels/SPARK3_RAPIDS/lib/spark3/rapids-plugin/*
spark.executorEnv.UCX_ERROR_SIGNALS=
spark.executorEnv.UCX_MEMTYPE_CACHE=n
- Run a job in spark3-shell.
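As the verification sketch mentioned in step 2, you can read the effective shuffle manager back from the SparkContext configuration inside the launched spark3-shell. This is a minimal check assuming the session was started with --rapids-shuffle=true as shown above:

scala> // Read the effective shuffle manager; it should be the
scala> // RapidsShuffleManager class listed in the configuration changes above.
scala> sc.getConf.get("spark.shuffle.manager")
res0: String = com.nvidia.spark.rapids.spark320cdh.RapidsShuffleManager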