05 December 2015

Combiner in Pair RDDs : combineByKey()

Similar to the combiner in MapReduce, when working with key/value pairs, the combineByKey() interface can be used to customize the combiner functionality. Methods like reduceByKey() use their own combiner by default to combine the data locally within each partition, for a given key.
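For instance, here is a minimal sketch (assuming a SparkContext sc, as in the session below) of reduceByKey() summing marks per subject; the local, map-side combine inside each partition happens automatically:

val marks = sc.parallelize(Seq(("maths", 50), ("maths", 60), ("physics", 66)), 2)
// reduceByKey() applies _ + _ within each partition first, so at most one
// partial sum per key per partition is shuffled across the network
val totals = marks.reduceByKey(_ + _)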

Similar to aggregate() (which is used with plain, non-pair RDDs), combineByKey() allows the user to return an RDD whose element type differs from the element type of the input RDD.
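As a rough illustration (again assuming sc), aggregate() can compute a (sum, count) pair from an RDD[Int]; the result type (Int, Int) differs from the element type Int:

val nums = sc.parallelize(Seq(50, 60, 65))
val (sum, count) = nums.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1), // fold a value into the accumulator within a partition
  (a, b) => (a._1 + b._1, a._2 + b._2)  // merge accumulators from different partitions
)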

Syntax
def combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
Generic function to combine the elements for each key using a custom set of aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C. Note that V and C can be different -- for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:

- createCombiner, which turns a V into a C (e.g., creates a one-element list)
- mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
- mergeCombiners, to combine two C's into a single one.

In addition, users can control the partitioning of the output RDD, and whether to perform map-side aggregation (if a mapper can produce multiple items with the same key).
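The grouping example mentioned above can be sketched as follows (a hedged sketch, with explicit types for clarity and assuming sc):

val pairs = sc.parallelize(Seq((1, 10), (1, 20), (2, 30)))
val grouped = pairs.combineByKey(
  (v: Int) => List(v),                    // createCombiner: V => C
  (acc: List[Int], v: Int) => v :: acc,   // mergeValue: (C, V) => C
  (a: List[Int], b: List[Int]) => a ::: b // mergeCombiners: (C, C) => C
)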

Here,
1st Argument : createCombiner is called when a key (in an RDD element) is found for the first time in a given partition. This method creates the initial value of the accumulator for that key.
2nd Argument : mergeValue is called when the key already has an accumulator in that partition; it folds the new value into the existing accumulator.
3rd Argument : mergeCombiners is called when more than one partition has an accumulator for the same key; it merges those accumulators.

Example

Let us calculate the average marks in each subject using combineByKey().
scala> val inputrdd = sc.parallelize(Seq(
     |                    ("maths", 50), ("maths", 60),
     |                    ("english", 65),
     |                    ("physics", 66), ("physics", 61), ("physics", 87)), 
     |                    1)
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[41] at parallelize at <console>:27

scala> inputrdd.getNumPartitions                      
res55: Int = 1

scala> val reduced = inputrdd.combineByKey(
     |     (mark) => {
     |       println(s"Create combiner -> ${mark}")
     |       (mark, 1)
     |     },
     |     (acc: (Int, Int), v) => {
     |       println(s"""Merge value : (${acc._1} + ${v}, ${acc._2} + 1)""")
     |       (acc._1 + v, acc._2 + 1)
     |     },
     |     (acc1: (Int, Int), acc2: (Int, Int)) => {
     |       println(s"""Merge Combiner : (${acc1._1} + ${acc2._1}, ${acc1._2} + ${acc2._2})""")
     |       (acc1._1 + acc2._1, acc1._2 + acc2._2)
     |     }
     | )
reduced: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[42] at combineByKey at <console>:29

scala> reduced.collect()
Create combiner -> 50
Merge value : (50 + 60, 1 + 1)
Create combiner -> 65
Create combiner -> 66
Merge value : (66 + 61, 1 + 1)
Merge value : (127 + 87, 2 + 1)
res56: Array[(String, (Int, Int))] = Array((maths,(110,2)), (physics,(214,3)), (english,(65,1)))

scala> val result = reduced.mapValues(x => x._1 / x._2.toFloat)
result: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[43] at mapValues at <console>:31

scala> result.collect()
res57: Array[(String, Float)] = Array((maths,55.0), (physics,71.333336), (english,65.0))
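Notice that no "Merge Combiner" line appears in the output above: the input RDD has a single partition, so mergeCombiners is never invoked. A hedged sketch of how it would come into play is to spread the data across several partitions:

val multirdd = sc.parallelize(Seq(("physics", 66), ("physics", 61), ("physics", 87)), 3)
val combined = multirdd.combineByKey(
  (mark: Int) => (mark, 1),
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
)
// Each partition holds one "physics" record, so createCombiner runs once per
// partition and mergeCombiners merges the three (sum, count) accumulators
combined.collect()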

Note

The map-side aggregation done by combineByKey() can also be disabled via the mapSideCombine parameter. Methods like groupByKey() disable it internally, since the functionality of the combiner brings no benefit when every value has to be kept anyway.
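A minimal sketch of turning the combiner off, using the overload quoted in the Syntax section (HashPartitioner(2) is just an example partitioner; inputrdd is the RDD from above):

import org.apache.spark.HashPartitioner

val noCombine = inputrdd.combineByKey(
  (mark: Int) => (mark, 1),
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2),
  new HashPartitioner(2),
  mapSideCombine = false // all raw (key, mark) pairs are shuffled, as groupByKey() would do
)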

Reference

Learning Spark : Page 54