Example
Multiple pair RDDs can be combined using cogroup().
scala> val rdd1 = sc.parallelize(Seq(
| ("key1", 1),
| ("key2", 2),
| ("key1", 3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:21
scala> val rdd2 = sc.parallelize(Seq(
| ("key1", 5),
| ("key2", 4)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[26] at parallelize at <console>:21
//cogroup() Example
scala> val grouped = rdd1.cogroup(rdd2)
grouped: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[28] at cogroup at <console>:25
//Result
scala> grouped.collect()
res10: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((key1,(CompactBuffer(1, 3),CompactBuffer(5))), (key2,(CompactBuffer(2),CompactBuffer(4))))
// For each key, iterate through both iterables of values
// and increment every value by 1
scala> val updated = grouped.map{x =>
| {
| val key = x._1
| //println("Key -> " + key)
| val value = x._2
| val itl1 = value._1
| val itl2 = value._2
| val res1 = itl1.map{ x =>
| {
| //println("It1 : Key -> " + key + ", Val -> " + (x + 1))
| x + 1
| }
| }
| val res2 = itl2.map{ x =>
| {
| //println("It2 : Key -> " + key + ", Val -> " + (x + 1))
| x + 1
| }
| }
| //println("End")
| (key, (res1, res2))
| }
| }
updated: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[18] at map at <console>:33
scala> updated.collect()
res17: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((key1,(List(2, 4),List(6))), (key2,(List(3),List(5))))