17 November 2015

reduce() & fold() Examples


reduce(<function>) takes a function that accepts two elements of the RDD's element type as arguments and returns a single element of the same type.

def reduce(f: (T, T) ⇒ T): T
Reduces the elements of this RDD using the specified associative binary operator.

scala> val rdd1 = sc.parallelize(List(1, 2, 5)) 
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at :21

scala> val sum = rdd1.reduce{ (x, y) => x + y}
sum: Int = 8
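
The operator needs to be associative because the elements are reduced within each partition first, and the partial results are then combined. The following is a minimal plain-Scala sketch, not Spark code: `reducePartitions` is a hypothetical helper that imitates this two-step evaluation on ordinary lists, and the two partition layouts are assumptions chosen for illustration.

```scala
// Hypothetical helper: reduce each non-empty "partition" on its own,
// then reduce the partial results, imitating the two-step evaluation.
def reducePartitions(partitions: List[List[Int]])(f: (Int, Int) => Int): Int =
  partitions.filter(_.nonEmpty).map(_.reduce(f)).reduce(f)

// The same elements (1, 2, 5) split in two different ways.
val layoutA = List(List(1, 2), List(5))
val layoutB = List(List(1), List(2, 5))

// Addition is associative: both layouts give the same answer.
val sumA = reducePartitions(layoutA)(_ + _)   // 8
val sumB = reducePartitions(layoutB)(_ + _)   // 8

// Subtraction is not associative: the answer depends on the layout.
val diffA = reducePartitions(layoutA)(_ - _)  // (1 - 2) - 5 = -6
val diffB = reducePartitions(layoutB)(_ - _)  // 1 - (2 - 5) = 4
```

With a non-associative operator such as subtraction, the result would change with the way the data happens to be partitioned, which is why reduce() is only meaningful for operators like addition, multiplication, min or max.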

fold() is similar to reduce() except that it takes a 'zero value' (think of it as a kind of initial value), which is used as the starting value of the fold on each partition.

def fold(zeroValue: T)(op: (T, T) ⇒ T): T
Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral "zero value". The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
(?? : Modifying t1 does not seem to work, although this example works)
(?? : What does 'op:' denote? In some functions (e.g. reduce) the function parameter is named 'f:')

scala> val rdd1 = sc.parallelize(List(
     | ("maths", 80),
     | ("science", 90)
     | ))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at parallelize at :21

scala> rdd1.partitions.length
res8: Int = 8

scala> val additionalMarks = ("extra", 4)
additionalMarks: (String, Int) = (extra,4)

scala> val sum = rdd1.fold(additionalMarks){ (acc, marks) =>
     |    val sum = acc._2 + marks._2
     |    ("total", sum)
     | }
sum: (String, Int) = (total,206)

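Note that the RDD here has 8 partitions. The fold on each partition starts from the zero value, so the 4 is added once per partition (8 * 4 = 32), including the partitions that hold no elements. When the per-partition results are merged, the fold starts from the zero value once more, so another 4 is added. The zero value therefore contributes (8 * 4) + 4 = 36 in total. This is also why the zero value is supposed to be neutral for the operation (0 for addition): a non-neutral value such as 4 makes the result depend on the number of partitions.

Therefore, result = 80 + 90 + 36 = 206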
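
The arithmetic above can be reproduced without a cluster. The following is a minimal plain-Scala sketch, not Spark code: `simulateFold` is a hypothetical helper that imitates the two steps of fold() on ordinary lists, and the layout (two partitions with one element each, six empty ones) is an assumption, since the actual placement of elements is up to Spark.

```scala
type Marks = (String, Int)

// Hypothetical helper imitating fold's two steps on plain Scala lists:
// every partition is folded starting from the zero value, and the
// per-partition results are then folded starting from the zero value again.
def simulateFold(partitions: List[List[Marks]], zero: Marks)(op: (Marks, Marks) => Marks): Marks = {
  val perPartition = partitions.map(_.foldLeft(zero)(op))
  perPartition.foldLeft(zero)(op)
}

// Same operation as in the fold() call above.
val addMarks: (Marks, Marks) => Marks =
  (acc, marks) => ("total", acc._2 + marks._2)

// Assumed layout: 8 partitions, two holding one element each, six empty.
val eightPartitions: List[List[Marks]] =
  List(List(("maths", 80)), List(("science", 90))) ++ List.fill(6)(List.empty[Marks])

// Non-neutral zero value: 80 + 90 + (8 * 4) + 4
val withExtra = simulateFold(eightPartitions, ("extra", 4))(addMarks)    // (total,206)

// Neutral zero value: the sum is unaffected by the partition count
val withNeutral = simulateFold(eightPartitions, ("extra", 0))(addMarks)  // (total,170)
```

Changing the number of empty partitions in `eightPartitions` changes `withExtra` but not `withNeutral`, which matches the explanation above.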

Disadvantage : The disadvantage of both reduce() and fold() is that the return type must be the same as the RDD element type. The aggregate() function can be used to avoid this limitation.
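
To illustrate how aggregate() lifts this restriction, the sketch below computes a (sum, count) pair from Int elements, so the result type differs from the element type. It is plain Scala rather than Spark code: `simulateAggregate` is a hypothetical helper that mirrors the shape of aggregate(zeroValue)(seqOp, combOp), and the partition layout is an assumption. On a real RDD[Int] the corresponding call would be `rdd.aggregate((0, 0))(seqOp, combOp)`.

```scala
// Hypothetical helper mirroring aggregate(zeroValue)(seqOp, combOp):
// seqOp folds elements of type T into an accumulator of type U within a
// partition; combOp merges the per-partition accumulators.
def simulateAggregate[T, U](partitions: List[List[T]], zero: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U =
  partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

// The accumulator is a (sum, count) pair, while the elements are plain Ints.
val seqOp: ((Int, Int), Int) => (Int, Int) =
  (acc, x) => (acc._1 + x, acc._2 + 1)
val combOp: ((Int, Int), (Int, Int)) => (Int, Int) =
  (a, b) => (a._1 + b._1, a._2 + b._2)

// Assumed layout of the elements 1, 2, 5 over two partitions.
val (total, count) = simulateAggregate(List(List(1, 2), List(5)), (0, 0))(seqOp, combOp)
val average = total.toDouble / count
```

Here the zero value (0, 0) is neutral for both functions, so the result does not depend on how many partitions there are.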

Note : foldByKey() is very similar to fold() except that it operates on a Pair RDD and folds the values of each key separately.


Learning Spark : Page 38