Choosing Transformations to Minimize Shuffles

You can usually choose from many arrangements of actions and transformations that produce the same results. However, not all these arrangements result in the same performance. Avoiding common pitfalls and picking the right arrangement can significantly improve an application's performance.

When choosing an arrangement of transformations, minimize the number of shuffles and the amount of data shuffled. Shuffles are expensive operations; all shuffle data must be written to disk and then transferred over the network. repartition, join, cogroup, and any of the *By or *ByKey transformations can result in shuffles. Not all these transformations are equal, however, and you should avoid the following patterns:

  • groupByKey when performing an associative reductive operation. For example, rdd.groupByKey().mapValues(_.sum) produces the same result as rdd.reduceByKey(_ + _). However, the former transfers the entire dataset across the network, while the latter computes local sums for each key in each partition and combines those local sums into larger sums after shuffling.

  • reduceByKey when the input and output value types are different. For example, consider writing a transformation that finds all the unique strings corresponding to each key. You could use map to transform each element into a Set and then combine the Sets with reduceByKey: => (kv._1, new Set[String]() + kv._2)).reduceByKey(_ ++ _)

    This results in unnecessary object creation because a new set must be allocated for each record.

    Instead, use aggregateByKey, which performs the map-side aggregation more efficiently:

    val zero = new collection.mutable.Set[String]()
    rdd.aggregateByKey(zero)((set, v) => set += v,(set1, set2) => set1 ++= set2)
  • flatMap-join-groupBy. When two datasets are already grouped by key and you want to join them and keep them grouped, use cogroup. This avoids the overhead associated with unpacking and repacking the groups.