18 June 2016

A Word Count Example with Cached Partition

  1. scala> import org.apache.spark.storage.StorageLevel
  2. import org.apache.spark.storage.StorageLevel
  3.  
  4. scala> val lines = sc.textFile("hdfs:///user/raj/data.txt", 3)
  5. lines: org.apache.spark.rdd.RDD[String] = hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28
  6.  
  7. scala> // Number of partitions
  8.  
  9. scala> lines.partitions.size
  10. res0: Int = 3
  11.  
  12. scala> // flatMap() : One of many transformations
  13.  
  14. scala> val words = lines.flatMap(x => x.split(" "))
  15. words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:30
  16.  
  17. scala> // Mark the RDD for persistence (persist() is lazy: data is cached only when the first action runs)
  18.  
  19. scala> val units = words.map ( word => (word, 1) ).
  20. | persist(StorageLevel.MEMORY_ONLY)
  21. units: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:32
  22.  
  23. scala>
  24.  
  25. scala> val counts = units.reduceByKey ( (x, y) => x + y )
  26. counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:34
  27.  
  28. // The lineage shows the text file will be read to compute the 'counts' RDD
  // (no action has run yet, so the persisted partitions are not cached yet)
  29. scala> counts.toDebugString
  30. res1: String =
  31. (3) ShuffledRDD[4] at reduceByKey at <console>:34 []
  32. +-(3) MapPartitionsRDD[3] at map at <console>:32 []
  33. | MapPartitionsRDD[2] at flatMap at <console>:30 []
  34. | hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28 []
  35. | hdfs:///user/raj/data.txt HadoopRDD[0] at textFile at <console>:28 []
  36.  
  37. scala> // First Action
  38.  
  39. scala> counts.collect()
  40. res2: Array[(String, Int)] = Array((another,1), (This,2), (is,2), (a,1), (test,2))
  41.  
  42. scala> val counts2 = units.reduceByKey((x, y) => x * y)
  43. counts2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:34
  44.  
  45. // The cached partitions are read to compute the 'counts2' RDD (the text file is not re-read;
  // note the 'CachedPartitions' entry in the lineage below)
  46. scala> counts2.toDebugString
  47. res3: String =
  48. (3) ShuffledRDD[5] at reduceByKey at <console>:34 []
  49. +-(3) MapPartitionsRDD[3] at map at <console>:32 []
  50. | CachedPartitions: 3; MemorySize: 696.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
  51. | MapPartitionsRDD[2] at flatMap at <console>:30 []
  52. | hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28 []
  53. | hdfs:///user/raj/data.txt HadoopRDD[0] at textFile at <console>:28 []
  54.  
  55. scala> // Second Action
  56.  
  57. scala> counts2.collect()
  58. res4: Array[(String, Int)] = Array((another,1), (This,1), (is,1), (a,1), (test,1))
  59.