Choosing Transformations to Minimize Shuffles
You can usually choose from many arrangements of actions and transformations that produce the same results. However, not all of these arrangements perform equally well; avoiding common pitfalls and picking the right arrangement can significantly improve an application's performance.
When choosing an arrangement of transformations, minimize the number of shuffles and
the amount of data shuffled. Shuffles are expensive operations; all shuffle data must
be written to disk and then transferred over the network.
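One readily available way to see where shuffles occur is `RDD.toDebugString`, which prints an RDD's lineage with a new indentation level at each shuffle (stage) boundary. A minimal sketch, assuming a live `SparkContext` named `sc`; the input path is a placeholder:

```scala
// Hypothetical word count over a placeholder input path.
val counts = sc.textFile("hdfs:///tmp/input")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// Each indentation step in the printed lineage marks a shuffle boundary.
println(counts.toDebugString)
```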
`repartition`, `join`, `cogroup`, and any of the `*By` or `*ByKey` transformations can result in shuffles. Not all of these transformations are equal, however, and you should avoid the following patterns:
- `groupByKey` when performing an associative reductive operation. For example, `rdd.groupByKey().mapValues(_.sum)` produces the same result as `rdd.reduceByKey(_ + _)`. However, the former transfers the entire dataset across the network, while the latter computes local sums for each key in each partition and combines those local sums into larger sums after shuffling. (A runnable sketch of this pattern and the next follows the list.)
- `reduceByKey` when the input and output value types are different. For example, consider writing a transformation that finds all the unique strings corresponding to each key. You could use `map` to transform each element into a `Set` and then combine the `Set`s with `reduceByKey`:

  ```scala
  rdd.map(kv => (kv._1, Set(kv._2))).reduceByKey(_ ++ _)
  ```

  This results in unnecessary object creation because a new set must be allocated for each record. Instead, use `aggregateByKey`, which performs the map-side aggregation more efficiently:

  ```scala
  val zero = collection.mutable.Set[String]()
  rdd.aggregateByKey(zero)((set, v) => set += v, (set1, set2) => set1 ++= set2)
  ```
- `flatMap`-`join`-`groupBy`. When two datasets are already grouped by key and you want to join them and keep them grouped, use `cogroup`. This avoids the overhead associated with unpacking and repacking the groups. (A sketch of this pattern also follows the list.)
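
To make the first two patterns concrete, here is a minimal, self-contained sketch. The object name, the `local[*]` master, and the sample data are illustrative assumptions, not part of the original text:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ShuffleChoices {
  def main(args: Array[String]): Unit = {
    // Hypothetical local setup for illustration only.
    val sc = new SparkContext(
      new SparkConf().setAppName("shuffle-choices").setMaster("local[*]"))

    val nums = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // Anti-pattern: ships every value across the network before summing.
    val slowSums = nums.groupByKey().mapValues(_.sum)

    // Preferred: sums within each partition first, shuffling only partial sums.
    val fastSums = nums.reduceByKey(_ + _)

    // Both arrangements produce the same result: ("a", 4), ("b", 2).
    assert(slowSums.collect().toMap == fastSums.collect().toMap)

    val words = sc.parallelize(Seq(("k1", "x"), ("k1", "y"), ("k1", "x"), ("k2", "z")))

    // Preferred over map + reduceByKey: builds one mutable set per key per
    // partition instead of allocating a fresh immutable Set per record.
    val uniques = words.aggregateByKey(collection.mutable.Set[String]())(
      (set, v) => set += v,   // fold each value into the partition-local set
      (s1, s2) => s1 ++= s2)  // merge sets from different partitions

    uniques.collect().foreach(println)
    sc.stop()
  }
}
```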

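And a sketch of the third pattern, assuming the same `SparkContext` as above. The RDD names and data are illustrative; note that the two results have slightly different shapes, because `cogroup` keeps each side's values in its own grouped collection rather than materializing joined pairs:

```scala
val users  = sc.parallelize(Seq((1, "alice"), (2, "bob"), (2, "beth")))
val orders = sc.parallelize(Seq((1, 9.99), (2, 5.00), (2, 7.50)))

// Anti-pattern: join emits one record per matching pair, and groupByKey
// then shuffles and repacks all of those pairs.
val slow = users.join(orders).groupByKey() // (Int, Iterable[(String, Double)])

// Preferred: a single cogroup keeps both sides grouped by key throughout.
val fast = users.cogroup(orders)           // (Int, (Iterable[String], Iterable[Double]))
```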