Tuning the Number of Partitions
Spark has limited capacity to determine optimal parallelism. Every Spark stage has a number of tasks, each of which processes data sequentially. The number of tasks per stage is the most important parameter in determining performance.
As described in "Spark Execution Model," Spark groups datasets into stages. The number of tasks in a stage is the same as the number of partitions in the last dataset in the stage. The number of partitions in a dataset is the same as the number of partitions in the datasets on which it depends, with the following exceptions:
coalescetransformation creates a dataset with fewer partitions than its parent dataset.
uniontransformation creates a dataset with the sum of its parents' number of partitions.
cartesiantransformation creates a dataset with the product of its parents' number of partitions.
Datasets with no parents, such as those produced by
hadoopFile, have their partitions determined by the underlying
InputFormat used. Typically, there is a partition for each
HDFS block being read. The number of partitions for datasets produced by
parallelize are specified in the method, or
spark.default.parallelism if not specified. To determine the number
of partitions in an dataset, call
If the number of tasks is smaller than number of slots available to run them, CPU
usage is suboptimal. In addition, more memory is used by any aggregation operations
that occur in each task. In
*ByKey operations, objects are held in hashmaps or in-memory buffers
to group or sort.
groupByKey use these data structures in the tasks for the stages that
are on the fetching side of the shuffles they trigger.
aggregateByKey use data structures in the tasks for the stages on
both sides of the shuffles they trigger. If the records in these aggregation
operations exceed memory, the following issues can occur:
- Increased garbage collection, which can lead to pauses in computation.
- Spilling data to disk, causing disk I/O and sorting, which leads to job stalls.
To increase the number of partitions if the stage is reading from Hadoop:
repartitiontransformation, which triggers a shuffle.
InputFormatto create more splits.
- Write the input data to HDFS with a smaller block size.
If the stage is receiving input from another stage, the transformation that triggered
the stage boundary accepts a
val rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X)
Determining the optimal value for X requires experimentation. Find the number of partitions in the parent dataset, and then multiply that by 1.5 until performance stops improving.
(spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction)/ spark.executor.cores
safetyFractiondefault to 0.2 and 0.8 respectively.
The in-memory size of the total shuffle data is more difficult to determine. The closest heuristic is to find the ratio between shuffle spill memory and the shuffle spill disk for a stage that ran. Then, multiply the total shuffle write by this number. However, this can be compounded if the stage is performing a reduction:
(observed shuffle write) * (observed shuffle spill memory) * (spark.executor.cores)/ (observed shuffle spill disk) * (spark.executor.memory) * (spark.shuffle.memoryFraction) * (spark.shuffle.safetyFraction)
Then, round up slightly, because too many partitions is usually better than too few.
When in doubt, err on the side of a larger number of tasks (and thus partitions). This contrasts with recommendations for MapReduce, which unlike Spark, has a high startup overhead for tasks.