When to Add a Shuffle Transformation
The rule of minimizing the number of shuffles has some exceptions.
An extra shuffle can be advantageous when it increases parallelism. For example, if your data arrives in a few large unsplittable files, the partitioning dictated by the InputFormat might place large numbers of records in each partition, while not generating enough partitions to take advantage of all the available cores. In this case, invoking repartition with a high number of partitions (which triggers a shuffle) after loading the data allows the transformations that follow to use more of the cluster's CPU.
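As a minimal sketch, assuming an existing SparkContext sc and a hypothetical path to gzipped input (gzip files are not splittable, so each file becomes a single partition):

// Each gzipped file loads as one partition, no matter how large it is.
val records = sc.textFile("hdfs:///data/events/*.gz")

// One extra shuffle spreads the records across enough partitions to keep
// every core busy in the transformations that follow. The multiplier is
// a placeholder; tune it for your cluster.
val repartitioned = records.repartition(sc.defaultParallelism * 4)

val lineLengths = repartitioned.map(_.length)  // now runs on all cores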
Another example arises when using the reduce or aggregate action to aggregate data into the driver. When aggregating over a high number of partitions, the computation can quickly become bottlenecked on a single thread in the driver merging all the results together. To lighten the load on the driver, first use reduceByKey or aggregateByKey to perform a round of distributed aggregation that divides the dataset into a smaller number of partitions. The values within each partition are merged with each other in parallel before being sent to the driver for a final round of aggregation. See treeReduce and treeAggregate for examples of how to do that.
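For instance, here is a sketch using treeAggregate to compute a mean; numbers is an assumed RDD[Double] with many partitions. The extra merge rounds happen on the executors, so the driver only combines a handful of partial results instead of one per partition:

val (sum, count) = numbers.treeAggregate((0.0, 0L))(
  (acc, x) => (acc._1 + x, acc._2 + 1L),    // fold a value into a partition's accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2),     // merge two partial accumulators
  depth = 3                                 // levels of merging before results reach the driver
)
val mean = sum / count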
This method is especially useful when the aggregation is already grouped by a key. For example, consider an application that counts the occurrences of each word in a corpus and pulls the results into the driver as a map. One approach, which can be accomplished with the aggregate action, is to compute a local map at each partition and then merge the maps at the driver. The alternative approach, which can be accomplished with aggregateByKey, is to perform the count in a fully distributed way, and then simply collectAsMap the results to the driver.
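The following sketch contrasts the two approaches; corpus is an assumed RDD[String] of lines of text, and the names are illustrative only:

val words = corpus.flatMap(_.split("\\s+"))

// Approach 1: build a local map per partition with aggregate, then merge
// every per-partition map on the driver. The merge is single-threaded
// driver work that grows with the vocabulary size.
val viaAggregate: Map[String, Long] = words.aggregate(Map.empty[String, Long])(
  (m, w) => m.updated(w, m.getOrElse(w, 0L) + 1L),
  (m1, m2) => m2.foldLeft(m1) { case (m, (w, c)) =>
    m.updated(w, m.getOrElse(w, 0L) + c)
  }
)

// Approach 2: count per key in a fully distributed way with aggregateByKey,
// then collect only the already-reduced results to the driver.
val viaAggregateByKey: Map[String, Long] = words
  .map(w => (w, 1L))
  .aggregateByKey(0L)(_ + _, _ + _)
  .collectAsMap()
  .toMap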