scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel

scala> val lines = sc.textFile("hdfs:///user/raj/data.txt", 3)
lines: org.apache.spark.rdd.RDD[String] = hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28

scala> // Number of partitions

scala> lines.partitions.size
res0: Int = 3
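
The second argument to textFile is only a minimum-partition hint; Spark may create more partitions depending on the input splits. As an aside (a sketch, not part of the session above), the partitioning can also be changed after loading:

val widened  = lines.repartition(6)  // full shuffle into exactly 6 partitions
val narrowed = lines.coalesce(1)     // merges partitions, avoiding a full shuffle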

scala> // flatMap(): one of many transformations

scala> val words = lines.flatMap(x => x.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:30
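
For contrast, map would keep one array per input line, while flatMap flattens those arrays into a single RDD of words. A minimal sketch:

import org.apache.spark.rdd.RDD

val perLine: RDD[Array[String]] = lines.map(_.split(" "))  // one array per line
val flat: RDD[String] = lines.flatMap(_.split(" "))        // arrays flattened into words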

scala> // Persist the data

scala> val units = words.map(word => (word, 1)).
     |   persist(StorageLevel.MEMORY_ONLY)
units: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:32
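
persist(StorageLevel.MEMORY_ONLY) is exactly what cache() does. Other storage levels trade memory for disk or CPU; a sketch of common choices (each on a fresh RDD, since a storage level can only be set once per RDD):

val a = words.map(w => (w, 1)).cache()                                // shorthand for MEMORY_ONLY
val b = words.map(w => (w, 1)).persist(StorageLevel.MEMORY_AND_DISK)  // spill to disk when memory is short
val c = words.map(w => (w, 1)).persist(StorageLevel.MEMORY_ONLY_SER)  // serialized: smaller, but more CPU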

scala> val counts = units.reduceByKey((x, y) => x + y)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:34
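
reduceByKey pre-aggregates inside each partition before shuffling, which is why it is preferred over grouping first. An equivalent but less efficient formulation, as a sketch:

val viaGroup = units.groupByKey().mapValues(_.sum)  // same result, but every (word, 1) pair is shuffled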

// Nothing is cached yet, so the lineage runs all the way back to the text
// file. The (3) prefix is the partition count; the indented +- marks a
// shuffle boundary.
scala> counts.toDebugString
res1: String =
(3) ShuffledRDD[4] at reduceByKey at <console>:34 []
 +-(3) MapPartitionsRDD[3] at map at <console>:32 []
    |  MapPartitionsRDD[2] at flatMap at <console>:30 []
    |  hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28 []
    |  hdfs:///user/raj/data.txt HadoopRDD[0] at textFile at <console>:28 []

scala> // First action: triggers the computation and fills the cache of 'units'

scala> counts.collect()
res2: Array[(String, Int)] = Array((another,1), (This,2), (is,2), (a,1), (test,2))
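
collect() pulls the entire result to the driver, which is fine for this toy data set but risky at scale. Hedged alternatives:

counts.count()                                    // stays distributed; returns a Long
counts.take(5)                                    // only a few pairs reach the driver
counts.saveAsTextFile("hdfs:///user/raj/counts")  // hypothetical output path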

scala> val counts2 = units.reduceByKey((x, y) => x * y)
counts2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:34

// This time the lineage stops at the cached partitions: 'counts2' is computed
// from the in-memory copy of 'units'. The HDFS lines are still listed because
// Spark keeps the full lineage for recomputation after a failure.
scala> counts2.toDebugString
res3: String =
(3) ShuffledRDD[5] at reduceByKey at <console>:34 []
 +-(3) MapPartitionsRDD[3] at map at <console>:32 []
    |      CachedPartitions: 3; MemorySize: 696.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |  MapPartitionsRDD[2] at flatMap at <console>:30 []
    |  hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28 []
    |  hdfs:///user/raj/data.txt HadoopRDD[0] at textFile at <console>:28 []
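
The storage level can also be checked programmatically; a sketch using the session's names:

units.getStorageLevel.useMemory  // true: partitions are held in executor memory
units.getStorageLevel.useDisk    // false for MEMORY_ONLY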

scala> // Second action: served from the cache (all products are 1 because every value in 'units' is 1)

scala> counts2.collect()
res4: Array[(String, Int)] = Array((another,1), (This,1), (is,1), (a,1), (test,1))
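
Once the cached data is no longer needed, the executors' memory can be released. A sketch (getPersistentRDDs is on SparkContext):

units.unpersist()  // drop the cached partitions from executor memory

sc.getPersistentRDDs.foreach { case (id, rdd) =>  // RDDs still marked persistent
  println(s"$id: ${rdd.toDebugString}")
}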