Choosing Transformations to Minimize Shuffles
You can usually choose from many arrangements of actions and transformations that produce the same results. However, not all these arrangements result in the same performance. Avoiding common pitfalls and picking the right arrangement can significantly improve an application's performance.
When choosing an arrangement of transformations, minimize the number of shuffles and
the amount of data shuffled. Shuffles are expensive operations; all shuffle data must
be written to disk and then transferred over the network.
Transformations such as repartition, join, cogroup, and any of the *By or *ByKey transformations can result in shuffles. Not all these transformations are equal, however, and you should watch for the following patterns:

Avoid groupByKey when performing an associative reductive operation. For example, rdd.groupByKey().mapValues(_.sum) produces the same result as rdd.reduceByKey(_ + _). However, the former transfers the entire dataset across the network, while the latter computes local sums for each key in each partition and combines those local sums into larger sums after shuffling.
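To make the difference concrete, here is a minimal sketch that needs no Spark at all: partitions are simulated as plain Scala sequences, and the record counts illustrate how much data each approach would send across partition boundaries. All names here are illustrative, not Spark APIs.

```scala
// Two simulated partitions of (key, value) records.
val partitions = Seq(
  Seq(("a", 1), ("b", 2), ("a", 3)),
  Seq(("a", 4), ("b", 5))
)

// groupByKey-style: every record crosses the shuffle boundary,
// and values are only summed on the reduce side.
val shuffledRecords = partitions.flatten // 5 records moved
val groupedSums = shuffledRecords.groupBy(_._1)
  .map { case (k, kvs) => k -> kvs.map(_._2).sum }

// reduceByKey-style: each partition pre-aggregates locally, so only
// one record per key per partition crosses the boundary.
val localSums = partitions.map(
  _.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum })
val preAggregated = localSums.flatten // 4 records moved
val reducedSums = preAggregated.groupBy(_._1)
  .map { case (k, kvs) => k -> kvs.map(_._2).sum }

assert(groupedSums == reducedSums)                 // identical results
assert(preAggregated.size < shuffledRecords.size)  // less data shuffled
```

The savings grow with the number of duplicate keys per partition: the more records a partition holds for a given key, the more the map-side combine shrinks the shuffle.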
Avoid reduceByKey when the input and output value types are different. For example, consider writing a transformation that finds all the unique strings corresponding to each key. One approach would be to use map to transform each element into a Set and then combine the Sets with reduceByKey:

rdd.map(kv => (kv._1, Set(kv._2))).reduceByKey(_ ++ _)
This results in unnecessary object creation because a new set must be allocated for each record.
Instead, you could use aggregateByKey, which performs the map-side aggregation more efficiently:

val zero = new collection.mutable.HashSet[String]()
rdd.aggregateByKey(zero)(
  (set, v) => set += v,
  (set1, set2) => set1 ++= set2)
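The two functions passed to aggregateByKey play distinct roles: the first folds each record into a per-key accumulator within a partition (map side), and the second merges the per-partition accumulators after the shuffle (reduce side). The following pure-Scala sketch simulates that two-phase flow for the unique-strings example; the partition data and helper names are invented for illustration.

```scala
import scala.collection.mutable

// Simulated partitions of (key, string) records.
val parts = Seq(
  Seq(("fruit", "apple"), ("fruit", "pear"), ("veg", "kale")),
  Seq(("fruit", "apple"), ("veg", "leek"))
)

// seqOp: fold one record into the per-key set within a partition.
def seqOp(set: mutable.Set[String], v: String): mutable.Set[String] = set += v

// combOp: merge two per-partition sets after the shuffle.
def combOp(a: mutable.Set[String], b: mutable.Set[String]): mutable.Set[String] = a ++= b

// Map side: within each partition, build one set per key, mutating in place
// rather than allocating a fresh Set for every record.
val perPartition = parts.map { part =>
  part.foldLeft(Map.empty[String, mutable.Set[String]]) { case (acc, (k, v)) =>
    acc.updated(k, seqOp(acc.getOrElse(k, mutable.Set.empty[String]), v))
  }
}

// Reduce side: merge the partial sets for each key across partitions.
val merged = perPartition.flatten.groupBy(_._1).map {
  case (k, sets) => k -> sets.map(_._2).reduce(combOp)
}

assert(merged("fruit") == Set("apple", "pear"))
assert(merged("veg") == Set("kale", "leek"))
```

Because seqOp mutates a single accumulator per key per partition, only one object per key survives to the shuffle, avoiding the per-record Set allocations of the reduceByKey version.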
Avoid the flatMap-join-groupBy pattern. When two datasets are already grouped by key and you want to join them and keep them grouped, use cogroup. This avoids the overhead associated with unpacking and repacking the groups.
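To show what cogroup yields, here is a toy pure-Scala model of its output shape: one entry per key, pairing the value collections from each side, with no flattened per-record join output in between. The datasets and the cogroup helper are invented for illustration and stand in for Spark's real implementation.

```scala
// Two keyed datasets, as cogroup would see them.
val scores   = Seq(("alice", 90), ("bob", 80), ("alice", 70))
val absences = Seq(("alice", 2), ("carol", 1))

// A minimal cogroup: for each key, pair up the values from both sides.
// The groups are never unpacked into per-record join rows and regrouped.
def cogroup[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Map[K, (Seq[A], Seq[B])] = {
  val keys = (left.map(_._1) ++ right.map(_._1)).distinct
  keys.map { k =>
    k -> (left.collect { case (`k`, a) => a }, right.collect { case (`k`, b) => b })
  }.toMap
}

val grouped = cogroup(scores, absences)
assert(grouped("alice")._1 == Seq(90, 70)) // alice's scores, still grouped
assert(grouped("alice")._2 == Seq(2))      // alice's absences, still grouped
assert(grouped("carol")._1.isEmpty)        // keys present on only one side survive
```

A flatMap-join-groupBy pipeline would instead expand each group into individual records, join them row by row, and rebuild the groups afterward, doing strictly more work to arrive at the same grouped shape.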