Example
Multiple Pair RDDs can be combined using cogroup()
scala> val rdd1 = sc.parallelize(Seq(
     |   ("key1", 1),
     |   ("key2", 2),
     |   ("key1", 3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:21

scala> val rdd2 = sc.parallelize(Seq(
     |   ("key1", 5),
     |   ("key2", 4)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[26] at parallelize at <console>:21

// cogroup() Example
scala> val grouped = rdd1.cogroup(rdd2)
grouped: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[28] at cogroup at <console>:25

// Result
scala> grouped.collect()
res10: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((key1,(CompactBuffer(1, 3),CompactBuffer(5))), (key2,(CompactBuffer(2),CompactBuffer(4))))

// Iterate through each value in a key's groups
// and increment the value by '1'
scala> val updated = grouped.map{x =>
     |   {
     |     val key = x._1
     |     //println("Key -> " + key)
     |     val value = x._2
     |     val itl1 = value._1
     |     val itl2 = value._2
     |     val res1 = itl1.map{ x =>
     |       {
     |         //println("It1 : Key -> " + key + ", Val -> " + (x + 1))
     |         x + 1
     |       }
     |     }
     |     val res2 = itl2.map{ x =>
     |       {
     |         //println("It2 : Key -> " + key + ", Val -> " + (x + 1))
     |         x + 1
     |       }
     |     }
     |     //println("End")
     |     (key, (res1, res2))
     |   }
     | }
updated: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[18] at map at <console>:33

scala> updated.collect()
res17: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((key1,(List(2, 4),List(6))), (key2,(List(3),List(5))))