When Shuffles Do Not Occur
In some circumstances, the transformations described previously do not result in shuffles. Spark does not shuffle when a previous transformation has already partitioned the data according to the same partitioner. Consider the following flow:
rdd1 = someRdd.reduceByKey(...)
rdd2 = someOtherRdd.reduceByKey(...)
rdd3 = rdd1.join(rdd2)
Because no partitioner is passed to reduceByKey, the default partitioner is used, so rdd1 and rdd2 are both hash-partitioned. These two reduceByKey transformations result in two shuffles. If the resulting RDDs have the same number of partitions, the join requires no additional shuffling: both were partitioned by the same default hash partitioner into the same number of partitions, so they are partitioned identically, and the set of keys in any single partition of rdd1 can only occur in a single partition of rdd2. Therefore, the contents of any single output partition of rdd3 depend only on the contents of a single partition in rdd1 and a single partition in rdd2, and a third shuffle is not required.
For example, if someRdd has four partitions, someOtherRdd has two partitions, and both reduceByKey transformations use three partitions, the tasks that run are the shuffle tasks for the two reduceByKeys (four over someRdd's partitions and two over someOtherRdd's), followed by three join tasks, each of which reads one partition of rdd1 and the corresponding partition of rdd2 directly; no third shuffle occurs.

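The following is a minimal sketch of this flow in code; the SparkContext setup, the sample data, and the summing function are illustrative assumptions rather than part of the original example. It compares the partitioners to confirm that rdd1 and rdd2 are identically partitioned and that the join reuses that partitioning instead of shuffling a third time:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("copartitioned-join").setMaster("local[*]"))

// Illustrative inputs: someRdd with four partitions, someOtherRdd with two.
val someRdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4)), 4)
val someOtherRdd = sc.parallelize(Seq(("a", 10), ("c", 30)), 2)

// Both reduceByKeys use three partitions and the default hash partitioner,
// so rdd1 and rdd2 end up partitioned identically.
val rdd1 = someRdd.reduceByKey(_ + _, 3)
val rdd2 = someOtherRdd.reduceByKey(_ + _, 3)
println(rdd1.partitioner == rdd2.partitioner)  // true

// The join adopts the existing partitioner rather than introducing a third shuffle.
val rdd3 = rdd1.join(rdd2)
println(rdd3.partitioner == rdd1.partitioner)  // true
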
If rdd1 and rdd2 use different partitioners, or use the default (hash) partitioner with different numbers of partitions, only one of the two datasets (the one with the smaller number of partitions) needs to be reshuffled for the join; the existing partitioning of the other dataset is reused.

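As a rough illustration under the same assumptions (reusing the hypothetical sc, someRdd, and someOtherRdd from the sketch above), giving the two reduceByKeys different partition counts lets the partitioner comparison show which side keeps its layout:

// The two sides no longer agree: rdd1 has three partitions, rdd2 has two.
val rdd1 = someRdd.reduceByKey(_ + _, 3)
val rdd2 = someOtherRdd.reduceByKey(_ + _, 2)

val rdd3 = rdd1.join(rdd2)
println(rdd3.partitioner == rdd1.partitioner)  // true: rdd1's partitioning is reused
println(rdd3.partitioner == rdd2.partitioner)  // false: rdd2's data is reshuffled into three partitions

Passing the same Partitioner instance (for example, a shared HashPartitioner) to both reduceByKey calls is one way to keep the two sides co-partitioned and avoid this extra shuffle.
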
To avoid shuffles when joining two datasets, you can use broadcast variables. When one of the datasets is small enough to fit in memory in a single executor, it can be loaded into a hash table on the driver and then broadcast to every executor. A map transformation can then reference the hash table to do lookups.
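A rough sketch of that pattern, assuming hypothetical pair RDDs named smallRdd and largeRdd (with unique keys in smallRdd) and the same SparkContext sc as above:

// Collect the small dataset into a driver-side map and broadcast it to every executor.
// collectAsMap keeps a single value per key, so smallRdd is assumed to have unique keys.
val smallLookup = sc.broadcast(smallRdd.collectAsMap())

// Look up each key of the large dataset in the broadcast map. Keys without a match are
// dropped, giving inner-join semantics, and no data from largeRdd moves across the network.
val joined = largeRdd.flatMap { case (key, value) =>
  smallLookup.value.get(key).map(matched => (key, (value, matched)))
}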
