18 June 2016

A Word Count Example with Cached Partition

scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel

scala> val lines = sc.textFile("hdfs:///user/raj/data.txt", 3)
lines: org.apache.spark.rdd.RDD[String] = hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28

scala> // No of partitions

scala> lines.partitions.size
res0: Int = 3

scala> // flatMap() : one of many transformations

scala> val words = lines.flatMap(x => x.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:30

scala> // Persist the data

scala> val units = words.map ( word => (word, 1) ).
     | persist(StorageLevel.MEMORY_ONLY)
units: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:32

scala>

scala> val counts = units.reduceByKey ( (x, y) => x + y )
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:34

// The text file is read to compute the 'counts' RDD
scala> counts.toDebugString
res1: String =
(3) ShuffledRDD[4] at reduceByKey at <console>:34 []
 +-(3) MapPartitionsRDD[3] at map at <console>:32 []
    |  MapPartitionsRDD[2] at flatMap at <console>:30 []
    |  hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28 []
    |  hdfs:///user/raj/data.txt HadoopRDD[0] at textFile at <console>:28 []

scala> // First action

scala> counts.collect()
res2: Array[(String, Int)] = Array((another,1), (This,2), (is,2), (a,1), (test,2))

scala> val counts2 = units.reduceByKey((x, y) => x * y)
counts2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:34

// The cached data is read to compute the 'counts2' RDD
scala> counts2.toDebugString
res3: String =
(3) ShuffledRDD[5] at reduceByKey at <console>:34 []
 +-(3) MapPartitionsRDD[3] at map at <console>:32 []
    |      CachedPartitions: 3; MemorySize: 696.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |  MapPartitionsRDD[2] at flatMap at <console>:30 []
    |  hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28 []
    |  hdfs:///user/raj/data.txt HadoopRDD[0] at textFile at <console>:28 []

scala> // Second action

scala> counts2.collect()
res4: Array[(String, Int)] = Array((another,1), (This,1), (is,1), (a,1), (test,1))
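The cached partitions stay in executor memory until they are evicted or released. A minimal follow-on sketch (standard RDD API; these calls are not part of the session above) that inspects and then drops the cache:

  // Confirm how 'units' is persisted (MEMORY_ONLY in this session)
  units.getStorageLevel
  // Drop the cached partitions from executor memory
  units.unpersist()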

Broadcast Variable Example

scala> // Sending a value from the driver to the worker nodes without

scala> // using a broadcast variable

scala> val input = sc.parallelize(List(1, 2, 3))
input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:27

scala> val localVal = 2
localVal: Int = 2

scala> val added = input.map( x => x + localVal)
added: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[18] at map at <console>:31

scala> added.foreach(println)
4
3
5

scala> // The local variable is once again transferred to the worker nodes

scala> // for the next operation

scala> val multiplied = input.map( x => x * localVal)
multiplied: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at map at <console>:29

scala> multiplied.foreach(println)
4
6
2

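In both jobs above, localVal is captured in each task's closure, so it is re-serialized and shipped with every task of every job; nothing is reused across operations. A broadcast variable, used next, is sent to each worker once and then read locally.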
scala> // Sending a read-only value using a broadcast variable

scala> // Can be used to send large read-only values to all worker

scala> // nodes efficiently

scala> val broadcastVar = sc.broadcast(2)
broadcastVar: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(14)

scala> val added = input.map(x => broadcastVar.value + x)
added: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at map at <console>:31

scala> added.foreach(println)
5
3
4

scala> val multiplied = input.map(x => broadcastVar.value * x)
multiplied: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[21] at map at <console>:31

scala> multiplied.foreach(println)
6
4
2

scala>

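The same mechanism is most useful for larger values. A minimal sketch (hypothetical lookup table; 'codeToName' is an illustrative name) that broadcasts a map once instead of capturing it in every task's closure:

  // Hypothetical lookup table; in practice this could be megabytes in size
  val codeToName = sc.broadcast(Map(1 -> "one", 2 -> "two", 3 -> "three"))

  val labelled = input.map(x => (x, codeToName.value.getOrElse(x, "unknown")))
  labelled.collect()   // Array((1,one), (2,two), (3,three))

  // Release the copies cached on the executors once the table is no longer needed
  codeToName.unpersist()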

28 May 2016

A Word Count Example using 'spark-shell'

[raj@Rajkumars-MacBook-Pro ~]$ spark-shell --master local[*]
2016-05-28 15:37:24.325 java[3907:6309927] Unable to load realm info from SCDynamicStore
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> val lines = sc.parallelize(List("This is a word", "This is another word"), 7)
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> // No of partitions

scala> lines.partitions.size
res0: Int = 7

scala> // flatMap() : one of many transformations

scala> val words = lines.flatMap(line => line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at flatMap at <console>:29

scala> val units = words.map ( word => (word, 1) )
units: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[2] at map at <console>:31

scala> val counts = units.reduceByKey ( (x, y) => x + y )
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at reduceByKey at <console>:33

scala> counts.toDebugString
res1: String =
(7) ShuffledRDD[3] at reduceByKey at <console>:33 []
 +-(7) MapPartitionsRDD[2] at map at <console>:31 []
    |  MapPartitionsRDD[1] at flatMap at <console>:29 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:27 []

scala> // collect() : one of many actions

scala> counts.collect()
res2: Array[(String, Int)] = Array((This,2), (is,2), (another,1), (a,1), (word,2))
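Outside the shell, the same pipeline needs its own SparkContext. A minimal standalone sketch (object name and master setting are illustrative):

  import org.apache.spark.{SparkConf, SparkContext}

  // Standalone version of the shell session above
  object WordCount {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
      val sc = new SparkContext(conf)

      val lines = sc.parallelize(List("This is a word", "This is another word"), 7)
      val counts = lines.flatMap(_.split(" "))
                        .map(word => (word, 1))
                        .reduceByKey(_ + _)

      counts.collect().foreach(println)
      sc.stop()
    }
  }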

04 May 2016

Accumulator Example

Note: Update an accumulator only inside an action if you need a correct count; Spark may re-run a transformation when a stage is recomputed, so updates made inside a transformation can be applied more than once. Inside transformations, use accumulators for debugging only (see the sketch after the example below).
scala> val input = sc.parallelize(List(1, 2, 3, 4, 5,
     | 6, 7, 8, 9, 10,
     | 11, 12, 13, 14, 15
     | ))
input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> println("No of partitions -> " + input.partitions.size)
No of partitions -> 8

scala> val myAccum = sc.accumulator(0, "My Accumulator")
myAccum: org.apache.spark.Accumulator[Int] = 0

scala> // Used inside an action

scala> input.foreach{ x =>
     | //Thread.sleep(50000)
     | myAccum += 1
     | }

scala> println("myAccum -> " + myAccum.value)
myAccum -> 15
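
The sketch below (names are illustrative) shows the pitfall the note above warns about: the RDD is not cached, so each action recomputes the map() and re-applies the accumulator updates, over-counting the elements.

  // Accumulator updated inside a transformation -- for debugging only
  val badAccum = sc.accumulator(0, "Bad Accumulator")
  val tagged = input.map { x => badAccum += 1; x }

  tagged.count()   // 1st action: map() runs, badAccum.value is now 15
  tagged.count()   // 2nd action: map() runs again, badAccum.value is now 30

  println("badAccum -> " + badAccum.value)   // 30, not 15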