06 December 2015

Setting Partitioner for RDD

When doing a join on Pair RDDs, if one of the datasets is master data, it makes a lot of sense to persist it, as we do not want the RDD to be recomputed every time an action that depends on it is executed.

Apart from enabling persistence for the master dataset, we can also avoid shuffling it on every join by providing a Partitioner.

Example
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner

scala> val masterdata =
     |    sc.
     |    parallelize(Seq(
     |       ("math",    55),
     |       ("math",    56),
     |       ("english", 57),
     |       ("english", 58),
     |       ("science", 59),
     |       ("science", 54))).
     |    partitionBy(new HashPartitioner(100)).
     |    persist()
masterdata: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[42] at partitionBy at <console>:31

scala> masterdata.partitioner
res16: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@64)

scala> masterdata.partitions.length
res17: Int = 100
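
A quick sketch of the payoff (dailyscores below is a hypothetical second dataset, not from the example above): since masterdata already carries a HashPartitioner and is persisted, join() reuses that partitioning and shuffles only the other RDD across the network.

// Hypothetical transactional dataset, keyed the same way as masterdata.
val dailyscores = sc.parallelize(Seq(("math", 90), ("english", 85)))

// masterdata is not shuffled again: its HashPartitioner(100) is reused,
// and only dailyscores is moved to the partitions holding matching keys.
val joined = masterdata.join(dailyscores)
joined.collect().foreach(println)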

Following are the operations in which shuffling can be avoided or minimized by using a Partitioner (see the sketch after this list)
- cogroup()
- groupWith()
- join()
- leftOuterJoin()
- rightOuterJoin()
- groupByKey()
- reduceByKey()
- combineByKey()
- lookup()
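
For instance, a minimal sketch of reduceByKey() on the pre-partitioned masterdata: all values for a key are already co-located, so the aggregation runs within each partition and no shuffle stage is needed.

// masterdata's existing HashPartitioner matches the one reduceByKey()
// would use, so Spark aggregates locally instead of shuffling.
val totals = masterdata.reduceByKey(_ + _)
totals.collect().foreach(println)
// ("math",111), ("english",115), ("science",113)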

Operations that preserve the Partitioner of the parent RDD or result in a Partitioner being set (see the sketch after this list)
- cogroup()
- groupWith()
- join()
- leftOuterJoin()
- rightOuterJoin()
- groupByKey()
- reduceByKey()
- combineByKey()
- partitionBy()
- sort()
- mapValues()
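
mapValues(), for example, transforms only the values and leaves the keys untouched, so the result can safely keep the parent's Partitioner; a minimal sketch:

// Keys are unchanged, so the parent's HashPartitioner(100) carries over.
val adjusted = masterdata.mapValues(_ + 5)
println(adjusted.partitioner)   // Some(HashPartitioner), same as the parent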

Operations that will not set a Partitioner (see the sketch below)
- map()
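
In contrast, map() may change the keys themselves, so Spark cannot guarantee that the output is still correctly partitioned and drops the Partitioner; a minimal sketch:

// Even an identity map() loses the Partitioner, because Spark cannot
// verify that the function left the keys unchanged.
val remapped = masterdata.map { case (k, v) => (k, v) }
println(remapped.partitioner)   // None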

Reference

Learning Spark : Partitioning : pp. 63, 66