aggregate() Example

Compared to reduce() and fold(), the aggregate() function has one advantage: it can return a type different from the RDD element type (i.e. the input element type).
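To make the type difference concrete: in Spark, RDD.reduce takes a function (T, T) => T, and RDD.fold takes a zero value of type T with (T, T) => T, so both must return T. The sketch below uses plain Scala collections as a local stand-in for the RDD (an assumption for illustration only; no Spark is involved). Getting an Int out of (String, Int) pairs with reduce needs a map step first. An aggregate-style fold consumes the pairs directly.

```scala
// Local stand-in for the article's inputrdd: plain Scala collections, not Spark.
object ReduceVsAggregate {
  val data: Seq[(String, Int)] = Seq(("maths", 21), ("english", 22), ("science", 31))

  // reduce is (T, T) => T, so the (String, Int) pairs must first be mapped to Int.
  def totalViaReduce: Int = data.map(_._2).reduce(_ + _)

  // Aggregate-style step (U, T) => U: consumes a (String, Int) and yields an Int directly.
  def totalViaFoldLeft: Int = data.foldLeft(0)((acc, value) => acc + value._2)

  def main(args: Array[String]): Unit = {
    println(totalViaReduce)   // 74
    println(totalViaFoldLeft) // 74
  }
}
```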

Syntax
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
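The two-level evaluation described in the documentation above can be modelled locally. The sketch below is a plain-Scala model, not Spark code: `aggregateModel` is a hypothetical helper, and partitions are represented as a Seq of Seqs. Each partition is folded from the zero value with seqOp, then the per-partition results are folded, again from the zero value, with combOp. It uses U = (Int, Int), a running (sum, count), to compute an average, a result that reduce() cannot produce directly from (String, Int) elements.

```scala
// Hypothetical local model of RDD.aggregate; partitions are a Seq of Seqs (not Spark).
object AggregateModel {
  def aggregateModel[T, U](partitions: Seq[Seq[T]], zeroValue: U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U): U = {
    // Step 1: fold each partition separately, starting from zeroValue (seqOp).
    val perPartition: Seq[U] = partitions.map(_.foldLeft(zeroValue)(seqOp))
    // Step 2: merge the per-partition results, again starting from zeroValue (combOp).
    perPartition.foldLeft(zeroValue)(combOp)
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(("maths", 21)), Seq(("english", 22)), Seq(("science", 31)))
    // U = (sum, count) differs from T = (String, Int).
    val (sum, count) = aggregateModel(partitions, (0, 0))(
      (acc, value) => (acc._1 + value._2, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2))
    println(s"sum=$sum count=$count avg=${sum.toDouble / count}")
  }
}
```

In Spark the per-partition results are merged as the partition tasks complete, in no guaranteed order, so combOp should be associative and commutative. The model simply merges in partition order.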

In this example, the RDD element type is (String, Int), whereas the return type is Int.

Example
scala> val inputrdd = sc.parallelize(
     |    List(
     |       ("maths", 21),
     |       ("english", 22),
     |       ("science", 31)
     |    ),
     |    3
     | )
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[14] at parallelize at <console>:21

scala> inputrdd.partitions.size
res18: Int = 3

scala> val result = inputrdd.aggregate(3) (
     |    /*
     |     * This is a seqOp for merging T into a U
     |     * i.e. merging a (String, Int) into an Int
     |     * (we take a (String, Int) in 'value' and return an Int)
     |     * Arguments:
     |     * acc   :  Represents the accumulated result
     |     * value :  Represents the element in 'inputrdd'
     |     *          In our case this is of type (String, Int)
     |     * Return value
     |     * We are returning an Int
     |     */
     |    (acc, value) => (acc + value._2),
     |
     |    /*
     |     * This is a combOp for merging two U's
     |     * (i.e. two Ints)
     |     */
     |    (acc1, acc2) => (acc1 + acc2)
     | )
result: Int = 86
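The per-partition breakdown that follows assumes each of the three partitions holds exactly one element. The sketch below reproduces the index arithmetic that, as far as we understand it, ParallelCollectionRDD uses to slice a local Seq into numSlices ranges. Treat it as a model; `SliceModel.slice` is a hypothetical helper. In a live session, inputrdd.glom().collect() shows the actual contents of each partition.

```scala
// Model (assumption) of how parallelize slices a Seq into numSlices ranges:
// slice i covers indices [i * length / numSlices, (i + 1) * length / numSlices).
object SliceModel {
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "numSlices must be positive")
    val length = data.length.toLong
    (0 until numSlices).map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      data.slice(start, end)
    }
  }

  def main(args: Array[String]): Unit = {
    val data = List(("maths", 21), ("english", 22), ("science", 31))
    // With 3 elements and 3 slices, every partition gets exactly one element.
    slice(data, 3).zipWithIndex.foreach { case (part, i) =>
      println(s"Partition ${i + 1}: $part")
    }
  }
}
```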

The result is calculated as follows:

Partition 1 : Sum(elements of partition 1) + 3 (Zero value)
Partition 2 : Sum(elements of partition 2) + 3 (Zero value)
Partition 3 : Sum(elements of partition 3) + 3 (Zero value)
Result = Partition 1 + Partition 2 + Partition 3 + 3 (Zero value), since combOp merges the partition results starting from the zero value as well
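The same bookkeeping, written generally for P partitions and zero value z, where partition p holds the elements x_{p,1}, ..., x_{p,n_p} and both seqOp and combOp are plain addition:

```latex
\text{result} = z + \sum_{p=1}^{P}\Big(z + \sum_{i=1}^{n_p} x_{p,i}\Big)
             = (P + 1)\,z + \sum_{p=1}^{P}\sum_{i=1}^{n_p} x_{p,i}
```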

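So we get 21 + 22 + 31 + (4 * 3) = 86: the zero value is added once in each of the 3 partitions and once more when the partition results are combined.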
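A quick way to see the effect of the zero value is to rerun the same computation with a different partitioning. The sketch below is a local plain-Scala model of aggregate (hypothetical helpers `aggregateModel` and `sumWithZero3`, not Spark code). It reproduces 86 for the article's three single-element partitions and shows what happens when all elements sit in one partition.

```scala
// Local model of RDD.aggregate (not Spark): each partition is folded from zeroValue
// with seqOp, then the partial results are folded from zeroValue with combOp.
object ZeroValueEffect {
  def aggregateModel[T, U](partitions: Seq[Seq[T]], zeroValue: U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U): U =
    partitions.map(_.foldLeft(zeroValue)(seqOp)).foldLeft(zeroValue)(combOp)

  // Sums the Int scores with zero value 3, as in the article's example.
  def sumWithZero3(partitions: Seq[Seq[(String, Int)]]): Int =
    aggregateModel(partitions, 3)((acc, value) => acc + value._2, (a, b) => a + b)

  def main(args: Array[String]): Unit = {
    val maths = ("maths", 21)
    val english = ("english", 22)
    val science = ("science", 31)
    println(sumWithZero3(Seq(Seq(maths), Seq(english), Seq(science)))) // 86 = 74 + 4 * 3
    println(sumWithZero3(Seq(Seq(maths, english, science))))            // 80 = 74 + 2 * 3
  }
}
```

Because 3 is not a neutral element for addition, the result depends on how many partitions the data is split into. With a zero value of 0, both calls would return 74.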

Reference


Learning Spark : Page 40

Comments:

  1. In the final calculation, how did we get 4*3? Isn't it 3*3 since we have 3 partitions?

  2. Hi Vinay, based on my understanding, each partition has its own accumulator, so, as you said, the zero value contributes 3 * (zero value) => 3 * 3 across the 3 partitions.
    Once the results are calculated in each partition, the accumulated results need to be combined, which is done by combOp. I believe the combOp step also starts from the zero value, so we get 4 * 3 instead of 3 * 3.
