18 June 2016

A Word Count Example with Cached Partition

scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel

scala> val lines = sc.textFile("hdfs:///user/raj/data.txt", 3)
lines: org.apache.spark.rdd.RDD[String] = hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28

scala> // No of partitions
scala> lines.partitions.size
res0: Int = 3

scala> // flatMap() : one of many transformations
scala> val words = lines.flatMap(x => x.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:30

scala> // Persist the data
scala> val units = words.map ( word => (word, 1) ).
     |   persist(StorageLevel.MEMORY_ONLY)
units: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:32

scala> val counts = units.reduceByKey ( (x, y) => x + y )
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:34

scala> // The text file is read to compute the 'counts' RDD
scala> counts.toDebugString
res1: String = 
(3) ShuffledRDD[4] at reduceByKey at <console>:34 []
 +-(3) MapPartitionsRDD[3] at map at <console>:32 []
    |  MapPartitionsRDD[2] at flatMap at <console>:30 []
    |  hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28 []
    |  hdfs:///user/raj/data.txt HadoopRDD[0] at textFile at <console>:28 []

scala> // First Action
scala> counts.collect()
res2: Array[(String, Int)] = Array((another,1), (This,2), (is,2), (a,1), (test,2))

scala> val counts2 = units.reduceByKey((x, y) => x * y)
counts2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:34

scala> // The cached value is read to compute the 'counts2' RDD
scala> counts2.toDebugString
res3: String = 
(3) ShuffledRDD[5] at reduceByKey at <console>:34 []
 +-(3) MapPartitionsRDD[3] at map at <console>:32 []
    |       CachedPartitions: 3; MemorySize: 696.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |  MapPartitionsRDD[2] at flatMap at <console>:30 []
    |  hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28 []
    |  hdfs:///user/raj/data.txt HadoopRDD[0] at textFile at <console>:28 []

scala> // Second Action
scala> counts2.collect()
res4: Array[(String, Int)] = Array((another,1), (This,1), (is,1), (a,1), (test,1))
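Because 'units' is persisted with StorageLevel.MEMORY_ONLY, the first action materialises its partitions in executor memory, and the second action ('counts2') reads them back instead of re-reading the file from HDFS, which is what the CachedPartitions line in the second lineage shows. Below is a minimal sketch of managing the cache explicitly; it reuses the same data.txt path, but the MEMORY_AND_DISK level and the variable names are illustrative choices, not part of the session above:

import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK spills partitions that do not fit in memory to disk,
// so evicted partitions are reloaded rather than recomputed from the lineage.
val pairs = sc.textFile("hdfs:///user/raj/data.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .persist(StorageLevel.MEMORY_AND_DISK)

val sums     = pairs.reduceByKey(_ + _).collect()   // first action: populates the cache
val products = pairs.reduceByKey(_ * _).collect()   // second action: reads the cache

// Release the cached partitions once they are no longer needed
pairs.unpersist()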
Broadcast Variable Example
scala> // Sending a value from the Driver to the Worker nodes without
scala> // using a Broadcast variable
scala> val input = sc.parallelize(List(1, 2, 3))
input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:27

scala> val localVal = 2
localVal: Int = 2

scala> val added = input.map( x => x + localVal)
added: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[18] at map at <console>:31

scala> added.foreach(println)
4
3
5

scala> // The local variable is once again transferred to the worker nodes
scala> // for the next operation
scala> val multiplied = input.map( x => x * localVal)
multiplied: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at map at <console>:29

scala> multiplied.foreach(println)
4
6
2
scala> // Sending a read-only value using a Broadcast variable
scala> // Can be used to send large read-only values to all worker
scala> // nodes efficiently
scala> val broadcastVar = sc.broadcast(2)
broadcastVar: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(14)

scala> val added = input.map(x => broadcastVar.value + x)
added: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at map at <console>:31

scala> added.foreach(println)
5
3
4

scala> val multiplied = input.map(x => broadcastVar.value * x)
multiplied: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[21] at map at <console>:31

scala> multiplied.foreach(println)
6
4
2
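The broadcast really pays off when the shared value is large, for example a lookup table that every task needs. A short sketch under that assumption; the map contents and variable names below are illustrative and not from the session above:

// Broadcast the lookup map once; each executor fetches it a single time
// instead of it being serialised into every task closure.
val countryNames  = Map("IN" -> "India", "US" -> "United States", "FR" -> "France")
val countryLookup = sc.broadcast(countryNames)

val codes = sc.parallelize(List("IN", "FR", "US", "IN"))
val named = codes.map(code => countryLookup.value.getOrElse(code, "Unknown"))
named.collect().foreach(println)

// Remove the broadcast blocks from the executors when no longer needed
countryLookup.unpersist()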
28 May 2016
A Word Count example using 'spark-shell'
[raj@Rajkumars-MacBook-Pro ~]$ spark-shell --master local[*]
2016-05-28 15:37:24.325 java[3907:6309927] Unable to load realm info from SCDynamicStore
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> val lines = sc.parallelize(List("This is a word", "This is another word"), 7)
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> // No of partitions
scala> lines.partitions.size
res0: Int = 7

scala> // flatMap() : one of many transformations
scala> val words = lines.flatMap(line => line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at flatMap at <console>:29

scala> val units = words.map ( word => (word, 1) )
units: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[2] at map at <console>:31

scala> val counts = units.reduceByKey ( (x, y) => x + y )
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at reduceByKey at <console>:33

scala> counts.toDebugString
res1: String = 
(7) ShuffledRDD[3] at reduceByKey at <console>:33 []
 +-(7) MapPartitionsRDD[2] at map at <console>:31 []
    |  MapPartitionsRDD[1] at flatMap at <console>:29 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:27 []

scala> // collect() : one of many actions
scala> counts.collect()
res2: Array[(String, Int)] = Array((This,2), (is,2), (another,1), (a,1), (word,2))
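Further transformations can be chained before the action; for example, the counts can be ordered by frequency. A small sketch building on the 'counts' RDD above; the sortBy arguments are one reasonable choice and not part of the original session:

// Order the (word, count) pairs by descending count before collecting
val sorted = counts.sortBy(_._2, ascending = false)
sorted.collect().foreach { case (word, n) => println(s"$word\t$n") }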
04 May 2016
Accumulator : Example
Note: Update an Accumulator only inside an action to get correct values. Do not rely on an Accumulator that is updated inside a transformation; use it there only for debugging purposes.
scala> val input = sc.parallelize(List(1, 2, 3, 4, 5,
     |   6, 7, 8, 9, 10,
     |   11, 12, 13, 14, 15
     | ))
input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> println("No of partitions -> " + input.partitions.size)
No of partitions -> 8

scala> val myAccum = sc.accumulator(0, "My Accumulator")
myAccum: org.apache.spark.Accumulator[Int] = 0

scala> // Used inside an action
scala> input.foreach{ x =>
     |   //Thread.sleep(50000)
     |   myAccum += 1
     | }

scala> println("myAccum -> " + myAccum.value)
myAccum -> 15
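The reason for the note above is that Spark may run a task more than once (stage retries, speculative execution, or recomputation of an uncached RDD), and accumulator updates made inside a transformation are applied once per task attempt, so they can be over-counted. Only updates made inside an action are applied exactly once per task. A minimal sketch of the contrast, using the same Spark 1.x accumulator API as the session above; the variable names are illustrative:

val badCount  = sc.accumulator(0, "Updated in a transformation")
val goodCount = sc.accumulator(0, "Updated in an action")

val data = sc.parallelize(1 to 15)

// Update inside a transformation: re-executed task attempts update it again,
// so the value is only trustworthy for debugging.
val doubled = data.map { x => badCount += 1; x * 2 }

// Update inside an action: each task's update is applied exactly once.
doubled.foreach { _ => goodCount += 1 }

println("badCount  -> " + badCount.value)   // usually 15, but can be higher if tasks were retried
println("goodCount -> " + goodCount.value)  // 15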