Choosing Transformations to Minimize Shuffles
You can usually choose from many arrangements of actions and transformations that produce the same results. However, not all these arrangements result in the same performance. Avoiding common pitfalls and picking the right arrangement can significantly improve an application's performance.
When choosing an arrangement of transformations, minimize the number of shuffles and
the amount of data shuffled. Shuffles are expensive operations; all shuffle data must
be written to disk and then transferred over the network.
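To see where a job's shuffles fall, you can print an RDD's lineage with `toDebugString`. The sketch below is illustrative only; the local `SparkContext` setup and sample data are assumptions, not part of the original example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical local setup, for illustration only.
val sc = new SparkContext(
  new SparkConf().setAppName("shuffle-inspect").setMaster("local[*]"))

val words  = sc.parallelize(Seq("a", "b", "a"))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

// Each new indentation level in the printed lineage marks a stage
// boundary, i.e. a shuffle (here introduced by reduceByKey).
println(counts.toDebugString)
```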
`repartition`, `join`, `cogroup`, and any of the `*By` or `*ByKey` transformations can result in shuffles. Not all these transformations are equal, however, and you should avoid the following patterns:
- `groupByKey` when performing an associative reductive operation. For example, `rdd.groupByKey().mapValues(_.sum)` produces the same result as `rdd.reduceByKey(_ + _)`. However, the former transfers the entire dataset across the network, while the latter computes local sums for each key in each partition and combines those local sums into larger sums after shuffling. (A runnable comparison appears in the first sketch after this list.)
- `reduceByKey` when the input and output value types are different. For example, consider writing a transformation that finds all the unique strings corresponding to each key. You could use `map` to transform each element into a `Set` and then combine the `Set`s with `reduceByKey`:

  ```scala
  rdd.map(kv => (kv._1, Set(kv._2))).reduceByKey(_ ++ _)
  ```

  This results in unnecessary object creation, because a new set must be allocated for each record. Instead, use `aggregateByKey`, which performs the map-side aggregation more efficiently:

  ```scala
  val zero = collection.mutable.Set[String]()
  rdd.aggregateByKey(zero)(
    (set, v) => set += v,
    (set1, set2) => set1 ++= set2)
  ```

  An end-to-end version appears in the second sketch after this list.
- `flatMap`-`join`-`groupBy`. When two datasets are already grouped by key and you want to join them and keep them grouped, use `cogroup`. This avoids the overhead associated with unpacking and repacking the groups. (See the third sketch after this list.)
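For the first pattern, here is a minimal runnable comparison of the two arrangements, assuming the `sc` context from the sketch above and made-up data:

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// groupByKey ships every record across the network before summing.
val viaGroupByKey = pairs.groupByKey().mapValues(_.sum)

// reduceByKey sums within each partition first; only the partial
// sums per key are shuffled.
val viaReduceByKey = pairs.reduceByKey(_ + _)

// Same results; very different shuffle volume on large datasets.
assert(viaGroupByKey.collect().toMap == viaReduceByKey.collect().toMap)
```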
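For the second pattern, a sketch of the `aggregateByKey` approach end to end; the keys and strings are made up for illustration:

```scala
val kvs = sc.parallelize(
  Seq(("fruit", "apple"), ("fruit", "apple"), ("fruit", "pear")))

val uniques = kvs.aggregateByKey(collection.mutable.Set[String]())(
  (set, v) => set += v,          // fold a value into a partition-local set
  (set1, set2) => set1 ++= set2) // merge sets across partitions

// uniques.collect() yields one entry per key, e.g. ("fruit", Set("apple", "pear")).
```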
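For the third pattern, a sketch of `cogroup` on two key-value datasets, again with illustrative data:

```scala
val scores = sc.parallelize(Seq(("u1", 10), ("u1", 20), ("u2", 5)))
val visits = sc.parallelize(Seq(("u1", "home"), ("u2", "cart")))

// A single shuffle leaves both groups side by side for each key as
// (key, (Iterable[Int], Iterable[String])), with no unpacking and
// repacking of already-grouped values.
val grouped = scores.cogroup(visits)
```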