05 December 2015

Combiner in Pair RDDs : combineByKey()

Similar to the combiner in MapReduce, when working with key/value pairs, the combineByKey() interface can be used to customize the combiner functionality. Methods like reduceByKey() by default use their own combiner to combine the data locally in each partition, for a given key

Similar to aggregate() (which is used with regular, non-pair RDDs), combineByKey() allows the user to return an RDD element type that differs from the element type of the input RDD

def combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
Generic function to combine the elements for each key using a custom set of aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C. Note that V and C can be different -- for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:

- createCombiner, which turns a V into a C (e.g., creates a one-element list)
- mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
- mergeCombiners, to combine two C's into a single one.

In addition, users can control the partitioning of the output RDD, and whether to perform map-side aggregation (if a mapper can produce multiple items with the same key).

1st Argument : createCombiner is called when a key (in an RDD element) is found for the first time in a given partition. This method creates the initial value of the accumulator for that key
2nd Argument : mergeValue is called when the key already has an accumulator in that partition; it merges the new value into the existing accumulator
3rd Argument : mergeCombiners is called when more than one partition has an accumulator for the same key; it merges the per-partition accumulators
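
The interplay of the three functions can be sketched in plain Scala, with no Spark required. This is a hypothetical simulation of combineByKey()'s per-partition logic (the object and parameter names are mine, not Spark's internals):

```scala
// Hypothetical plain-Scala simulation of combineByKey() semantics.
// Each inner Seq stands in for one partition of a pair RDD.
object CombineByKeySketch {
  def combineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {

    // Within each partition: createCombiner on first sight of a key,
    // mergeValue on every later occurrence of that key.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (accs, (k, v)) =>
        accs.get(k) match {
          case None      => accs + (k -> createCombiner(v))
          case Some(acc) => accs + (k -> mergeValue(acc, v))
        }
      }
    }

    // Across partitions: mergeCombiners joins the per-partition accumulators.
    perPartition.foldLeft(Map.empty[K, C]) { (merged, partAccs) =>
      partAccs.foldLeft(merged) { case (m, (k, c)) =>
        m.get(k) match {
          case None      => m + (k -> c)
          case Some(acc) => m + (k -> mergeCombiners(acc, c))
        }
      }
    }
  }
}
```

With a (sum, count) accumulator, `combineByKey(Seq(Seq(("maths", 50), ("maths", 60)), Seq(("maths", 70))), v => (v, 1), ...)` yields Map("maths" -> (180, 3)), mirroring what Spark does across partitions.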


Let us calculate the average in each subject using combineByKey()
scala> val inputrdd = sc.parallelize(Seq(
     |                    ("maths", 50), ("maths", 60),
     |                    ("english", 65),
     |                    ("physics", 66), ("physics", 61), ("physics", 87)), 
     |                    1)
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[41] at parallelize at <console>:27

scala> inputrdd.getNumPartitions                      
res55: Int = 1

scala> val reduced = inputrdd.combineByKey(
     |     (mark) => {
     |       println(s"Create combiner -> ${mark}")
     |       (mark, 1)
     |     },
     |     (acc: (Int, Int), v) => {
     |       println(s"""Merge value : (${acc._1} + ${v}, ${acc._2} + 1)""")
     |       (acc._1 + v, acc._2 + 1)
     |     },
     |     (acc1: (Int, Int), acc2: (Int, Int)) => {
     |       println(s"""Merge Combiner : (${acc1._1} + ${acc2._1}, ${acc1._2} + ${acc2._2})""")
     |       (acc1._1 + acc2._1, acc1._2 + acc2._2)
     |     }
     | )
reduced: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[42] at combineByKey at <console>:29

scala> reduced.collect()
Create combiner -> 50
Merge value : (50 + 60, 1 + 1)
Create combiner -> 65
Create combiner -> 66
Merge value : (66 + 61, 1 + 1)
Merge value : (127 + 87, 2 + 1)
res56: Array[(String, (Int, Int))] = Array((maths,(110,2)), (physics,(214,3)), (english,(65,1)))

scala> val result = reduced.mapValues(x => x._1 / x._2.toFloat)
result: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[43] at mapValues at <console>:31

scala> result.collect()
res57: Array[(String, Float)] = Array((maths,55.0), (physics,71.333336), (english,65.0))
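
As a sanity check, the same averages can be reproduced with plain Scala collections (groupBy plus a fold), independent of Spark:

```scala
// Plain-Scala reproduction of the subject averages above (no Spark needed).
val marks = Seq(
  ("maths", 50), ("maths", 60),
  ("english", 65),
  ("physics", 66), ("physics", 61), ("physics", 87))

// Group by subject, then reduce each group to (sum, count) -> average.
val averages: Map[String, Float] = marks
  .groupBy(_._1)
  .map { case (subject, rows) =>
    val (sum, count) = rows.foldLeft((0, 0)) {
      case ((s, c), (_, mark)) => (s + mark, c + 1)
    }
    subject -> sum / count.toFloat
  }
// maths -> 55.0, english -> 65.0, physics -> 71.333336, matching result.collect()
```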


The map-side aggregation performed by combineByKey() can also be disabled via the mapSideCombine argument (as is the case with methods like groupByKey(), where a combiner would not reduce the amount of data shuffled)
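
To see why a combiner adds nothing for grouping, note that groupByKey()-style behaviour can itself be expressed with the three combineByKey() functions. A sketch in plain Scala (these are illustrative functions, not Spark's actual internals):

```scala
// The three functions a groupByKey()-style combineByKey() would use.
// Combining locally still keeps every value, so map-side combining
// saves no shuffle data -- which is why it is disabled for grouping.
val createCombiner: Int => List[Int]                    = v => List(v)
val mergeValue: (List[Int], Int) => List[Int]           = (acc, v) => acc :+ v
val mergeCombiners: (List[Int], List[Int]) => List[Int] = (a, b) => a ++ b

// Applying them by hand to one key's values across two partitions:
val grouped = mergeCombiners(
  mergeValue(createCombiner(66), 61), // partition 1: physics marks
  createCombiner(87))                 // partition 2: physics mark
// grouped == List(66, 61, 87) -- same size as the input, nothing saved
```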


Learning Spark : Page 54