18 November 2015

aggregate() Example

Compared to reduce() & fold(), the aggregate() function has the advantage, it can return different Type vis-a-vis the RDD Element Type(ie Input Element type)

Syntax
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.

In this example, the RDD element type is (String, Int) whereas the return type is Int

Example
scala> val inputrdd = sc.parallelize(
     |    List(
     |       ("maths", 21),
     |       ("english", 22),
     |       ("science", 31)
     |    ),
     |    3
     | )
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[14] at parallelize at :21

scala> inputrdd.partitions.size
res18: Int = 3

scala> val result = inputrdd.aggregate(3) (
     |    /*
     |     * This is a seqOp for merging T into a U
     |     * ie (String, Int) in  into Int
     |     * (we take (String, Int) in 'value' & return Int)
     |     * Arguments :
     |     * acc   :  Reprsents the accumulated result
     |     * value :  Represents the element in 'inputrdd'
     |     *          In our case this of type (String, Int)
     |     * Return value
     |     * We are returning an Int
     |     */
     |    (acc, value) => (acc + value._2),
     |
     |    /*
     |     * This is a combOp for mergining two U's
     |     * (ie 2 Int)
     |     */
     |    (acc1, acc2) => (acc1 + acc2)
     | )
result: Int = 86

The result is calculated as follows,

Partition 1 : Sum(all Elements) + 3 (Zero value)
Partition 2 : Sum(all Elements) + 3 (Zero value)
Partition 3 : Sum(all Elements) + 3 (Zero value)
Result = Partition1 + Partition2 + Partition3 + 3(Zero value)

So we get 21 + 22 + 31 + (4 * 3) = 86

Reference


Learning Spark : Page 40