Tuning the Number of Partitions
Spark has limited capacity to determine optimal parallelism. Every Spark stage has a number of tasks, each of which processes data sequentially. The number of tasks per stage is the most important parameter in determining performance.
As described in "Spark Execution Model," Spark groups datasets into stages. The number of tasks in a stage is the same as the number of partitions in the last dataset in the stage. The number of partitions in a dataset is the same as the number of partitions in the datasets on which it depends, with the following exceptions:
- The coalesce transformation creates a dataset with fewer partitions than its parent dataset.
- The union transformation creates a dataset with the sum of its parents' numbers of partitions.
- The cartesian transformation creates a dataset with the product of its parents' numbers of partitions.
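The following sketch illustrates these three rules; it assumes sc is an existing SparkContext, and the partition counts come from the parallelize calls:

val rdd1 = sc.parallelize(1 to 1000, 8)   // 8 partitions
val rdd2 = sc.parallelize(1 to 1000, 4)   // 4 partitions

rdd1.coalesce(2).partitions.size      // 2: fewer than the parent's 8
rdd1.union(rdd2).partitions.size      // 12: the sum, 8 + 4
rdd1.cartesian(rdd2).partitions.size  // 32: the product, 8 * 4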
Datasets with no parents, such as those produced by textFile or hadoopFile, have their partitions determined by the underlying MapReduce InputFormat used. Typically, there is a partition for each HDFS block being read. The number of partitions for a dataset produced by parallelize is specified in the method call, or by spark.default.parallelism if not specified. To determine the number of partitions in a dataset, call rdd.partitions().size().
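For example, using the Scala API (where partitions is a parameterless member, so the call is rdd.partitions.size); the path and counts here are illustrative:

val fromFile = sc.textFile("hdfs:///data/input")   // hypothetical input path
fromFile.partitions.size   // typically one partition per HDFS block read

val fromSeq = sc.parallelize(1 to 1000000, 16)     // request 16 partitions explicitly
fromSeq.partitions.size    // 16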
If the number of tasks is smaller than the number of slots available to run them, CPU usage is suboptimal. In addition, more memory is used by any aggregation operations that occur in each task. In join, cogroup, or *ByKey operations, objects are held in hashmaps or in-memory buffers to group or sort. join, cogroup, and groupByKey use these data structures in the tasks for the stages that are on the fetching side of the shuffles they trigger. reduceByKey and aggregateByKey use these data structures in the tasks for the stages on both sides of the shuffles they trigger. If the records in these aggregation operations exceed memory, the following issues can occur:
- Increased garbage collection, which can lead to pauses in computation.
- Spilling data to disk, causing disk I/O and sorting, which leads to job stalls.
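As an illustration (the dataset and key space are hypothetical, and sc is assumed to be an existing SparkContext), groupByKey must buffer every value for each key in the fetch-side tasks, while reduceByKey combines values on both sides of the shuffle:

val pairs = sc.parallelize(1 to 1000000).map(i => (i % 100, 1))

// groupByKey: fetch-side tasks hold all values for each key in an in-memory buffer.
val grouped = pairs.groupByKey().mapValues(_.sum)

// reduceByKey: values are combined on both sides of the shuffle, so each task's
// hashmap holds one running total per key instead of every value.
val reduced = pairs.reduceByKey(_ + _)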
To increase the number of partitions if the stage is reading from Hadoop:
- Use the repartition transformation, which triggers a shuffle (see the sketch after this list).
- Configure your InputFormat to create more splits.
- Write the input data to HDFS with a smaller block size.
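A minimal sketch of the first option, assuming sc is an existing SparkContext; the path and target partition count are placeholders:

val input = sc.textFile("hdfs:///data/input")  // partition count set by the InputFormat
val widened = input.repartition(200)           // shuffles the data into 200 partitions
widened.partitions.size                        // 200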
If the stage is receiving input from another stage, the transformation that triggered
the stage boundary accepts a numPartitions
argument:
val rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X)
Determining the optimal value for X requires experimentation. Find the number of partitions in the parent dataset, and then keep multiplying that by 1.5 until performance stops improving.
A more exact way to calculate X is to divide the in-memory size of the total shuffle data by the memory available to each task. The memory available to each task is:

(spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction)/
spark.executor.cores

spark.shuffle.memoryFraction and spark.shuffle.safetyFraction default to 0.2 and 0.8, respectively.
The in-memory size of the total shuffle data is more difficult to determine. The closest heuristic is to find the ratio between shuffle spill memory and the shuffle spill disk for a stage that ran. Then, multiply the total shuffle write by this number. However, this can be compounded if the stage is performing a reduction:
(observed shuffle write) * (observed shuffle spill memory) * (spark.executor.cores)/
(observed shuffle spill disk) * (spark.executor.memory) * (spark.shuffle.memoryFraction) * (spark.shuffle.safetyFraction)
Then, round up slightly, because too many partitions is usually better than too few.
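As a worked example with hypothetical figures: suppose spark.executor.memory is 4 GB, spark.executor.cores is 2, and the web UI for the stage reports 2 GB of shuffle write, 8 GB of shuffle spill (memory), and 2 GB of shuffle spill (disk):

// Hypothetical figures from the job configuration and the stage's web UI.
val executorMemoryMb   = 4096.0  // spark.executor.memory = 4g
val executorCores      = 2.0     // spark.executor.cores
val memoryFraction     = 0.2     // spark.shuffle.memoryFraction (default)
val safetyFraction     = 0.8     // spark.shuffle.safetyFraction (default)
val shuffleWriteMb     = 2048.0  // observed shuffle write
val shuffleSpillMemMb  = 8192.0  // observed shuffle spill (memory)
val shuffleSpillDiskMb = 2048.0  // observed shuffle spill (disk)

// Memory available to each task: 4096 * 0.2 * 0.8 / 2 = 327.68 MB.
val perTaskMemoryMb = executorMemoryMb * memoryFraction * safetyFraction / executorCores

// Estimated in-memory size of the total shuffle data:
// 2048 * (8192 / 2048) = 8192 MB.
val inMemoryShuffleMb = shuffleWriteMb * (shuffleSpillMemMb / shuffleSpillDiskMb)

// X = ceil(8192 / 327.68) = 25; rounding up slightly suggests about 30 partitions.
val x = math.ceil(inMemoryShuffleMb / perTaskMemoryMb).toInt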
When in doubt, err on the side of a larger number of tasks (and thus partitions). This contrasts with recommendations for MapReduce, which, unlike Spark, has a high per-task startup overhead.